Source code for olaf.common.oresat_file_cache

"""File cache class for OreSat files."""

import shutil
from copy import deepcopy
from os import listdir, remove
from os.path import abspath, basename, isfile
from pathlib import Path
from threading import Lock

from .oresat_file import OreSatFile


class OreSatFileCache:
    """File cache for OreSat files (by use of :py:class:`OreSatFile`).

    The cache is a directory on disk plus an in-memory, sorted index of the
    :py:class:`OreSatFile` objects found in it. Every access to the index is
    guarded by a lock.
    """

    def __init__(self, dir_path: str):
        """
        Parameters
        ----------
        dir_path: str
            Directory to use as a OreSat file cache. Directory will be made if it does not
            exist.

        Raises
        ------
        FileExistsError
            `dir_path` is an existing regular file.
        """

        # Always stored with a trailing "/" so file names can simply be appended.
        self._dir = abspath(dir_path) + "/"
        self._data = []
        self._lock = Lock()

        if isfile(abspath(self._dir)):
            raise FileExistsError("Cannot create new directory with an existing file name.")

        Path(self._dir).mkdir(parents=True, exist_ok=True)

        for f in listdir(self._dir):
            try:
                oresat_file = OreSatFile(self._dir + f)
                self._data.append(oresat_file)
            except Exception:  # pylint: disable=W0718
                # Anything OreSatFile rejects is deliberately deleted from the cache directory.
                remove(self._dir + f)  # invalid file name

        self._data.sort()

    def __len__(self) -> int:
        """Return the number of files currently indexed by the cache."""

        with self._lock:
            return len(self._data)

    @staticmethod
    def _dest_path(dir_path: str, file_name: str) -> str:
        """Join `dir_path` and `file_name`, adding a "/" separator only when needed.

        An empty `dir_path` yields just `file_name`, i.e. a path relative to the current
        working directory.
        """

        if dir_path and not dir_path.endswith("/"):
            dir_path += "/"
        return dir_path + file_name

    def add(self, file_path: str, consume: bool = False):
        """Add file to cache

        If a file with the same name is already in the cache, the file on disk is
        overwritten and the existing index entry is kept.

        Parameters
        ----------
        file_path: str
            Path to the file to add to the cache
        consume: bool
            Flag to consume the file when added to the cache (the file is moved instead of
            copied).

        Raises
        ------
        FileNotFoundError
            `file_path` was not a valid file path
        """

        new_file_path = self._dir + basename(file_path)

        with self._lock:
            if consume:
                shutil.move(file_path, new_file_path)
            else:
                shutil.copy(file_path, new_file_path)

            oresat_file = OreSatFile(new_file_path)

            # Only index the file once; an overwrite reuses the existing entry.
            if not any(f.name == oresat_file.name for f in self._data):
                self._data.append(oresat_file)

            self._data.sort()

    def remove(self, file_name: str):
        """Remove a file from cache

        Does nothing if no file named `file_name` is in the cache.

        Parameters
        ----------
        file_name: str
            Name of the file to remove from the cache
        """

        with self._lock:
            for index, f in enumerate(self._data):
                if f.name == file_name:
                    # NOTE: `remove` here is the module-level os.remove, not this method.
                    remove(self._dir + f.name)
                    del self._data[index]
                    break  # never keep iterating a list that was just mutated

    def peek(self) -> str:
        """Get the oldest file name

        Returns
        -------
        str
            Name of the oldest file or an empty string if the cache is empty.
        """

        with self._lock:
            return self._data[0].name if self._data else ""

    def pop(self, dir_path: str, copy: bool = False) -> str:
        """Pop the oldest file from the cache

        Parameters
        ----------
        dir_path: str
            Name of the directory to move the file to. An empty string means the current
            working directory.
        copy: bool
            When True the file is copied from the cache, when False the file is moved out of
            the cache.

        Returns
        -------
        str
            File path of the file now in `dir_path` or an empty string if the cache is empty.
        """

        with self._lock:
            if not self._data:
                return ""

            oldest_file = self._data[0]
            dest = self._dest_path(dir_path, oldest_file.name)

            if copy:
                shutil.copy(self._dir + oldest_file.name, dest)
            else:
                shutil.move(self._dir + oldest_file.name, dest)
                del self._data[0]

        return dest

    def get(self, file_name: str, dir_path: str, copy: bool = False) -> str:
        """Get the file from the cache and move it a specific directory.

        Parameters
        ----------
        file_name: str
            Name of the file to get from the cache
        dir_path: str
            Name of the directory to move the file to. An empty string means the current
            working directory.
        copy: bool
            When True the file is copied from the cache, when False the file is moved out of
            the cache.

        Raises
        ------
        FileNotFoundError
            `file_name` was not in cache

        Returns
        -------
        str
            File path of the file now in `dir_path`.
        """

        dest = None

        with self._lock:
            for index, f in enumerate(self._data):
                if f.name != file_name:
                    continue

                dest = self._dest_path(dir_path, f.name)
                if copy:
                    shutil.copy(self._dir + f.name, dest)
                else:
                    shutil.move(self._dir + f.name, dest)
                    del self._data[index]
                break  # stop at the first match; the list may have just been mutated

        if dest is None:
            raise FileNotFoundError(f"file {file_name} not in cache")

        return dest

    def files(self, keyword: str = "") -> list[OreSatFile]:
        """Return a list of files in the cache.

        Parameters
        ----------
        keyword: str
            A keyword to filter by. An empty string disables filtering.

        Returns
        -------
        list
            list of :py:class:`OreSatFile` that are in the cache. The entries are deep
            copies, so modifying them does not affect the cache.
        """

        with self._lock:
            return [deepcopy(f) for f in self._data if not keyword or f.keyword == keyword]

    def clear(self):
        """Clear all file in the cache"""

        with self._lock:
            # Delete the whole directory, then recreate it empty.
            shutil.rmtree(self._dir, ignore_errors=True)
            Path(self._dir).mkdir(parents=True, exist_ok=True)
            self._data = []

    @property
    def dir(self) -> str:
        """str: Gets the directory path the cache using."""

        return self._dir[:-1]  # remove the trailing '/'