"""OreSat CANopen Node"""
import os
import struct
from enum import IntEnum
from pathlib import Path
from threading import Event
from time import monotonic
from typing import Any, Callable, Dict, Union
from canopen import LocalNode, ObjectDictionary
from canopen.objectdictionary import (
DOMAIN,
FLOAT_TYPES,
INTEGER_TYPES,
OCTET_STRING,
VISIBLE_STRING,
ODArray,
ODRecord,
ODVariable,
)
from loguru import logger
from ..canopen.network import CanNetwork, CanNetworkState
from ..common.daemon import Daemon
from ..common.oresat_file_cache import OreSatFileCache
from . import EmcyCode
[docs]
class NodeStop(IntEnum):
"""Node stop commands."""
NO_STOP = 0
"""Default value for no stop."""
SOFT_RESET = 1
"""Just stop the app and exit. Systemd will restart the app."""
HARD_RESET = 2
"""Reboot system after app has stopped"""
FACTORY_RESET = 3
"""Clear all file cachces and reboot system after app has stopped"""
POWER_OFF = 4
"""Just power off the system."""
[docs]
class Node:
"""
OreSat CANopen Node class
Jobs:
- It abstracts away the canopen.LocalNode and canopen.Network from Resources and Services.
- Provides access to the OD for Resources and Services.
- Lets Resources and Services send TPDOs.
- Lets Resources and Services send EMCY messages.
- Set up the file transfer caches.
- Starts/stops all Resources and Services.
- Sets up all timer-base TPDOs.
- Sets up all RPDO callbacks.
Basically it tries to abstract all the CANopen things as much a possible, while providing a
basic API for CANopen things.
"""
def __init__(self, network: CanNetwork, od: ObjectDictionary):
"""
Parameters
----------
network: CanNetwork
The CAN network
od: canopen.ObjectDictionary
The CANopen ObjectDictionary
"""
self._event = Event()
self._od = od
self._node: LocalNode = None
self._network: CanNetwork = network
self._read_cbs = {} # type: ignore
self._write_cbs = {} # type: ignore
self._syncs = 0
self._reset = NodeStop.SOFT_RESET
self._daemons = {} # type: ignore
if os.geteuid() == 0: # running as root
self.work_base_dir = "/var/lib/oresat"
self.cache_base_dir = "/var/cache/oresat"
else:
self.work_base_dir = str(Path.home()) + "/.oresat"
self.cache_base_dir = str(Path.home()) + "/.cache/oresat"
fread_path = self.cache_base_dir + "/fread"
fwrite_path = self.cache_base_dir + "/fwrite"
self._fread_cache = OreSatFileCache(fread_path)
self._fwrite_cache = OreSatFileCache(fwrite_path)
logger.debug(f"fread cache path {self._fread_cache.dir}")
logger.debug(f"fwrite cache path {self._fwrite_cache.dir}")
self._start_time = monotonic()
self._network.monitor()
self._first_network_reset = True
self._network.add_reset_callback(self._setup_node)
self._network.subscribe(0x80, self._on_sync)
self._rpdo_cobid_to_num: dict[int, int] = {}
for i in range(self._od.device_information.nr_of_RXPDO):
cob_id = self._od[0x1400 + i][1].value
self._rpdo_cobid_to_num[cob_id] = i
self._network.subscribe(cob_id, self._on_pdo)
def __del__(self):
# stop the monitor thread if it is running
if not self._event.is_set():
self.stop()
def _on_sync(self, cob_id: int, data: bytes, timestamp: float): # pylint: disable=W0613
"""On SYNC message send TPDOs configured to be SYNC-based"""
self._syncs += 1
if self._syncs == 241:
self._syncs = 1
for i in range(self.od.device_information.nr_of_TXPDO):
transmission_type = self.od[0x1800 + i][2].value
if self._syncs % transmission_type == 0:
self.send_tpdo(i)
def _on_pdo(self, cob_id: int, data: bytes, timestamp: float): # pylint: disable=W0613
rpdo = self._rpdo_cobid_to_num[cob_id]
maps = self.od[0x1600 + rpdo][0].value
offset = 0
for i in range(maps):
pdo_map = self.od[0x1600 + rpdo][i + 1].value
if pdo_map == 0:
break # nothing todo
pdo_map_bytes = pdo_map.to_bytes(4, "big")
index, subindex, size = struct.unpack(">HBB", pdo_map_bytes)
size //= 8
# call sdo callback(s) and convert data to bytes
if isinstance(self.od[index], ODVariable):
self._node.sdo[index].raw = data[offset : offset + size]
else: # record or array
self._node.sdo[index][subindex].raw = data[offset : offset + size]
offset += size
def _setup_node(self):
"""Create the CANopen node."""
if self._od.node_id is None:
self._od.node_id = 0x7C
self._node = LocalNode(self._od.node_id, self._od)
self._network.add_node(self._node)
self._node.nmt.state = "OPERATIONAL"
self._node.add_read_callback(self._on_sdo_read)
self._node.add_write_callback(self._on_sdo_write)
if not self._first_network_reset or monotonic() - self._start_time > 5:
self.send_emcy(0x8140)
else:
self._first_network_reset = False
def _destroy_node(self):
"""Destroy the CANopen node."""
self._node = None
[docs]
def run(self) -> NodeStop:
"""
Go into operational mode, start all the resources, start all the threads, and monitor
everything in a loop.
Returns
-------
NodeStop
Reset / power off condition.
"""
logger.info(f"{self.name} node is starting")
loops = -1
delay_ms = 100
delay = delay_ms / 1000
start_time = monotonic()
while not self._event.is_set():
loops += 1
self._event.wait(delay - ((monotonic() - start_time) % delay))
self._network.monitor()
if self._network.status != CanNetworkState.NETWORK_UP:
continue
# send heartbeat
event_time = self.od[0x1017].value
if loops % (event_time // delay_ms) == 0:
self._network.send_message(0x700 + self.od.node_id, b"\x05", False)
# send all timer-based TPDOs
for i in range(self._od.device_information.nr_of_TXPDO):
if i + 0x1800 not in self.od:
continue
transmission_type = self.od[0x1800 + i][2].value
event_time = self.od[0x1800 + i][5].value
if (
transmission_type in [0xFE, 0xFF]
and event_time != 0
and loops % (event_time // delay_ms) == 0
):
self.send_tpdo(i + 1)
self._destroy_node()
logger.info(f"{self.name} node has ended")
return self._reset
[docs]
def stop(self, reset: Union[NodeStop, None] = None):
"""End the run loop"""
if reset is not None:
self._reset = reset
self._event.set()
[docs]
def add_daemon(self, name: str):
"""Add a daemon for the node to monitor and/or control"""
self._daemons[name] = Daemon(name)
[docs]
def add_sdo_callbacks(
self,
index: str,
subindex: str,
read_cb: Callable[[None], Any],
write_cb: Callable[[Any], None],
):
"""
Add an SDO read callback for a variable at index and optional subindex.
Parameters
----------
index: int or str
The index to call the callback on.
subindex: int or str
The subindex to call the callback on.
read_cb: Callable[[None], Any]
The SDO read callback. Allows overriding the data being sent on a SDO read. If
overriding read data return the value or return :py:data:`None` to use the the value
from the od. Set to :py:data:`None` for no read_cb.
write_cb: Callable[[Any], None]
The SDO writecallback. Gives access to the data being received on a SDO write.
Set to :py:data:`None` for no write_cb.
**Note:** data is still written to object dictionary before call.
"""
try:
self.od[index]
except KeyError:
logger.warning(f"index {index} does not exist, ignoring request for new sdo callback")
return
if subindex:
try:
self.od[index][subindex]
except KeyError:
logger.warning(
f"subindex {subindex} for index {index} does not exist, ignoring request for "
"new sdo callback"
)
return
if read_cb is not None:
self._read_cbs[index, subindex] = read_cb
if write_cb is not None:
self._write_cbs[index, subindex] = write_cb
def _send_pdo(self, comm_index: int, map_index: int, raise_error: bool = True):
"""Send a PDO. Will not be sent if not node is not in operational state."""
# PDOs should not be sent if CANopen node not in 'OPERATIONAL' state
if self._node.nmt.state != "OPERATIONAL":
return
cob_id = self.od[comm_index][1].value & 0x3F_FF_FF_FF
maps = self.od[map_index][0].value
data = b""
for i in range(maps):
pdo_map = self.od[map_index][i + 1].value
if pdo_map == 0:
break # nothing todo
pdo_map_bytes = pdo_map.to_bytes(4, "big")
index, subindex, _ = struct.unpack(">HBB", pdo_map_bytes)
# call sdo callback(s) and convert data to bytes
if isinstance(self.od[index], ODVariable):
value = self._node.sdo[index].phys
value_bytes = self.od[index].encode_raw(value)
else: # record or array
value = self._node.sdo[index][subindex].phys
value_bytes = self.od[index][subindex].encode_raw(value)
# pack pdo with bytes
data += value_bytes
if len(data) > 8:
self.send_emcy(EmcyCode.PROTOCOL_PDO_LEN_EXCEEDED, b"", False)
return
self._network.send_message(cob_id, data, raise_error)
[docs]
def send_tpdo(self, tpdo: int, raise_error: bool = True):
"""
Send a TPDO. Will not be sent if not node is not in operational state.
Parameters
----------
tpdo: int
TPDO number to send, should be between 1 and 16.
raise_error: bool
Set to False to not raise NetworkError.
Raises
------
NetworkError
Cannot send a TPDO message when the network is down.
"""
if tpdo < 1:
raise ValueError("TPDO number must be greater than 1")
tpdo -= 1 # number to offset
comm_index = 0x1800 + tpdo
map_index = 0x1A00 + tpdo
self._send_pdo(comm_index, map_index, raise_error)
[docs]
def send_emcy(self, code: Union[EmcyCode, int], data: bytes = b"", raise_error: bool = True):
"""
Send a EMCY message.
Parameters
----------
code: Emcy, int
The EMCY code.
data: bytes
Optional data to add to the message (up to 5 bytes).
raise_error: bool
Set to False to not raise NetworkError.
Raises
------
NetworkError
Cannot send a EMCY message when the network is down.
"""
if isinstance(code, EmcyCode):
code = code.value
if len(data) > 5:
raise ValueError("data must be 5 or less bytes")
frame = code.to_bytes(2, "little") + self.od[0x1001].value.to_bytes(1, "little") + data
frame += b"\x00" * (5 - len(data))
self._network.send_message(self.od.node_id + 0x80, frame, raise_error)
logger.error(f"sent emcy 0x{code:04X} {data.hex()}")
def _on_sdo_read(self, index: int, subindex: int, od: ODVariable):
"""
SDO read callback function. Allows overriding the data being sent on a SDO read. Return
valid datatype for object, if overriding read data, or :py:data:`None` to use the the value
on object dictionary.
Parameters
----------
index: int
The index the SDO is reading to.
subindex: int
The subindex the SDO is reading to.
od: canopen.objectdictionary.ODVariable
The variable object being read to. Badly named. And not appart of the actual OD.
Returns
-------
Any
The value to return for that index / subindex.
"""
ret = None
# convert any ints to strs
if isinstance(self.od[index], ODVariable) and od == self.od[index]:
index = od.name
subindex = None # type: ignore
else:
index = self.od[index].name
subindex = od.name
if (index, subindex) in self._read_cbs:
ret = self._read_cbs[index, subindex]()
# get value from OD
if ret is None:
ret = od.value
return ret
def _on_sdo_write(self, index: int, subindex: int, od: ODVariable, data: bytes):
"""
SDO write callback function. Gives access to the data being received on a SDO write.
*Note:* data is still written to object dictionary before call.
Parameters
----------
index: int
The index the SDO being written to.
subindex: int
The subindex the SDO being written to.
od: canopen.objectdictionary.ODVariable
The variable object being written to. Badly named.
data: bytes
The raw data being written.
"""
binary_types = [DOMAIN, OCTET_STRING]
# set value in OD before callback
if od.data_type in binary_types:
od.value = data
else:
od.value = od.decode_raw(data)
# convert any ints to strs
if isinstance(self.od[index], ODVariable) and od == self.od[index]:
index = od.name
subindex = None # type: ignore
else:
index = self.od[index].name
subindex = od.name
if (index, subindex) in self._write_cbs:
self._write_cbs[index, subindex](od.value)
@property
def bus(self) -> str:
"""str: The CAN bus."""
return self._network.channel
@property
def bus_state(self) -> str:
"""str: The CAN bus status."""
return self._network.status.name
@property
def name(self) -> str:
"""str: The nodes name."""
return self._od.device_information.product_name
@property
def od(self) -> ObjectDictionary:
"""canopen.ObjectDictionary: Access to the object dictionary."""
return self._od
@property
def fread_cache(self) -> OreSatFileCache:
"""OreSatFile: Cache the CANopen master node can read to."""
return self._fread_cache
@property
def fwrite_cache(self) -> OreSatFileCache:
"""OreSatFile: Cache the CANopen master node can write to."""
return self._fwrite_cache
@property
def is_running(self) -> bool:
"""bool: Is the node loop running"""
return not self._event.is_set()
@property
def daemons(self) -> Dict[str, Daemon]:
"""dict: The dictionary of external daemons that are monitored and/or controllable"""
return self._daemons
[docs]
def od_get_obj(
self, index: Union[int, str], subindex: Union[int, str, None] = None
) -> Union[ODVariable, ODArray, ODRecord]:
"""
Quick helper function to get an object from the od.
Returns
-------
ODVariable | ODArray | ODRecord
The object from the OD.
"""
if subindex is None:
return self._od[index]
return self._od[index][subindex]
[docs]
def od_read(
self, index: Union[int, str], subindex: Union[int, str, None]
) -> Union[int, str, float, bytes, bool]:
"""
Read a value from the OD.
Parameters
----------
index: int or str
The index to read from.
subindex: int, str, or None
The subindex to read from or None.
Returns
-------
int | str | float | bytes | bool
The value read.
"""
obj = self.od_get_obj(index, subindex)
return obj.value
[docs]
def od_read_bitfield(
self, index: Union[int, str], subindex: Union[int, str, None], field: str
) -> int:
"""
Read a field from a object from the OD.
Parameters
----------
index: int or str
The index to read from.
subindex: int, str, or None
The subindex to read from or None.
Returns
-------
int:
The field value.
"""
obj = self.od_get_obj(index, subindex)
bits = obj.bit_definitions[field]
value = 0
for i in bits:
tmp = obj.value & (1 << bits[i])
value |= tmp >> bits[i]
return value
[docs]
def od_read_enum(self, index: Union[int, str], subindex: Union[int, str, None]) -> str:
"""
Read a enum str from the OD.
Parameters
----------
index: int or str
The index to read from.
subindex: int, str, or None
The subindex to read from or None.
Returns
-------
str
The enum str value.
"""
obj = self.od_get_obj(index, subindex)
return obj.value_descriptions[obj.value]
def _var_write(self, obj: ODVariable, value: Union[int, str, float, bytes, bool]):
value_type = type(value)
def make_error_value(data_type) -> str:
return f"cannot write {value!r} ({data_type}) to object {obj.name} ({obj.data_type})"
if obj.data_type in INTEGER_TYPES:
if value_type != int:
raise TypeError(make_error_value("int"))
if obj.max is not None and value > obj.max:
raise ValueError(f"value {value!r} too high (high limit {obj.max})")
if obj.min is not None and value < obj.min:
raise ValueError(f"value {value!r} too low (low limit {obj.min})")
elif obj.data_type in FLOAT_TYPES:
if not isinstance(value, int):
raise TypeError(make_error_value("float"))
if obj.max is not None and value > obj.max:
raise ValueError(f"value {value!r} too high (high limit {obj.max})")
if obj.min is not None and value < obj.min:
raise ValueError(f"value {value!r} too low (low limit {obj.min})")
elif obj.data_type == VISIBLE_STRING and not isinstance(value, str):
raise TypeError(make_error_value("str"))
elif obj.data_type == OCTET_STRING and not isinstance(value, bytes):
raise TypeError(make_error_value("bytes"))
obj.value = value
[docs]
def od_write(
self,
index: Union[int, str],
subindex: Union[int, str, None],
value: Union[int, str, float, bytes, bool],
):
"""
Write an value to the OD.
Parameters
----------
index: int | str
The index to read from.
subindex: int | str | None
The subindex to read from or None.
value: int | str | float | bytes | bool
The value to write.
Raises
------
ValueError
An invalid value.
"""
obj = self.od_get_obj(index, subindex)
self._var_write(obj, value)
[docs]
def od_write_bitfield(
self, index: Union[int, str], subindex: Union[int, str, None], field: str, value: int
):
"""
Write a bit field value to a object to the OD.
Parameters
----------
index: int | str
The index to read from.
subindex: int | str | None
The subindex to read from or None.
field: str
Name of field to write to.
value: int
The value to write.
Raises
------
ValueError
An invalid value.
"""
obj = self.od_get_obj(index, subindex)
bits = obj.bit_definitions[field]
offset = min(bits)
mask = 0
for i in bits:
mask |= 1 << bits[i]
new_value = obj.value
new_value ^= mask
new_value |= value << offset
obj.value = new_value
[docs]
def od_write_enum(self, index: Union[int, str], subindex: Union[int, str, None], value: str):
"""
Write a enum str to the OD.
Parameters
----------
index: int | str
The index to read from.
subindex: int | str | None
The subindex to read from or None.
value: str
The enum string to write.
"""
obj = self.od_get_obj(index, subindex)
tmp = {d: v for v, d in obj.value_descriptions}
obj.value = tmp[value.lower()]