"""OLAF (OreSat Linux App Framework)"""
import sys
from argparse import ArgumentParser, Namespace
from logging.handlers import SysLogHandler
from typing import Optional
from loguru import logger
from oresat_configs import Mission, OreSatConfig
from ._internals.app import App, app
from ._internals.rest_api import RestAPI, render_olaf_template, rest_api
from ._internals.services.logs import logger_tmp_file_setup
from ._internals.updater import Updater, UpdaterState
from .board.adc import Adc
from .board.cpufreq import A8_CPUFREQS, get_cpufreq, get_cpufreq_gov, set_cpufreq, set_cpufreq_gov
from .board.eeprom import Eeprom
from .board.gpio import GPIO_HIGH, GPIO_IN, GPIO_LOW, GPIO_OUT, Gpio, GpioError
from .board.pru import Pru, PruError, PruState
from .canopen.ecss import scet_int_from_time, scet_int_to_time, utc_int_from_time, utc_int_to_time
from .canopen.master_node import MasterNode
from .canopen.network import CanNetwork, CanNetworkError, CanNetworkState, NetworkError
from .canopen.node import Node, NodeStop
from .common.daemon import Daemon, DaemonState
from .common.oresat_file import OreSatFile, new_oresat_file
from .common.oresat_file_cache import OreSatFileCache
from .common.resource import Resource
from .common.service import Service, ServiceState
try:
from ._version import version as __version__ # type: ignore
except ImportError:
__version__ = "0.0.0" # package is not installed
olaf_parser = ArgumentParser(prog="OLAF", add_help=False)
olaf_parser.add_argument("-b", "--bus", default="vcan0", help="CAN bus to use, defaults to vcan0")
olaf_parser.add_argument("-v", "--verbose", action="store_true", help="enable verbose logging")
olaf_parser.add_argument("-l", "--log", action="store_true", help="log to only journald")
olaf_parser.add_argument(
"-m",
"--mock-hw",
nargs="*",
metavar="HW",
default=[],
help='list the hardware to mock or just "all" to mock all hardware',
)
olaf_parser.add_argument(
"-a", "--address", default="localhost", help="rest api address, defaults to localhost"
)
olaf_parser.add_argument(
"-p", "--port", type=int, default=8000, help="rest api port number, defaults to 8000"
)
olaf_parser.add_argument(
"-d",
"--disable-flight-mode",
action="store_true",
help="disable flight mode on start, defaults to flight mode enabled",
)
olaf_parser.add_argument(
"-o", "--oresat", default="oresat0.5", help="oresat mission; oresat0, oresat0.5, etc"
)
olaf_parser.add_argument(
"-w", "--hardware-version", default="0.0", help="override the hardware version"
)
olaf_parser.add_argument("-n", "--number", type=int, default=1, help="card number")
olaf_parser.add_argument(
"-t",
"--bus-type",
default="socketcan",
help=(
"can bus type; socketcan (default), socketcand, virtual, slcan, etc;"
"see https://python-can.readthedocs.io/en/stable/configuration.html#interface-names"
),
)
olaf_parser.add_argument(
"-H",
"--socketcand-host",
default="localhost",
help='host for socketcand bus (only used if bus_type is "socketcand")',
)
[docs]
def olaf_setup(name: str, args: Optional[Namespace] = None) -> tuple[Namespace, dict]:
"""
Parse runtime args and setup the app and REST API.
Parameters
----------
name: str
The card's node name.
args: Namespace
Optional runtime args. If not set, the default from `olaf_parser` will be used.
Returns
-------
Namespace
The runtime args.
dict
The OreSat configs.
"""
if args is None:
parser = ArgumentParser(parents=[olaf_parser])
args = parser.parse_args()
if args.verbose:
level = "DEBUG"
else:
level = "INFO"
logger.remove() # remove default logger
if args.log:
logger.add(SysLogHandler(address="/dev/log"), level=level, backtrace=True)
else:
logger.add(sys.stdout, level=level, backtrace=True)
logger_tmp_file_setup(level)
config = OreSatConfig(Mission.from_string(args.oresat))
if name not in config.cards:
name += f"_{args.number}"
if name not in config.cards:
raise ValueError(f"invalid card {name} for {args.oresat}")
od = config.od_db[name]
if args.disable_flight_mode:
od["flight_mode"].value = False
version = "0.0"
if not args.mock_hw:
try:
eeprom = Eeprom()
version = f"{eeprom.major}.{eeprom.minor}"
logger.info(f"detected v{version} card")
except (PermissionError, FileNotFoundError, OSError, UnicodeDecodeError):
logger.warning("could not read hardware info from eeprom")
if args.hardware_version != "0.0":
version = args.hardware_version
od["versions"]["hw_version"].value = version
is_octavo = config.cards[name].processor == "octavo"
if is_octavo:
od["versions"]["olaf_version"].value = __version__
network = CanNetwork(args.bus_type, args.bus, args.socketcand_host)
od_db = config.od_db if name == "c3" else None
app.setup(network, od, od_db, is_octavo)
rest_api.setup(address=args.address, port=args.port)
return args, config
[docs]
def olaf_run():
"""Start the app and REST API."""
rest_api.start()
app.run()
rest_api.stop()