Source code for olaf.canopen.master_node

"""OreSat CANopen Master Node class to support the C3"""

from collections import namedtuple
from time import monotonic
from typing import Any, Dict, Union

import canopen
from canopen.sdo import SdoArray, SdoRecord, SdoVariable
from loguru import logger

from ..canopen.network import CanNetwork
from .node import Node

NodeHeartbeatInfo = namedtuple("NodeHeartbeatInfo", ["state", "timestamp", "time_since_boot"])


[docs] class MasterNode(Node): """OreSat CANopen Master Node (only used by the C3)""" def __init__( self, network: CanNetwork, od: canopen.ObjectDictionary, od_db: Dict[Any, canopen.ObjectDictionary], ): """ Parameters ---------- network: CanNetwork The CAN network od: canopen.ObjectDictionary The CANopen ObjectDictionary od_db: Dict[Any, canopen.ObjectDictionary] Database of other nodes's ODs. The dict key will be used by class fields and methods. """ super().__init__(network, od) self._od_db = od_db self.network = network self._node_id_to_key = {od.node_id: key for key, od in od_db.items()} self._remote_nodes = {} self.node_status = {} for k, v in self._od_db.items(): if v == od: continue # skip itself self._remote_nodes[k] = canopen.RemoteNode(v.node_id, v) self.node_status[k] = NodeHeartbeatInfo( 0xFF, 0.0, 0.0, ) # 0xFF is a flag, not a CANopen standard self._network.subscribe(0x80 + v.node_id, self._on_emergency) self._network.subscribe(0x700 + v.node_id, self._on_heartbeat) self._network.add_reset_callback(self._restart_network) def _restart_network(self): """Restart the CANopen network""" for key, od in self._od_db.items(): if od == self._od: continue # skip itself self.node_status[key] = NodeHeartbeatInfo(0xFF, 0.0, 0.0) for remote_node in self._remote_nodes.values(): self._network.add_node(remote_node) def _on_heartbeat(self, cob_id: int, data: bytes, timestamp: float): """Callback on node hearbeat messages.""" node_id = cob_id - 0x700 status = int.from_bytes(data, "little") key = self._node_id_to_key[node_id] self.node_status[key] = NodeHeartbeatInfo(status, timestamp, monotonic()) def _on_emergency(self, cob_id: int, data: bytes, timestamp: float): # pylint: disable=W0613 """Callback on node emergency messages.""" node_id = cob_id - 0x80 value_str = data.hex(sep=" ") logger.error(f"node {node_id:02X} raised emergency: {value_str}")
[docs] def send_sync(self): """ Send a CANopen SYNC message. """ self._network.send_message(0x80, b"", False)
@property def remote_nodes(self) -> dict[Any, canopen.RemoteNode]: """dict[Any, canopen.RemoteNode]: All other node as remote node.""" return self._remote_nodes @property def od_db(self) -> dict[Any, canopen.ObjectDictionary]: """dict[Any, canopen.ObjectDictionary]: All other node ODs.""" return self._od_db def _sdo_get_obj( self, key: Any, index: Union[int, str], subindex: Union[int, str, None] ) -> [SdoVariable, SdoArray, SdoRecord]: if subindex is None: return self._remote_nodes[key].sdo[index] return self._remote_nodes[key].sdo[index][subindex]
[docs] def sdo_read( self, key: Any, index: Union[int, str], subindex: Union[int, str, None] ) -> Union[int, str, float, bytes, bool]: """ Read a value from a remote node's object dictionary using an SDO. Parameters ---------- key: Any The dict key for the node to read from. index: int | str The index to read from. subindex: int | str | None The subindex to read from or None. Raises ------ NetworkError Cannot send a SDO read message when the network is down. canopen.sdo.exceptions.SdoError Error with the SDO. Returns ------- int | str | float | bytes | bool The value read. """ return self._sdo_get_obj(key, index, subindex).phys
[docs] def sdo_read_bitfield( self, key: Any, index: Union[int, str], subindex: Union[int, str, None], field: str ) -> int: """ Read an field from a object from another node's OD using a SDO. Parameters ---------- key: Any The dict key for the node to read from. index: int | str The index to read from. subindex: int | str | None The subindex to read from or None. Raises ------ NetworkError Cannot send a SDO read message when the network is down. canopen.sdo.exceptions.SdoError Error with the SDO. Returns ------- int The field value. """ obj = self._sdo_get_obj(key, index, subindex) bits = obj.od.bit_definitions[field] value = 0 obj_value = obj.phys for i in bits: tmp = obj_value & (1 << bits[i]) value |= tmp >> bits[i] return value
[docs] def sdo_read_enum( self, key: Any, index: Union[int, str], subindex: Union[int, str, None] ) -> str: """ Read an enum str from another node's OD using a SDO. Parameters ---------- key: Any The dict key for the node to read from. index: int | str The index to read from. subindex: int | str | None The subindex to read from or None. Raises ------ NetworkError Cannot send a SDO read message when the network is down. canopen.sdo.exceptions.SdoError Error with the SDO. Returns ------- str The enum str value. """ obj = self._sdo_get_obj(key, index, subindex) obj_value = obj.phys return obj.od.value_descriptions[obj_value]
[docs] def sdo_write( self, key: Any, index: Union[int, str], subindex: Union[int, str, None], value: Union[int, str, float, bytes, bool], ): """ Write a value to a remote node's object dictionary using an SDO. Parameters ---------- key: Any The dict key for the node to write to. index: int | int The index to write to. subindex: int | str | None The subindex to write to or None. value: int | str | float | bytes | bool The value to write. Raises ------ NetworkError Cannot send a SDO write message when the network is down. canopen.sdo.exceptions.SdoError Error with the SDO. """ obj = self._sdo_get_obj(key, index, subindex) obj.phys = value
[docs] def sdo_write_bitfield( self, key: Any, index: Union[int, str], subindex: Union[int, str, None], field: str, value: int, ): # pylint: disable=R0917 """ Write a field from a object to another node's OD using a SDO. Parameters ---------- key: Any The dict key for the node to write to. index: int | int The index to write to. subindex: int | str | None The subindex to write to or None. field: str Name of field to write to. value: int The value to write. Raises ------ NetworkError Cannot send a SDO read message when the network is down. canopen.sdo.exceptions.SdoError Error with the SDO. """ obj = self._sdo_get_obj(key, index, subindex) bits = obj.od.bit_definitions[field] offset = min(bits) mask = 0 for i in bits: mask |= 1 << bits[i] new_value = obj.phys new_value ^= mask new_value |= value << offset obj.phys = new_value
[docs] def sdo_write_enum( self, key: Any, index: Union[int, str], subindex: Union[int, str, None], value: str ): """ Write a enum str to another node's OD using a SDO. Parameters ---------- key: Any The dict key for the node to write to. index: int | int The index to write to. subindex: int | str | None The subindex to write to or None. value: str The enum str value to write. Raises ------ NetworkError Cannot send a SDO read message when the network is down. canopen.sdo.exceptions.SdoError Error with the SDO. """ obj = self._sdo_get_obj(key, index, subindex) tmp = {d: v for v, d in obj.od.value_descriptions} obj.phys = tmp[value]
[docs] def send_rpdo(self, rpdo: int, raise_error: bool = True): """ Send a RPDO. Will not be sent if not node is not in operational state. Parameters ---------- rpdo: int RPDO number to send, should be between 1 and 16. raise_error: bool Set to False to not raise NetworkError. Raises ------ ValueError Invalud rpdo value. NetworkError Cannot send a RPDO read message when the network is down. """ if rpdo < 1: raise ValueError("RPDO number must be greater than 1") rpdo -= 1 # number to offset comm_index = 0x1400 + rpdo map_index = 0x1600 + rpdo self._send_pdo(comm_index, map_index, raise_error)