Source code for olaf._internals.rest_api

"""OLAF REST API for testing and integration."""

import base64
import logging
import os
import shutil
from pathlib import Path
from threading import Thread
from typing import Union

import canopen
from flask import Flask, jsonify, render_template, request, send_from_directory
from loguru import logger
from oresat_configs import Mission
from werkzeug.serving import make_server

from ...common import natsorted
from ..app import app

# Display names for the CANopen data types reported by the REST API, keyed by
# the matching ``canopen.objectdictionary`` data type constant.
DATA_TYPE_NAMES = {
    getattr(canopen.objectdictionary, type_name): type_name
    for type_name in (
        "BOOLEAN",
        "INTEGER8",
        "INTEGER16",
        "INTEGER32",
        "INTEGER64",
        "UNSIGNED8",
        "UNSIGNED16",
        "UNSIGNED32",
        "UNSIGNED64",
        "REAL32",
        "REAL64",
        "VISIBLE_STRING",
        "OCTET_STRING",
        "DOMAIN",
    )
}

# Data types whose values are bytes; the REST API base64 encodes/decodes these
# when converting to/from JSON.
BYTES_TYPES = [
    getattr(canopen.objectdictionary, type_name) for type_name in ("OCTET_STRING", "DOMAIN")
]


class RestAPI:
    """
    An optional Flask app for reading and writing values into the OD.

    Use the global ``olaf.rest_api`` object.

    Life cycle: :meth:`setup` creates the WSGI server, :meth:`start` launches
    the thread that serves requests, and :meth:`stop` shuts both down.
    """

    # directory of this module; the core templates and static files live under it
    _PATH = os.path.dirname(os.path.abspath(__file__))
    # every template (core and app-added) is copied into this one directory
    _TEMPLATE_DIR = "/tmp/oresat/templates"

    def __init__(self):
        self.app = Flask(
            "OLAF", template_folder=self._TEMPLATE_DIR, static_folder=f"{self._PATH}/static"
        )

        # only let werkzeug log errors
        logging.getLogger("werkzeug").setLevel(logging.ERROR)

        self._thread = Thread(target=self._run)
        self._server = None  # created by setup()
        self._ctx = None  # Flask app context, pushed by start()

        Path(self._TEMPLATE_DIR).mkdir(parents=True, exist_ok=True)

        # add all core templates
        for i in os.listdir(f"{self._PATH}/templates"):
            self.add_template(f"{self._PATH}/templates/{i}")

    def setup(self, address: str, port: int):
        """
        Setup the REST API thread.

        Parameters
        ----------
        address: str
            Address for the server to bind to.
        port: int
            Port for the server to bind to.
        """

        self._server = make_server(address, port, self.app)

    def start(self):
        """Start the REST API thread. :meth:`setup` must be called first."""

        logger.info("starting rest api")
        self._ctx = self.app.app_context()
        self._ctx.push()
        self._thread.start()

    def _run(self):
        """Thread target; serve requests until the server is shut down."""

        if self._server is None:
            # start() was called without setup(); fail with a clear message
            # rather than an AttributeError on None inside the thread
            logger.error("rest api was started before it was set up")
            return
        self._server.serve_forever()

    def stop(self):
        """Stop the REST API thread."""

        logger.info("stopping rest api")
        # shutdown() blocks until serve_forever() returns, so it deadlocks when
        # the serving thread is not running; only call it for a live thread
        if self._server is not None and self._thread.is_alive():
            self._server.shutdown()
        if self._thread.is_alive():
            self._thread.join()
        if self._server is not None:
            # release the listening socket; shutdown() alone does not close it
            self._server.server_close()

    def add_template(self, template_path: str):
        """
        Add a Flask template to common templates dir. All templates must be in the same directory.

        Parameters
        ----------
        template_path: str
            Path to the template file to add.
        """

        shutil.copy(template_path, self._TEMPLATE_DIR)
def render_olaf_template(template: str, name: str):
    """
    Render one of the standard OLAF templates.

    The page title is built from the mission (looked up from the
    ``satellite_id`` OD entry) and the product name in the OD's device
    information.

    Parameters
    ----------
    template: str
        Template file name.
    name: str
        Nice name for the template.
    """

    mission = Mission.from_id(app.od["satellite_id"].value)
    page_title = f"{mission} {app.od.device_information.product_name}"
    return render_template(template, name=name, title=page_title)
def make_error_json(error: Union[str, Exception]) -> str:
    """
    Make the standard error JSON message for the REST API.

    Parameters
    ----------
    error: str, Exception
        The error message.

    Returns
    -------
    str
        The JSON error message, ``{"error": "<message>"}``, as built by
        :func:`flask.jsonify`.
    """

    return jsonify({"error": str(error)})


rest_api = RestAPI()
"""The global instance of the REST API."""


@rest_api.app.route("/")
def root():
    """Render the root template with links to all non-internal routes."""

    # routes that are not listed on the root page
    hidden_routes = ["/", "/favicon.ico", "/od-all", "/bus"]

    routes = []
    for rule in rest_api.app.url_map.iter_rules():
        route = str(rule)
        if route.startswith(("/static/", "/od/")) or route in hidden_routes:
            continue
        routes.append(route)
    routes = natsorted(routes)

    sat = Mission.from_id(app.od["satellite_id"].value)
    title = f"{sat} {app.od.device_information.product_name}"
    return render_template("root.html", title=title, routes=routes)


@rest_api.app.route("/favicon.ico")
def favicon():
    """Pass the favicon.icon."""

    path = os.path.dirname(os.path.abspath(__file__))
    return send_from_directory(f"{path}/static", "favicon.ico")


def _json_value_to_value(data_type: int, json_value):
    """
    Convert a value from a JSON request into the Python value for an OD object.

    Parameters
    ----------
    data_type: int
        The canopen data type of the target object.
    json_value
        The value as it was decoded from the JSON request.

    Returns
    -------
    The converted value: a bool for BOOLEAN, an int for integer types (strings
    may be decimal or ``0x`` prefixed hex), a float for float types, and base64
    decoded bytes for bytes types. Any other value is returned unchanged.
    """

    value = json_value

    if data_type == canopen.objectdictionary.BOOLEAN and not isinstance(json_value, bool):
        value = json_value.lower() == "true"
    elif data_type in canopen.objectdictionary.INTEGER_TYPES and not isinstance(json_value, int):
        value = int(json_value, 16) if json_value.startswith("0x") else int(json_value)
    elif data_type in canopen.objectdictionary.FLOAT_TYPES:
        value = float(json_value)
    elif data_type in BYTES_TYPES:
        try:
            value = base64.decodebytes(json_value.encode("utf-8"))
        except AttributeError:
            pass  # not a string (no .encode()); pass the value through unchanged

    return value


def _parse_od_key(key: str) -> Union[int, str]:
    """
    Convert an index or subindex from a URL into an OD key.

    ``0x`` prefixed keys are parsed as hex and keys starting with a digit as
    decimal; anything else is returned unchanged so it can be used as a name.

    Raises
    ------
    ValueError
        The key looks like a number but is not a valid one.
    """

    if key.startswith("0x"):
        return int(key, 16)
    if key and key[0] in "0123456789":
        return int(key)
    return key


def _format_od_key(key: Union[int, str], width: int = 0) -> str:
    """Format an OD key for messages: zero-padded hex for ints, as is for names."""

    if isinstance(key, int):
        return "0x" + f"{key:X}".zfill(width)
    return str(key)


@rest_api.app.route("/bus", methods=["GET"])
def can_bus():
    """Get CAN bus info."""

    return jsonify(
        {
            "channel": app.node.bus,
            "bitrate": app.od.bitrate // 1000,  # bps -> kbps
            "status": app.node.bus_state,
        }
    )


@rest_api.app.route("/od/<index>/", methods=["GET", "PUT"])
def od_index_old(index: str):
    """Read or write a value from OD with only a index. For backward compatibility."""

    return od_index(index)


@rest_api.app.route("/od/<index>/<subindex>/", methods=["GET", "PUT"])
def od_subindex_old(index: str, subindex: str):
    """Read or write a value from OD. For backward compatibility."""

    return od_subindex(index, subindex)


@rest_api.app.route("/od/<index>", methods=["GET", "PUT"])
def od_index(index: str):
    """
    Read or write a value from OD with only a index.

    A ``PUT`` request takes the new value from the ``value`` field of the JSON
    body. Both methods respond with the object as JSON, or with the standard
    error JSON on failure.
    """

    try:
        index = _parse_od_key(index)  # type: ignore
    except ValueError:
        return make_error_json(f"invalid index {index}")

    try:
        obj = app.od[index]
    except KeyError:
        msg = f"no object at index {_format_od_key(index)}"
        logger.error(f"REST API error: {msg}")
        return make_error_json(msg)

    if request.method == "PUT":
        try:
            json_value = request.json["value"]
            # convert value from JSON to bytes for SDO callback
            value = _json_value_to_value(obj.data_type, json_value)
            raw = obj.encode_raw(value)
            app.node._on_sdo_write(index, None, obj, raw)  # pylint: disable=W0212
        except Exception as e:  # pylint: disable=W0718
            logger.error(f"REST API error: {e}")
            return make_error_json(str(e))

    return jsonify(_object_to_dict(index))


@rest_api.app.route("/od/<index>/<subindex>", methods=["GET", "PUT"])
def od_subindex(index: str, subindex: str):
    """
    Read or write a value from OD.

    A ``PUT`` request takes the new value from the ``value`` field of the JSON
    body. Both methods respond with the object as JSON, or with the standard
    error JSON on failure.
    """

    try:
        index = _parse_od_key(index)  # type: ignore
        subindex = _parse_od_key(subindex)  # type: ignore
    except ValueError:
        return make_error_json(f"invalid index {index} or subindex {subindex}")

    try:
        obj = app.od[index][subindex]
    except (KeyError, TypeError):
        msg = f"no object at index {_format_od_key(index)} subindex {_format_od_key(subindex)}"
        logger.error(f"REST API error: {msg}")
        return make_error_json(msg)

    if request.method == "PUT":
        try:
            json_value = request.json["value"]
            # convert value from JSON to bytes for SDO callback
            value = _json_value_to_value(obj.data_type, json_value)
            if obj.data_type in BYTES_TYPES:
                raw = value
            else:
                raw = obj.encode_raw(value)
            app.node._on_sdo_write(index, subindex, obj, raw)  # pylint: disable=W0212
        except Exception as e:  # pylint: disable=W0718
            logger.exception(f"REST API error: {e}")
            return make_error_json(str(e))

    return jsonify(_object_to_dict(index, subindex))


def _object_to_dict(
    index: Union[str, int],
    subindex: Union[str, int, None] = None,
    add_values: bool = True,
) -> dict:
    """
    Convert a OD object to a dictionary.

    Parameters
    ----------
    index: str, int
        The index of the object to convert.
    subindex: str, int
        Optional subindex of the object to convert.
    add_values: bool
        Add values (current and engineering value) to dict.

    Returns
    -------
    dict
        The object as a dictionary.

    Raises
    ------
    KeyError
        There is no object at the index (or at the subindex of that index).
    """

    if subindex is None:
        try:
            obj = app.node.od[index]
        except KeyError as e:
            # the index may be a name, so it can not always be formatted as hex
            msg = f"{_format_od_key(index, 4)} is not a valid index"
            logger.debug("REST API error: " + msg)
            raise KeyError(msg) from e
    else:
        try:
            obj = app.node.od[index][subindex]
        except KeyError as e:
            msg = (
                f"{_format_od_key(subindex, 2)} not a valid subindex "
                f"for index {_format_od_key(index, 4)}"
            )
            logger.debug("REST API error: " + msg)
            raise KeyError(msg) from e

    data = {
        "name": obj.name,
        "index": obj.index,
        "description": obj.description,
    }

    if isinstance(obj, canopen.objectdictionary.Variable):
        data["object_type"] = "VARIABLE"
        data["access_type"] = obj.access_type
        data["data_type"] = DATA_TYPE_NAMES[obj.data_type]

        value = None
        if add_values:
            # only run the read callback when the value is actually reported
            value = app.node._on_sdo_read(index, subindex, obj)  # pylint: disable=W0212
            if obj.data_type in BYTES_TYPES and value is not None:
                # encode bytes data types for JSON
                try:
                    value = base64.encodebytes(value).decode("utf-8")
                except TypeError:
                    logger.error(f"object {obj.name} does not have bytes-like data")
            data["value"] = value
            data["eng_value"] = value  # always include, even when the same as value

        if obj.data_type in canopen.objectdictionary.INTEGER_TYPES:
            data["bit_definitions"] = obj.bit_definitions
            data["value_descriptions"] = obj.value_descriptions
            data["scale_factor"] = obj.factor
            if add_values and value is not None:
                data["eng_value"] = obj.factor * value

        data["subindex"] = obj.subindex
        data["unit"] = obj.unit
        # only a missing limit becomes ""; a limit of 0 must be kept
        data["low_limit"] = "" if obj.min is None else obj.min
        data["high_limit"] = "" if obj.max is None else obj.max
    else:
        is_array = isinstance(obj, canopen.objectdictionary.Array)
        data["object_type"] = "ARRAY" if is_array else "RECORD"
        data["subindexes"] = {sub: _object_to_dict(index, sub, add_values) for sub in obj}

    return data


@rest_api.app.route("/od-all", methods=["GET"])
def get_all_object():
    """Get all object data (without values) at index 0x3000 and up as a one giant JSON."""

    data = {
        index: _object_to_dict(index, None, False) for index in app.od if index >= 0x3000
    }
    return jsonify(data)


@rest_api.app.route("/od")
def od_template():
    """Render the OD template."""

    return render_olaf_template("od.html", name="Object Dictionary")


@rest_api.app.route("/os-command")
def os_command_template():
    """Render the OS command template."""

    return render_olaf_template("os_command.html", name="OS Command")


@rest_api.app.route("/updater")
def updater_template():
    """Render the updater template."""

    return render_olaf_template("updater.html", name="Updater")


@rest_api.app.route("/fwrite")
def fwrite_template():
    """Render the fwrite cache template."""

    return render_olaf_template("fwrite.html", name="Fwrite Cache")


@rest_api.app.route("/fread")
def fread_template():
    """Render the fread cache template."""

    return render_olaf_template("fread.html", name="Fread Cache")


@rest_api.app.route("/logs")
def logs_template():
    """Render the logs template."""

    return render_olaf_template("logs.html", name="Logs")


@rest_api.app.route("/reset")
def reset_template():
    """Render the reset template."""

    return render_olaf_template("reset.html", name="Reset")