"""The OLAF base Service class. A Resource with a dedicated thread."""
from enum import IntEnum
from threading import Event, Thread
from loguru import logger
from ..canopen.node import Node
[docs]
class ServiceState(IntEnum):
    """
    State a service can be in.

    Every member has a distinct integer value. Two members sharing a value would make the
    second one an alias of the first, so the two states could not be told apart.
    """

    STOPPED = 0
    """Service is not running."""
    STARTING = 1
    """Service is starting up."""
    RUNNING = 2
    """Service is running."""
    STOPPING = 3
    """Service is stopping."""
    FAILED = 4
    """Service has failed."""
[docs]
class Service:
    """
    OLAF service, basically a :py:class:`Resource` with a dedicated thread.

    All the ``on_*`` members can be overridden as needed.

    Life cycle: :py:meth:`start` runs :py:meth:`on_start` and launches the thread, the thread
    calls :py:meth:`on_loop` repeatedly until the internal stop event is set, and
    :py:meth:`stop` sets that event, joins the thread and runs the stop hooks.
    """

    def __init__(self):
        self.node = None
        """Node or MasterNode: The app's CANopen node. Set to None until start() is called."""
        # Set to request the loop to end; also what sleep()/sleep_ms() wait on.
        self._event = Event()
        self._thread = Thread(target=self._loop)
        self._status = ServiceState.STOPPED
        # Remembers that a hook raised so stop() can report FAILED instead of STOPPED.
        self._error = False

    def __del__(self):
        # The instance may be only partly initialised (e.g. a subclass __init__ raised before
        # calling super().__init__()), so look the attributes up defensively.
        event = getattr(self, "_event", None)
        if event is not None:
            event.set()
        thread = getattr(self, "_thread", None)
        if thread is not None and thread.is_alive():
            thread.join()

    def start(self, node: "Node"):
        """
        App will call this to start the service. This will call `self.on_start()` and then start
        a thread that will call `self.on_loop()`.

        If `self.on_start()` or starting the thread raises, the exception is logged and the
        status becomes ``FAILED``; nothing is re-raised.

        Parameters
        ----------
        node: Node
            The app's CANopen node; stored as `self.node`.
        """

        self._status = ServiceState.STARTING
        logger.debug(f"starting service {self.__class__.__name__}")
        self.node = node

        if self._thread.ident is not None and not self._thread.is_alive():
            # A Thread object can only be started once. The previous run has finished, so
            # build a fresh thread and reset the run state; otherwise Thread.start() below
            # would always raise RuntimeError on a restart.
            self._event.clear()
            self._error = False
            self._thread = Thread(target=self._loop)

        try:
            self.on_start()
            self._thread.start()
        except Exception as e:  # pylint: disable=W0718
            logger.exception(f"{self.__class__.__name__}'s on_start raised: {e}")
            logger.critical(f"{self.__class__.__name__}'s thread is being skipped")
            self._status = ServiceState.FAILED

    def on_start(self):
        """
        Called when the service starts.
        Should be used to add SDO read/write callbacks to app.
        """

    def _loop(self):
        """
        Thread body: loop until an exception is thrown or the app stops.

        Calls `self.on_loop()` until the stop event is set. If `self.on_loop()` raises,
        `self.on_loop_error()` is called, the error is recorded and the loop ends.
        """

        self._status = ServiceState.RUNNING

        while not self._event.is_set():
            try:
                self.on_loop()
            except Exception as e:  # pylint: disable=W0718
                logger.error("unexpected exception raised by on_loop, stopping service")
                try:
                    self.on_loop_error(e)
                except Exception as hook_error:  # pylint: disable=W0718
                    # The hook is user code too; it must not be able to kill the thread
                    # before the failure has been recorded below.
                    logger.exception(
                        f"{self.__class__.__name__}'s on_loop_error raised: {hook_error}"
                    )
                self._event.set()
                self._error = True

        # The loop is done: report the crash if there was one, otherwise the service is
        # waiting for stop() to finish the shutdown.
        self._status = ServiceState.FAILED if self._error else ServiceState.STOPPING

    def on_loop(self):
        """
        Called repeatedly in a while loop by the service's thread.

        The default implementation just sleeps for one second.
        """

        self.sleep(1)

    def on_loop_error(self, error: Exception):
        """
        Called when on_loop raises an exception, before the thread stops.
        Should be used to stop any hardware the service controls (if possible) and log the error.

        Parameters
        ----------
        error: Exception
            The exception raised by `self.on_loop()`.
        """

        logger.exception(f"{self.__class__.__name__} on_loop raised: {error}")

    def stop(self):
        """
        App will call this to stop the service when the app stops. This will call
        `self.on_stop_before()`, stop the thread, and then call `self.on_stop()`.

        Exceptions raised by either hook are logged, not re-raised. The final status is
        ``FAILED`` if any hook (or `self.on_loop()`) raised, otherwise ``STOPPED``.
        """

        self._status = ServiceState.STOPPING
        logger.debug(f"stopping service {self.__class__.__name__}")

        try:
            self.on_stop_before()
        except Exception as e:  # pylint: disable=W0718
            logger.exception(f"{self.__class__.__name__}'s on_stop_before raised: {e}")
            self._error = True

        # Setting an already-set event is harmless; this wakes any sleep() in the loop.
        self._event.set()
        if self._thread.is_alive():
            self._thread.join()

        try:
            self.on_stop()
        except Exception as e:  # pylint: disable=W0718
            logger.exception(f"{self.__class__.__name__}'s on_stop raised: {e}")
            self._error = True

        self._status = ServiceState.FAILED if self._error else ServiceState.STOPPED

    def on_stop_before(self):
        """
        Called when the app stops before the thread is stopped. Should be used to stop any blocking
        calls in the thread loop.
        Will be called regardless if `self.on_loop_error()` was called or not when the app stops.
        """

    def on_stop(self):
        """
        Called when the app stops after the thread has stopped. Should be used to stop any hardware
        the service controls.
        Will be called regardless if `self.on_loop_error()` was called or not when the app stops.
        """

    def sleep(self, timeout: float):
        """
        Sleep for X seconds, that can be interrupted if `stop()` is called.

        Returns immediately if the service has already been stopped or cancelled.

        Parameters
        ----------
        timeout: float
            The time to sleep in seconds.
        """

        self._event.wait(timeout)

    def sleep_ms(self, timeout: float):
        """
        Sleep for X milliseconds, that can be interrupted if `stop()` is called.

        Returns immediately if the service has already been stopped or cancelled.

        Parameters
        ----------
        timeout: float
            The time to sleep in milliseconds.
        """

        self._event.wait(timeout / 1000)

    def cancel(self):
        """Cancel the service. Can be used to stop the service from `self.on_loop()`."""

        self._event.set()

    @property
    def status(self) -> "ServiceState":
        """ServiceState: Get the service state."""

        return self._status