Source code for fluidml.storage.file_store

import json
import logging
import os
import pickle
import shutil
from dataclasses import dataclass
from typing import IO, TYPE_CHECKING, Any, AnyStr, Callable, Dict, List, Optional, Tuple

from fluidml.storage.base import Names, Promise, ResultsStore, StoreContext

if TYPE_CHECKING:
    from fluidml.task import TaskInfo

# Module-level logger named after this module; used for the "not found" warnings emitted below.
logger = logging.getLogger(__name__)


class FilePromise(Promise):
    """A promise for an object stored in a file, loaded only when ``load()`` is called.

    Args:
        name: Name of the stored object (used in the "not found" warning).
        path: Path of the stored object.
        save_fn: Callable forwarded to the ``File`` wrapper for ``file.save()`` calls.
        load_fn: Callable used to load the object.
        open_fn: Optional callable used to open the file instead of the builtin ``open()``.
        mode: Open mode used for loading. If ``None``, ``load_fn`` is called with the path itself
            instead of an opened file object.
        load_kwargs: Default keyword arguments passed to ``load_fn``.
        **open_kwargs: Additional keyword arguments passed to the open function.
    """

    def __init__(
        self,
        name: str,
        path: str,
        save_fn: Callable,
        load_fn: Callable,
        open_fn: Optional[Callable] = None,
        mode: Optional[str] = None,
        load_kwargs: Optional[Dict] = None,
        **open_kwargs,
    ):
        super().__init__()
        self.name, self.path = name, path
        self.save_fn, self.load_fn = save_fn, load_fn
        self.open_fn = open_fn
        self.mode = mode
        self.load_kwargs = {} if load_kwargs is None else load_kwargs
        self.open_kwargs = open_kwargs

    def load(self, **kwargs):
        """Loads the promised object.

        Args:
            **kwargs: Keyword arguments passed to the load function. They take precedence over
                the ``load_kwargs`` the promise was created with when keys are identical.

        Returns:
            The loaded object, or ``None`` if the file could not be found.
        """
        merged_kwargs = dict(self.load_kwargs)
        merged_kwargs.update(kwargs)

        try:
            if self.mode is not None:
                # a mode is set -> the load function operates on an opened file object
                with File(
                    self.path,
                    self.mode,
                    load_fn=self.load_fn,
                    save_fn=self.save_fn,
                    open_fn=self.open_fn,
                    **self.open_kwargs,
                ) as handle:
                    return handle.load(**merged_kwargs)
            # no mode -> the load function operates directly on the path
            return self.load_fn(self.path, **merged_kwargs)
        except FileNotFoundError:
            logger.warning(f'"{self.name}" could not be found in store.')
            return None
        except IsADirectoryError:
            # the path is a directory and cannot be opened -> hand the path to the load function
            return self.load_fn(self.path, **merged_kwargs)
class File:
    """A file like wrapper class to support opening files using the ``LocalFileStore``.

    The wrapper delegates the usual file methods to the underlying file object and adds
    ``save()`` and ``load()`` methods that call the registered save and load functions on it.

    Args:
        path: The path to the file to open.
        mode: The open mode, e.g. "r", "w", etc.
        save_fn: A callable used for ``file.save()`` calls.
        load_fn: A callable used for ``file.load()`` calls.
        open_fn: An optional callable used for file opening. The default is the inbuilt ``open()`` function.
        load_kwargs: Default keyword arguments passed to the load function on ``file.load()`` calls.
        **open_kwargs: Additional keyword arguments passed to the open function.
    """

    def __init__(
        self,
        path: str,
        mode: str,
        save_fn: Callable,
        load_fn: Callable,
        open_fn: Optional[Callable] = None,
        load_kwargs: Optional[Dict] = None,
        **open_kwargs,
    ):
        self._path = path
        self._mode = mode
        self._save_fn = save_fn
        self._load_fn = load_fn
        self._open_kwargs = open_kwargs
        self._load_kwargs = load_kwargs if load_kwargs is not None else {}

        # the file is opened eagerly, so opening errors surface at construction time
        opener = open if open_fn is None else open_fn
        self.f = opener(self._path, self._mode, **open_kwargs)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __iter__(self):
        return self.f.__iter__()

    def __next__(self):
        return self.f.__next__()

    def close(self):
        """Closes the underlying file object."""
        self.f.close()

    @property
    def closed(self):
        """Whether the underlying file object is closed."""
        return self.f.closed

    def flush(self):
        """Flushes the underlying file object."""
        self.f.flush()

    def readline(self, size: int = -1):
        """Reads and returns one line from the underlying file object."""
        return self.f.readline(size)

    def readlines(self, hint: int = -1):
        """Reads and returns a list of lines from the underlying file object."""
        return self.f.readlines(hint)

    def readable(self):
        """Returns whether the underlying file object is readable."""
        return self.f.readable()

    def writable(self):
        """Returns whether the underlying file object is writable."""
        return self.f.writable()

    def seekable(self):
        """Returns whether the underlying file object supports seeking."""
        return self.f.seekable()

    def seek(self, offset: int, whence=0):
        """Moves the position of the underlying file object and returns the result of its ``seek()``."""
        return self.f.seek(offset, whence)

    def tell(self):
        """Returns the current position of the underlying file object."""
        return self.f.tell()

    def truncate(self, size: Optional[int] = None):
        """Truncates the underlying file object and returns the result of its ``truncate()``."""
        return self.f.truncate(size)

    def writelines(self, lines: List):
        """Writes a list of lines to the underlying file object."""
        return self.f.writelines(lines)

    def read(self, size: int = -1):
        """Reads up to ``size`` characters/bytes from the underlying file object and returns them."""
        # the read data has to be returned; otherwise callers always receive None
        return self.f.read(size)

    def write(self, obj: AnyStr):
        """Writes ``obj`` to the underlying file object and returns the result of its ``write()``."""
        return self.f.write(obj)

    def load(self, **kwargs):
        """Loads an object from the opened file using the registered load function.

        Args:
            **kwargs: Keyword arguments passed to the load function. They overwrite the default
                ``load_kwargs`` when keys are identical.
        """
        kwargs = {**self._load_kwargs, **kwargs}
        return self._load_fn(self.f, **kwargs)

    def save(self, obj, **kwargs):
        """Saves ``obj`` to the opened file using the registered save function.

        Args:
            obj: The object to save.
            **kwargs: Keyword arguments passed to the save function.
        """
        self._save_fn(obj, self.f, **kwargs)

    @classmethod
    def from_promise(cls, promise: "FilePromise"):
        """Creates a ``File`` object from a ``FilePromise``.

        The promise's ``load_kwargs`` are forwarded, so that ``file.load()`` uses the same
        defaults as ``promise.load()``.
        """
        return cls(
            promise.path,
            promise.mode,
            promise.save_fn,
            promise.load_fn,
            promise.open_fn,
            load_kwargs=promise.load_kwargs,
            **promise.open_kwargs,
        )
@dataclass
class TypeInfo:
    """Bundles everything the file store needs to save and load objects of one registered type.

    ``save_fn`` and ``load_fn`` are required; all other fields are optional and fall back to
    the defaults listed below.
    """

    save_fn: Callable
    """Callable that writes an object to the store."""

    load_fn: Callable
    """Callable that reads an object back from the store."""

    extension: Optional[str] = None
    """Extension appended to the object name when building the file name."""

    is_binary: Optional[bool] = None
    """If true, files of this type are read, written and appended in binary mode."""

    open_fn: Optional[Callable] = None
    """Callable used to open a file object; the builtin ``open()`` is used when unset."""

    needs_path: bool = False
    """If true, ``save_fn`` and ``load_fn`` receive a path instead of a file like object. Default: false."""
class LocalFileStore(ResultsStore):
    """A local file store that allows to easily save and load task results to/from a base directory in a file system.

    Out of the box the local file store supports three common file types, "json", "pickle" and "text".
    It can be easily extended to arbitrary file types by subclassing the LocalFileStore and registering
    new types in the ``self.type_registry`` dictionary.
    A new type needs to register a load and save function using the ``TypeInfo`` data class.

    Args:
        base_dir: The base directory that is used to store results from tasks.

    Attributes:
        base_dir: The base directory that is used to store results from tasks.
        type_registry: The dictionary to register new types with a save and load function.
    """

    def __init__(self, base_dir: str):
        super().__init__()

        self.base_dir = base_dir

        self.type_registry: Dict[str, TypeInfo] = {
            "event": TypeInfo(self._write, self._read),
            "json": TypeInfo(json.dump, json.load, "json"),
            "pickle": TypeInfo(pickle.dump, pickle.load, "p", is_binary=True),
            "text": TypeInfo(self._write, self._read, "txt"),
        }

        # can be set externally. if set, it is used for naming newly created directories
        self._run_info: Optional["TaskInfo"] = None

    @property
    def run_info(self):
        """The current run info of a task, which is used for naming newly created directories."""
        return self._run_info

    @run_info.setter
    def run_info(self, run_info: "TaskInfo"):
        self._run_info = run_info

    @staticmethod
    def _write(obj: str, file: IO):
        """Save function of the "event" and "text" types: writes ``obj`` to ``file``."""
        file.write(obj)

    @staticmethod
    def _read(file: IO) -> str:
        """Load function of the "event" and "text" types: returns the content of ``file``."""
        return file.read()

    def _get_type_info(self, type_: str) -> TypeInfo:
        """Returns the registered ``TypeInfo`` for ``type_`` or raises a descriptive ``KeyError``."""
        try:
            return self.type_registry[type_]
        except KeyError:
            raise KeyError(
                f'Object type "{type_}" is not supported in {self.__class__.__name__}. Either extend it by '
                f"implementing specific load and save functions for this type, or save the object as one "
                f'of the following supported types: {", ".join(self.type_registry)}.'
            )

    @staticmethod
    def _load_info_file_name(name: str) -> str:
        """Returns the (hidden) file name under which the load info of object ``name`` is stored."""
        return f'.{name.lstrip(".")}_load_info.p'

    @staticmethod
    def _get_obj_dir(run_dir: str, sub_dir: Optional[str] = None) -> Optional[str]:
        """Returns the directory an object is saved to and creates the sub directory if one is requested."""
        obj_dir = run_dir
        if sub_dir is not None:
            obj_dir = os.path.join(run_dir, sub_dir)
            os.makedirs(obj_dir, exist_ok=True)
        return obj_dir

    @staticmethod
    def _save_load_info(
        name: str,
        run_dir: str,
        obj_dir: str,
        type_: str,
        open_kwargs: Dict[str, Any],
        load_kwargs: Dict[str, Any],
    ):
        """Pickles the information needed to load object ``name`` again into the run directory.

        The object directory is stored relative to the run directory.
        """
        load_info = {
            "open_kwargs": open_kwargs,
            "load_kwargs": load_kwargs,
            "obj_dir": os.path.relpath(obj_dir, run_dir),
            "type_": type_,
        }
        # create a hidden load info dir to save object information used by self.load()
        load_info_dir = os.path.join(run_dir, Names.FLUIDML_DIR)
        os.makedirs(load_info_dir, exist_ok=True)
        load_info_file_path = os.path.join(load_info_dir, LocalFileStore._load_info_file_name(name))
        # use a context manager so the file is flushed and closed deterministically
        with open(load_info_file_path, "wb") as load_info_file:
            pickle.dump(load_info, load_info_file)

    @staticmethod
    def _get_load_info(run_dir: str, name: str) -> Optional[Tuple]:
        """Reads the load info of object ``name`` from the run directory.

        Returns:
            A tuple ``(type_, obj_dir, open_kwargs, load_kwargs, load_info_file_path)``, where
            ``load_info_file_path`` is the file the load info was actually read from, or ``None``
            if no load info file exists.
        """
        file_name = LocalFileStore._load_info_file_name(name)
        load_info_file_path = os.path.join(run_dir, Names.FLUIDML_DIR, file_name)
        # Fallback to old load info dir name ".load_info" -> for backward compatibility with old results
        load_info_old_file_path = os.path.join(run_dir, ".load_info", file_name)

        for candidate_path in (load_info_file_path, load_info_old_file_path):
            try:
                with open(candidate_path, "rb") as load_info_file:
                    load_info = pickle.load(load_info_file)
            except FileNotFoundError:
                continue

            # unpack load info
            type_ = load_info["type_"]
            obj_dir = os.path.join(run_dir, load_info["obj_dir"])
            open_kwargs = load_info["open_kwargs"]
            load_kwargs = load_info["load_kwargs"]
            # return the path that was actually read, so that delete() removes the right file
            return type_, obj_dir, open_kwargs, load_kwargs, candidate_path

        logger.warning(f'"{name}" could not be found in store, since "{load_info_file_path}" does not exist.')
        return None

    @staticmethod
    def _create_promise(type_info: TypeInfo, name: str, path: str, open_kwargs: Dict, load_kwargs: Dict) -> FilePromise:
        """Creates a ``FilePromise`` for lazily loading the object stored at ``path``."""
        # types that operate on paths are not opened, hence they get no mode
        if type_info.needs_path:
            mode = None
        else:
            mode = "rb" if type_info.is_binary else "r"

        return FilePromise(
            name,
            path,
            type_info.save_fn,
            type_info.load_fn,
            type_info.open_fn,
            mode,
            load_kwargs,
            **open_kwargs,
        )

    def save(
        self,
        obj: Any,
        name: str,
        type_: str,
        task_name: str,
        task_unique_config: Dict,
        sub_dir: Optional[str] = None,
        mode: Optional[str] = None,
        open_kwargs: Optional[Dict[str, Any]] = None,
        load_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        r"""Saves an object to the local file store.

        If no task and run directory for the provided unique config exists, a new directory will be created.

        Args:
            obj: The object to save.
            name: An unique name given to this object.
            type\_: Additional type specification (e.g. json, which is to be passed to results store).
            task_name: Task name which saves the object.
            task_unique_config: Unique config which specifies the run of the object.
            sub_dir: A path of a subdirectory used for saving the file.
            mode: The mode to save the file, e.g. "w" or "wb".
            open_kwargs: Additional keyword arguments passed to the registered ``open()`` function.
            load_kwargs: Additional keyword arguments passed to the registered ``load()`` function.
            **kwargs: Additional keyword arguments passed to the registered ``save()`` function.

        Raises:
            KeyError: If ``type_`` is not registered in the type registry.
        """
        open_kwargs = {} if open_kwargs is None else open_kwargs
        load_kwargs = {} if load_kwargs is None else load_kwargs

        run_dir = self.get_context(task_name, task_unique_config).run_dir

        # get save function and file extension for type
        type_info = self._get_type_info(type_)

        # set and return save directory for object
        obj_dir = self._get_obj_dir(run_dir, sub_dir)

        # save load info
        self._save_load_info(name, run_dir, obj_dir, type_, open_kwargs, load_kwargs)

        # save object
        name = f"{name}.{type_info.extension}" if type_info.extension else name
        path = os.path.join(obj_dir, name)

        if not mode:
            mode = "wb" if type_info.is_binary else "w"

        if type_info.needs_path:
            type_info.save_fn(obj, path, **kwargs)
        else:
            with File(
                path,
                mode,
                save_fn=type_info.save_fn,
                load_fn=type_info.load_fn,
                open_fn=type_info.open_fn,
                **open_kwargs,
            ) as file:
                file.save(obj=obj, **kwargs)

    def load(
        self,
        name: str,
        task_name: str,
        task_unique_config: Dict,
        lazy: bool = False,
        **kwargs,
    ) -> Optional[Any]:
        """Loads an object from the local file store.

        The object is retrieved based on the name and the provided task name and unique task config.

        Args:
            name: An unique name given to this object.
            task_name: Task name which saves the object.
            task_unique_config: Unique config which specifies the run of the object.
            lazy: A boolean whether the object should be lazily loaded. If True, a ``FilePromise`` object
                will be returned, that can be loaded into memory on demand with the ``promise.load()`` function.
            **kwargs: Additional keyword arguments passed to the registered ``load()`` function.

        Returns:
            The specified object if it is found, else ``None``.
        """
        task_dir = os.path.join(self.base_dir, task_name)

        # try to get existing run dir
        with self.lock:
            run_dir = self._get_run_dir(task_dir=task_dir, task_config=task_unique_config)
        if run_dir is None:
            return None

        # get load information from run dir
        load_info = self._get_load_info(run_dir, name)
        if not load_info:
            return None
        type_, obj_dir, open_kwargs, load_kwargs, _ = load_info

        # merge saved load kwargs with user provided kwargs
        # user provided kwargs overwrite saved kwargs when keys are identical
        load_kwargs = {**load_kwargs, **kwargs}

        # get type info used for file loading
        type_info = self.type_registry[type_]

        # get path
        full_name = f"{name}.{type_info.extension}" if type_info.extension else name
        path = os.path.join(obj_dir, full_name)

        # if path does not exist return None
        if not os.path.exists(path):
            return None

        if lazy:
            return self._create_promise(type_info, name, path, open_kwargs, load_kwargs)

        # load the saved object from run dir
        if type_info.needs_path:
            return type_info.load_fn(path, **load_kwargs)

        mode = "rb" if type_info.is_binary else "r"
        try:
            with File(
                path,
                mode,
                save_fn=type_info.save_fn,
                load_fn=type_info.load_fn,
                open_fn=type_info.open_fn,
                **open_kwargs,
            ) as file:
                return file.load(**load_kwargs)
        except FileNotFoundError:
            logger.warning(f'Task "{task_name}" could not find "{name}" in results store. ')
            return None

    def delete(self, name: str, task_name: str, task_unique_config: Dict):
        """Deletes an object from the local file store.

        The object is deleted based on the name and the provided task name and unique task config.

        Args:
            name: The name of the to be deleted object.
            task_name: Task name which saved the object.
            task_unique_config: Unique config which specifies the run of the object.
        """
        task_dir = os.path.join(self.base_dir, task_name)

        # try to get existing run dir
        with self.lock:
            run_dir = self._get_run_dir(task_dir=task_dir, task_config=task_unique_config)
        if run_dir is None:
            logger.warning(
                f'"{name}" could not be deleted. '
                f'No run directory for task "{task_name}" and the provided unique_config exists.'
            )
            return None

        # get load information from run dir
        load_info = self._get_load_info(run_dir, name)
        if not load_info:
            return None
        type_, obj_dir, _, _, load_info_file_path = load_info

        # get type info used for file loading
        type_info = self.type_registry[type_]

        name = f"{name}.{type_info.extension}" if type_info.extension else name
        path_to_delete = os.path.join(obj_dir, name)

        # remove the saved object/directory
        # os.remove() on a directory raises different OSError subclasses depending on the platform,
        # hence directories are detected explicitly
        if os.path.isdir(path_to_delete):
            shutil.rmtree(path_to_delete)
        else:
            try:
                os.remove(path_to_delete)
            except FileNotFoundError:
                logger.warning(f'"{path_to_delete}" could not be deleted from store since it was not found.')

        # remove its load info file from the store
        os.remove(load_info_file_path)

    def delete_run(self, task_name: str, task_unique_config: Dict):
        """Deletes an entire task run directory from the local file store.

        The run is deleted based on the task name and the unique task config.

        Args:
            task_name: The name of the task.
            task_unique_config: Unique config which specifies the run of the object.
        """
        task_dir = os.path.join(self.base_dir, task_name)

        # try to get existing run dir
        with self.lock:
            run_dir = self._get_run_dir(task_dir=task_dir, task_config=task_unique_config)
        if run_dir is None:
            logger.warning(f'No run directory for task "{task_name}" and the provided unique_config exists.')
            return None

        # delete retrieved run dir
        shutil.rmtree(run_dir)

    def open(
        self,
        name: Optional[str] = None,
        task_name: Optional[str] = None,
        task_unique_config: Optional[Dict] = None,
        mode: Optional[str] = None,
        promise: Optional[FilePromise] = None,
        type_: Optional[str] = None,
        sub_dir: Optional[str] = None,
        **open_kwargs,
    ) -> Optional[File]:
        r"""Wrapper to open a file from Local File Store (only available for Local File Store).

        It returns a file like object that has additional ``save()`` and ``load()`` methods that can be used
        to save/load objects with a registered type to/from the file store.
        The ``File`` like object allows for an incremental write or read process of objects that for example
        don't fit into memory.

        Either ``promise`` or all of ``name``, ``task_name``, ``task_unique_config`` and ``mode`` have to be provided.

        Args:
            name: An unique name given to this object.
            task_name: Task name which saved the object.
            task_unique_config: Unique config which specifies the run of the object.
            mode: The open mode, e.g. "r", "w", etc.
            promise: An optional ``Promise`` object used for creating the returned file like object.
            type\_: Additional type specification (e.g. json, which is to be passed to results store).
            sub_dir: A path of a subdirectory used for opening the file.
            **open_kwargs: Additional keyword arguments passed to the registered ``open()`` function.

        Returns:
            A ``File`` object that wraps a file like object and enables incremental result store reading and writing.

        Raises:
            FileNotFoundError: If a read mode is requested and no run directory or load info exists.
            KeyError: If a new file is created and ``type_`` is not registered in the type registry.
        """

        # The available file pointer methods per open mode are:
        #                   | r   r+   w   w+   a   a+   x   x+
        # ------------------|-----------------------------------
        # read              | +   +        +        +        +
        # write             |     +    +   +    +   +    +   +
        # write after seek  |     +    +   +             +   +
        # create            |          +   +    +   +    +   +
        # truncate          |          +   +
        # position at start | +   +    +   +             +   +
        # position at end   |                   +   +

        assert promise is not None or all(arg is not None for arg in (name, task_name, task_unique_config, mode))

        if promise:
            if mode:
                promise.mode = mode
            return File.from_promise(promise)

        load_kwargs = {}
        if "r" in mode:
            # reading requires an existing run dir and load info; nothing is created
            task_dir = os.path.join(self.base_dir, task_name)

            # try to get existing run dir
            with self.lock:
                run_dir = self._get_run_dir(task_dir=task_dir, task_config=task_unique_config)
            if run_dir is None:
                raise FileNotFoundError

            # get load information from run dir
            load_info = self._get_load_info(run_dir, name)
            if not load_info:
                raise FileNotFoundError
            # the saved open/load kwargs replace the ones provided by the caller
            type_, obj_dir, open_kwargs, load_kwargs, _ = load_info

            # get type info used for file loading
            type_info = self.type_registry[type_]

        elif "a" in mode:
            # get existing run dir or create a new one
            run_dir = self.get_context(task_name, task_unique_config).run_dir

            # get load information from run dir
            load_info = self._get_load_info(run_dir, name)
            if load_info:
                # the object exists already -> reuse its saved type, directory and kwargs
                type_, obj_dir, open_kwargs, load_kwargs, _ = load_info

                # get type info used for file loading
                type_info = self.type_registry[type_]
            else:
                # the object does not exist yet -> register it like a newly written object
                type_info = self._get_type_info(type_)

                # set and return save directory for object
                obj_dir = self._get_obj_dir(run_dir, sub_dir)

                # save load info
                self._save_load_info(name, run_dir, obj_dir, type_, open_kwargs, load_kwargs)
        else:
            run_dir = self.get_context(task_name, task_unique_config).run_dir

            # get type_info
            type_info = self._get_type_info(type_)

            # set and return save directory for object
            obj_dir = self._get_obj_dir(run_dir, sub_dir)

            # save load info
            self._save_load_info(name, run_dir, obj_dir, type_, open_kwargs, load_kwargs)

        # return file object
        name = f"{name}.{type_info.extension}" if type_info.extension else name
        path = os.path.join(obj_dir, name)

        return File(
            path,
            mode,
            load_fn=type_info.load_fn,
            save_fn=type_info.save_fn,
            open_fn=type_info.open_fn,
            load_kwargs=load_kwargs,
            **open_kwargs,
        )

    def get_context(self, task_name: str, task_unique_config: Dict) -> StoreContext:
        """Method to get the current task's storage context.

        E.g. the current run directory in case of LocalFileStore.
        Creates a new run dir if none exists.

        Args:
            task_name: Task name.
            task_unique_config: Unique config which specifies the run.

        Returns:
            A ``StoreContext`` holding the run directory and the sweep counter, i.e. the part of the
            run directory name after the last "-".
        """
        task_dir = os.path.join(self.base_dir, task_name)

        with self.lock:
            # try to get existing run dir
            run_dir = LocalFileStore._get_run_dir(task_dir=task_dir, task_config=task_unique_config)

            # create new run dir if run dir does not exist
            if run_dir is None:
                run_dir = LocalFileStore._make_run_dir(task_dir=task_dir, run_info=self.run_info)
                # close the config file before the lock is released, so that it is completely
                # written when the run dir is looked up again
                with open(os.path.join(run_dir, Names.CONFIG), "w") as config_file:
                    json.dump(task_unique_config, config_file, indent=4)

        sweep_counter = os.path.split(run_dir)[-1].rsplit("-")[-1]
        return StoreContext(run_dir=run_dir, sweep_counter=sweep_counter)

    @staticmethod
    def _scan_task_dir(task_dir: str) -> List[str]:
        """Returns the paths of all directories directly inside ``task_dir`` (empty if it does not exist)."""
        if not os.path.isdir(task_dir):
            return []
        # the context manager releases the directory iterator deterministically
        with os.scandir(task_dir) as entries:
            return [os.path.join(task_dir, entry.name) for entry in entries if entry.is_dir()]

    @staticmethod
    def _get_run_dir(task_dir: str, task_config: Dict) -> Optional[str]:
        """Returns the first run dir in ``task_dir`` whose saved config items are all contained in ``task_config``.

        Run dirs without a config file are skipped. Returns ``None`` if no run dir matches.
        """
        for exist_run_dir in LocalFileStore._scan_task_dir(task_dir=task_dir):
            try:
                with open(os.path.join(exist_run_dir, Names.CONFIG), "r") as config_file:
                    exist_config = json.load(config_file)
            except FileNotFoundError:
                continue
            if exist_config.items() <= task_config.items():
                return exist_run_dir
        return None

    @staticmethod
    def _make_run_dir(task_dir: str, run_info: Optional["TaskInfo"] = None) -> str:
        """Creates a new run directory in ``task_dir`` and returns its path.

        The directory is named "<run_name>-<counter>" if run info is available and "<counter>" otherwise.
        """
        # if run info exists and holds sweep_counter attribute, we create the previously existent run dir name
        if run_info and run_info.sweep_counter:
            new_dir_name = f"{run_info.run_name}-{run_info.sweep_counter}"
        else:
            # retrieve all existing dir names for task
            exist_run_dirs = LocalFileStore._scan_task_dir(task_dir=task_dir)
            dir_names = [os.path.split(d)[-1] for d in exist_run_dirs]

            # if run info exists we use the assigned run_name to filter the relevant run dir names
            if run_info:
                run_name = run_info.run_name
                # find dirs that start with run_name and extract their suffix (usually a numeric counter)
                dir_names = [name.rsplit("-", 1)[-1] for name in dir_names if name.startswith(run_name)]

            # get all numeric dir names in task dir and convert to ids
            ids = [int(d_name) for d_name in dir_names if d_name.isdigit()]

            # increment max id by 1 or start at 0
            new_id = str(max(ids) + 1 if ids else 0).zfill(3)

            # create the new run dir name
            new_dir_name = new_id if run_info is None else f"{run_info.run_name}-{new_id}"

        # create and return new run dir
        new_run_dir = os.path.join(task_dir, new_dir_name)
        os.makedirs(new_run_dir, exist_ok=True)
        return new_run_dir