"""OreSat Linux updater"""
import json
import subprocess
import tarfile
from enum import IntEnum
from os import listdir, remove
from os.path import abspath, basename, isfile
from pathlib import Path
from shutil import rmtree
from loguru import logger
from ..common.oresat_file import OreSatFile, new_oresat_file
from ..common.oresat_file_cache import OreSatFileCache
INSTRUCTIONS_FILE = "instructions.txt"
"""The instructions file that is always in a OreSat Linux update archive. It
defines the order instructions are ran in and how it is ran."""
INSTRUCTIONS = {
"BASH_SCRIPT": "bash",
"DPKG_INSTALL": "dpkg -i",
"DPKG_REMOVE": "dpkg -r",
"DPKG_PURGE": "dpkg -P",
"PIP_INSTALL": "python3 -m pip install",
"PIP_UNINSTALL": "python3 -m pip uninstall",
}
"""All the valid instruction. Values are the commands."""
INSTRUCTIONS_WITH_FILES = [
"BASH_SCRIPT",
"DPKG_INSTALL",
"PIP_INSTALL",
]
"""The list of instructions from INSTRUCTIONS that require file(s)."""
OLU_STATUS_KEYWORD = "olu-status"
DPKG_STATUS_KEYWORD = "dpkg-status"
PIP_STATUS_KEYWORD = "pip-status"
class UpdaterError(Exception):
"""An error occurred in Updater class."""
[docs]
class UpdaterState(IntEnum):
"""The integer value Updater's update() will return"""
UPDATE_SUCCESSFUL = 0x0
"""The last update was successfully installed. Default State."""
PRE_UPDATE_FAILED = 0x1
"""The last update failed during the inital non critical section. Either the was an error using
the file cache, when opening tarfile, or reading the instructions file."""
UPDATE_FAILED = 0x2
"""The update failed during the critical section. The updater fail while following the
instructions."""
UPDATING = 0xFF
"""Updater is updating"""
[docs]
class Updater:
"""The OreSat Linux updater. Allows OreSat Linux boards to be update thru update archives.
While this could be replaced with a couple of functions. Having a object with properties, allow
for easy to get status info while updating.
"""
def __init__(self, work_dir: str, cache_dir: str):
"""
Parameters
----------
work_dir: str
Directory to use a the working dir. Should be a abslute path.
cache_dir: str
Directory to store update archives in. Should be a abslute path.
"""
# make update_archives for cache dir
Path(cache_dir).mkdir(parents=True, exist_ok=True)
self._cache_dir = abspath(cache_dir)
logger.debug(f"updater cache dir {self._cache_dir}")
# make update_archives for work dir
Path(work_dir).mkdir(parents=True, exist_ok=True)
self._work_dir = abspath(work_dir)
logger.debug(f"updater work dir {self._work_dir}")
self._cache = OreSatFileCache(cache_dir)
self._state = UpdaterState.UPDATE_SUCCESSFUL
self._update_archive = ""
self._total_instructions = 0
self._instruction_index = 0
self._instruction_percent = 0
self._command = ""
self._has_dpkg = isfile("/usr/bin/dpkg")
if not self._has_dpkg:
logger.warning("dpkg is not installed, updates will not start")
def _clear_work_dir(self):
"""Clear the working directory."""
logger.info("clearing working directory")
rmtree(self._work_dir, ignore_errors=True)
Path(self._work_dir).mkdir(parents=True, exist_ok=True)
[docs]
def add_update_archive(self, file_path: str) -> bool:
"""Copies update archive into the update archive cache.
Parameters
----------
file_path: str
The absolute path to update archive for the updater to copy.
Returns
-------
bool
True if a file was added or False on failure.
"""
ret = True
file_path = abspath(file_path) # make sure it is a absolute path
file_name = basename(file_path)
try:
self._cache.add(file_path)
except FileNotFoundError:
logger.error(f"{file_name} is a invalid file")
ret = False
return ret
[docs]
def update(self):
"""Run a update.
If there are file aleady in the working directory, it will try to find
and resume the update, otherwise it will get the oldest archive from
the update archive cache and run it.
If the update fails, the cache will be cleared, as it is asume all
newer updates require the failed updated to be run successfully first.
Raises
------
UpdaterError
A error occurred when updating.
"""
if self._state == UpdaterState.UPDATING:
raise UpdaterError("can't start an new update while already updating")
if not self._has_dpkg:
raise UpdaterError("cannot run a update, missing dpkg")
update_archive_file_path = ""
self._state = UpdaterState.UPDATING
self._update_archive = ""
self._total_instructions = 0
self._instruction_index = 0
self._instruction_percent = 0
self._command = ""
# something in working dir, see if it an update to resume
file_list = listdir(self._work_dir)
if len(file_list) != 0:
logger.info("files found in working dir")
# find update archive in work directory
for file_name in file_list:
if is_update_archive(file_name):
self._update_archive = file_name
update_archive_file_path = self._work_dir + "/" + file_name
logger.info(f"resuming update with {file_name}")
break
if update_archive_file_path == "": # Nothing to resume
logger.info("nothing to resume")
self._clear_work_dir()
# if not resuming, get new update archive from cache
if update_archive_file_path == "" and len(self._cache) != 0:
update_archive_file_path = self._cache.pop(self._work_dir)
self._update_archive = basename(update_archive_file_path)
logger.info(f"got {self._update_archive} from cache")
if update_archive_file_path == "": # nothing to do
logger.info("no update to resume or in cache")
self._state = UpdaterState.UPDATE_SUCCESSFUL
return
logger.info("extracting files from update")
try:
self._extract_update_archive(update_archive_file_path)
except Exception as e: # pylint: disable=W0718
logger.exception(e)
self._clear_work_dir()
self._state = UpdaterState.PRE_UPDATE_FAILED
return
logger.info("reading instructions file")
try:
commands = self._read_instructions()
except Exception as e: # pylint: disable=W0718
logger.exception(e)
self._clear_work_dir()
self._state = UpdaterState.PRE_UPDATE_FAILED
return
logger.info("running instructions")
try:
# No turn back point, the update is starting!!!
# If anything fails/errors the board's software could break.
# All errors are log at critical level.
self._run_instructions(commands)
except Exception as e: # pylint: disable=W0718
logger.exception(e)
self._clear_work_dir()
self._cache.clear()
self._state = UpdaterState.UPDATE_FAILED
return
logger.info(f"update {self._update_archive} was successful")
self._clear_work_dir()
self._update_archive = ""
self._state = UpdaterState.UPDATE_SUCCESSFUL
def _extract_update_archive(self, file_path: str) -> str:
"""Open the update archive file.
Parameters
----------
file_path: str
Path to the update archive.
Raises
------
UpdaterError
Invalid update archive.
Returns
-------
str
The contents of the instructions file.
"""
file_name = basename(file_path)
if not is_update_archive(file_path):
raise UpdaterError(file_name + " does not follow OreSat file name standards")
try:
with tarfile.open(file_path, "r:xz") as t:
t.extractall(self._work_dir)
except tarfile.TarError:
raise UpdaterError(file_name + " is a invalid .tar.xz")
instructions_file_path = self._work_dir + "/" + INSTRUCTIONS_FILE
if not isfile(instructions_file_path):
raise UpdaterError(file_name + " is missing an instructions file")
return instructions_file_path
def _read_instructions(self) -> list:
"""Read the instructions file, validates the instructions, and makes the commands.
Parameters
----------
instruction: str
path to the instructions file
Raises
------
UpdaterError
An instruction has failed
Returns
-------
str
List of bash commands to run
"""
commands = []
instructions_file_path = self._work_dir + "/" + INSTRUCTIONS_FILE
if not isfile(instructions_file_path):
raise UpdaterError(f"cannot find {INSTRUCTIONS_FILE}")
with open(instructions_file_path, "r") as f:
instructions_str = f.read()
try:
instructions = json.loads(instructions_str)
except json.JSONDecodeError:
raise UpdaterError("instructions file was mising or did not contain a valid json")
# valid instructions and make commands
for i in instructions:
i_type = i["type"]
i_items = i["items"]
if i_type not in INSTRUCTIONS:
raise UpdaterError(f"{i_type} is not a valid instruction type")
if not isinstance(i_items, list):
raise UpdaterError(f"{i_type} values is not a list")
if i_type in INSTRUCTIONS_WITH_FILES:
# make sure all file exist
for j in i_items:
if not isfile(self._work_dir + "/" + j):
raise UpdaterError(f"{i_type} is missing file {j}")
i_items_with_paths = [self._work_dir + "/" + i for i in i_items]
command = INSTRUCTIONS[i_type] + " " + " ".join(i_items_with_paths)
else:
command = INSTRUCTIONS[i_type] + " " + " ".join(i_items)
commands.append(command)
return commands
def _run_instructions(self, commands: list):
"""Run the instructions made by `_read_instructions.
Parameters
----------
commands: list
List of bash commands to run
Raises
------
UpdaterError
An instruction has failed
`"""
self._total_instructions = len(commands)
self._instruction_percent = 0
for command in commands:
logger.info(command)
self._command = command
self._instruction_index = commands.index(command)
out = subprocess.run(command, capture_output=True, shell=True, check=False)
if out.returncode != 0:
for line in out.stderr.decode("utf-8").split("\n"):
if len(line) != 0:
logger.error(line)
raise UpdaterError(f"update failed on {command} with {out.returncode}!")
for line in out.stdout.decode("utf-8").split("\n"):
if len(line) != 0:
logger.info(line)
self._instruction_percent = self._instruction_index // self._total_instructions
self._instruction_percent = 100
[docs]
def make_status_archive(self) -> str:
"""Make status tar file with a copy of the dpkg status file, a file with the out put of
``pip freeze`` (the list of python packages), and a file with the list ofupdates in cache.
Returns
-------
str
Path to new status file or empty string on failure.
"""
# make the file names
olu_file = "/tmp/" + new_oresat_file(keyword=OLU_STATUS_KEYWORD)
olu_tar = "/tmp/" + new_oresat_file(keyword=OLU_STATUS_KEYWORD, ext=".tar.xz")
if self._has_dpkg:
dpkg_file = new_oresat_file(keyword=DPKG_STATUS_KEYWORD)
pip_file = "/tmp/" + new_oresat_file(keyword=PIP_STATUS_KEYWORD)
out = subprocess.run("pip freeze", capture_output=True, shell=True, check=False)
if out.returncode != 0:
with open(pip_file, "w") as f:
f.write(out.stdout.decode("utf-8"))
with open(olu_file, "w") as f:
f.write(json.dumps(listdir(self._cache_dir)))
with tarfile.open(olu_tar, "w:xz") as t:
t.add(olu_file, arcname=basename(olu_file))
if isfile(pip_file):
t.add(pip_file, arcname=basename(pip_file))
if self._has_dpkg:
dpkg_status_file = "/var/lib/dpkg/status"
if isfile(dpkg_status_file):
t.add(dpkg_status_file, arcname=basename(dpkg_file))
else:
logger.error(f"could not find {dpkg_status_file}")
remove(olu_file)
return olu_tar
[docs]
def clear_cache(self):
"""Clear the update cache."""
self._cache.clear()
@property
def has_dpkg(self) -> bool:
"""bool: system has dpkg or not"""
return self._has_dpkg
@property
def status(self) -> UpdaterState:
"""UpdaterState: The current state."""
return self._state
@property
def updates_cached(self) -> list:
"""list: The list of update archives in cache."""
return self._cache.files()
@property
def list_updates(self) -> str:
"""str: Get a JSON list of file_name in cache."""
return json.dumps([i.name for i in self._cache.files()])
@property
def update_archive(self) -> str:
"""str: Current update archive while updating."""
return self._update_archive
@property
def total_instructions(self) -> int:
"""int: The total number of instructions in the update running."""
return self._total_instructions
@property
def instruction_index(self) -> int:
"""int: The index of the instruction currently running."""
return self._instruction_index
@property
def instruction_percent(self) -> int:
"""int: The percentage of the instructions completed."""
return self._instruction_percent
@property
def instruction_command(self) -> str:
"""str: The current bash command being running."""
return self._command
def is_update_archive(file_path: str) -> bool:
"""Check to see if the input is a valid update archive.
Parameters
----------
file_path: str
Path to the update archive.
Returns
-------
bool
True the file name is valid or False if it is invalid.
"""
try:
oresat_file = OreSatFile(file_path)
except Exception: # pylint: disable=W0718
return False
if oresat_file.keyword == "update" and oresat_file.extension == ".tar.xz":
return True
return False