Source code for fluidml.task

import datetime
import inspect
import logging
from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union

from metadict import MetaDict

from fluidml.dependency import DependencyMixin
from fluidml.storage.base import Names, Promise, ResultsStore, StoreContext
from fluidml.storage.file_store import File
from fluidml.utils import BaseModel, change_logging_level

if TYPE_CHECKING:
    from fluidml.task_spec import TaskSpec

logger = logging.getLogger(__name__)


class TaskResults(BaseModel):
    """Holds the results and unique config of a task."""

    task_name: str
    """The name of the task, e.g. "training"."""
    task_unique_name: str
    """The unique name of the task, e.g. "training-3", in the context of the current run."""
    unique_config: Dict
    """The unique config specifying the task."""
    results: Optional[Dict[str, Any]] = None
    """The results dictionary. Each key-value pair represents a result name and value."""


[docs]class TaskState(str, Enum): """The state of a task.""" PENDING = "pending" """Task has not been scheduled and processed, yet.""" SCHEDULED = "scheduled" """Task has been scheduled for processing.""" RUNNING = "running" """Task is currently running.""" KILLED = "killed" """Task has been killed by the user via KeyBoardInterrupt.""" FAILED = "failed" """Task failed due to an unexpected error.""" UPSTREAM_FAILED = "upstream_failed" """Task failed/could not be executed due to one or more upstream task failures.""" FINISHED = "finished" """Task finished successfully."""
[docs]class TaskInfo(BaseModel): """A wrapper class to store important task information. Used for serializing the information to the results store. """ project_name: str """Name of the project.""" run_name: str """Name of the task's current run.""" state: Optional[TaskState] = None """The current state of the task, e.g. "running", "finished", etc.""" started: Optional[datetime.datetime] = None """The start time of the task.""" ended: Optional[datetime.datetime] = None """The end time of the task.""" duration: Optional[datetime.timedelta] = None """The current running time of the task.""" run_history: Optional[Dict] = None """Holds the task ids of all predecessor task including the task itself.""" sweep_counter: Optional[str] = None """A dynamically created counter to distinguish different task instances with the same run name in the results store. E.g. used in the LocalFileStore to name run directories. """ unique_config_hash: Optional[str] = None """An 8 character hash of the unique run config.""" id: Optional[str] = None """A combination of the run-name and the sweep counter or unique config hash if the former does not exist."""
[docs]class Task(ABC, DependencyMixin): """The base ``Task`` that provides the user with additional features such as result store and run name access. When implementing a task as a Class Args: name: Name of the task, e.g. "Processing". unique_name: Unique name of the task if a run contains multiple instances of the same task, e.g. "Processing-3". project_name: Name of the project. run_name: Name of the task's current run. state: The current state of the task, e.g. "running", "finished", etc. started: The start time of the task. ended: The end time of the task. run_history: Holds the task ids of all predecessor task including the task itself. sweep_counter: A dynamically created counter to distinguish different task instances with the same run name in the results store. E.g. used in the LocalFileStore to name run directories. unique_config_hash: An 8 character hash of the unique run config. unique_config: Unique config of the task. Includes all predecessor task configs as well to uniquely define a task. results_store: An instance of results store for results management. If nothing is provided, a non-persistent InMemoryStore store is used. resource: A resource object that can hold arbitrary data, e.g. gpu or cpu device information. Resource objects can be assigned in a multiprocessing context, so that each worker process uses a dedicated resource, e.g. cuda device. force: Indicator if the task is force executed or not. expects: A dict of expected input arguments and their :func:`inspect.Parameter` objects of the task's run method signature. reduce: A boolean indicating whether this is a reduce-task. Defaults to ``None``. config: The config dictionary of the task. Only contains the config parameters of the task (no predecessors). """ # needed to avoid overwriting already initialized Task attributes, when the user task's __init__ method # with a super() call is called at a later time. _is_initialized: bool = False def __init__( self, name: Optional[str] = None, unique_name: Optional[str] = None, project_name: Optional[str] = None, run_name: Optional[str] = None, state: Optional[TaskState] = None, started: Optional[datetime.datetime] = None, ended: Optional[datetime.datetime] = None, run_history: Optional[Dict] = None, sweep_counter: Optional[str] = None, unique_config_hash: Optional[str] = None, unique_config: Optional[Union[Dict, MetaDict]] = None, results_store: Optional[ResultsStore] = None, resource: Optional[Any] = None, force: Optional[bool] = None, expects: Optional[Dict[str, inspect.Parameter]] = None, reduce: Optional[bool] = None, config: Optional[Dict[str, Any]] = None, ): if not self._is_initialized: DependencyMixin.__init__(self) self.name = name self.unique_name = unique_name self.project_name = project_name self.run_name = run_name self.state = state self.started = started self.ended = ended self.run_history = run_history self.sweep_counter = sweep_counter self.unique_config_hash = unique_config_hash self.unique_config = unique_config self.results_store = results_store self.resource = resource self.force = force self.expects = expects self.reduce = reduce self.config = config self._is_initialized = True @property def id(self) -> str: """A combination of the run-name and the sweep counter or unique config hash if the former does not exist.""" return ( f"{self.run_name}-{self.sweep_counter}" if self.sweep_counter else f"{self.run_name}-{self.unique_config_hash}" ) @property def duration(self) -> Optional[datetime.timedelta]: """The current running time of the task.""" if self.started and self.ended: return self.ended - self.started elif self.started: return datetime.datetime.now() - self.started else: return None @property def info(self) -> TaskInfo: """A ``TaskInfo`` object that summarizes the important task information for serialization.""" return TaskInfo( project_name=self.project_name, run_name=self.run_name, state=self.state, started=self.started, ended=self.ended, duration=self.duration, run_history=self.run_history, sweep_counter=self.sweep_counter, unique_config_hash=self.unique_config_hash, id=self.id, )
[docs] @abstractmethod def run(self, **results): """Implementation of core logic of task. Args: **results: results from predecessors (automatically passed) """ raise NotImplementedError
def _init(self): """A wrapper to call the actual user task's __init__ method with the provided config""" self.__init__(**self.config) @staticmethod def _check_no_internally_used_config_keys(task: "Task", config: MetaDict): internal_keys = [k for k in config if k in {**Task.__dict__, **task.__dict__}] if internal_keys: raise ValueError( f"The config keys: {', '.join(internal_keys)} are protected since they are used " f"internally by the Task class. Please consider renaming them." ) def _track_saved_object(self, name: str, mode: str, type_: Optional[str] = None): # load saved objects with change_logging_level(40): saved_objects: Optional[List[str]] = self.load( name=Names.SAVED_RESULTS_FILE, task_name=self.name, task_unique_config=self.unique_config, ) if mode == "save": if saved_objects is None: saved_objects = [] if name not in saved_objects: saved_objects.append(name) self.results_store.save( obj=saved_objects, name=Names.SAVED_RESULTS_FILE, type_="json", sub_dir=Names.FLUIDML_DIR, task_name=self.name, task_unique_config=self.unique_config, ) elif mode == "delete": # delete name from registry and save registry if saved_objects is not None and name in saved_objects: del saved_objects[saved_objects.index(name)] self.results_store.save( obj=saved_objects, name=Names.SAVED_RESULTS_FILE, type_="json", sub_dir=Names.FLUIDML_DIR, task_name=self.name, task_unique_config=self.unique_config, ) else: raise ValueError(f'"mode" argument is "{mode}" but must be "save" or "delete".') debug_msg = f'Task "{self.unique_name}" {mode}s "{name}"' msg = debug_msg + f"." if type_ is None else debug_msg + f' of type "{type_}".' logger.debug(msg)
[docs] def save(self, obj: Any, name: str, type_: Optional[str] = None, **kwargs): """Saves the given object to the results store. Args: obj: Any object that is to be saved name: A unique name given to this object type\_: Additional type specification (e.g. json, which is to be passed to results store). Defaults to ``None``. **kwargs: Additional keyword args. """ self.results_store.save( obj=obj, name=name, type_=type_, task_name=self.name, task_unique_config=self.unique_config, **kwargs, ) self._track_saved_object(name, mode="save", type_=type_)
[docs] def load( self, name: str, task_name: Optional[str] = None, task_unique_config: Optional[Union[Dict, MetaDict]] = None, **kwargs, ) -> Any: """Loads the given object from results store. Args: name: A unique name given to this object. task_name: Task name which saved the loaded object. task_unique_config: Unique config which specifies the run of the loaded object. **kwargs: Additional keyword args. Returns: The loaded object. """ task_name = task_name if task_name is not None else self.name task_unique_config = task_unique_config if task_unique_config is not None else self.unique_config return self.results_store.load( name=name, task_name=task_name, task_unique_config=task_unique_config, **kwargs, )
[docs] def delete( self, name: str, task_name: Optional[str] = None, task_unique_config: Optional[Union[Dict, MetaDict]] = None, ): """Deletes object with specified name from results store Args: name: A unique name given to this object. task_name: Task name which saved the object. task_unique_config: Unique config which specifies the run of the object. """ task_name = task_name if task_name is not None else self.name task_unique_config = task_unique_config if task_unique_config is not None else self.unique_config self.results_store.delete(name=name, task_name=task_name, task_unique_config=task_unique_config) self._track_saved_object(name, mode="delete")
[docs] def delete_run( self, task_name: Optional[str] = None, task_unique_config: Optional[Union[Dict, MetaDict]] = None, ): """Deletes run with specified name from results store Args: task_name: Task name which saved the object. task_unique_config: Unique config which specifies the run of the object. """ task_name = task_name if task_name is not None else self.name task_unique_config = task_unique_config if task_unique_config is not None else self.unique_config self.results_store.delete_run(task_name=task_name, task_unique_config=task_unique_config)
[docs] def get_store_context( self, task_name: Optional[str] = None, task_unique_config: Optional[Union[Dict, MetaDict]] = None, ) -> StoreContext: """Wrapper to get store specific storage context, e.g. the current run directory for Local File Store Args: task_name: Task name. task_unique_config: Unique config which specifies the run. Returns: The store context object holding information like the current run dir. """ task_name = task_name if task_name is not None else self.name task_unique_config = task_unique_config if task_unique_config is not None else self.unique_config return self.results_store.get_context(task_name=task_name, task_unique_config=task_unique_config)
[docs] def open( self, name: Optional[str] = None, task_name: Optional[str] = None, task_unique_config: Optional[Union[Dict, MetaDict]] = None, mode: Optional[str] = None, promise: Optional[Promise] = None, type_: Optional[str] = None, sub_dir: Optional[str] = None, **open_kwargs, ) -> Optional[File]: """Wrapper to open a file from Local File Store (only available for Local File Store). Args: name: An unique name given to this object. task_name: Task name which saved the object. task_unique_config: Unique config which specifies the run of the object. mode: The open mode, e.g. "r", "w", etc. promise: An optional ``Promise`` object used for creating the returned file like object. type\_: Additional type specification (e.g. json, which is to be passed to results store). sub_dir: A path of a subdirectory used for opening the file. **open_kwargs: Additional keyword arguments passed to the registered ``open()`` function. Returns: A ``File`` object that behaves like a file like object. """ if promise: return self.results_store.open(promise=promise, mode=mode) task_name = task_name if task_name is not None else self.name task_unique_config = task_unique_config if task_unique_config is not None else self.unique_config if mode is not None and ("w" in mode or "a" in mode): self._track_saved_object(name, mode="save", type_=type_) return self.results_store.open( name=name, task_name=task_name, task_unique_config=task_unique_config, mode=mode, type_=type_, sub_dir=sub_dir, **open_kwargs, )
[docs] @classmethod def from_spec(cls, task_spec: "TaskSpec", half_initialize: bool = False) -> "Task": """Initializes a Task object from a TaskSpec object. Args: task_spec: A task specification object. half_initialize: A boolean to indicate whether only the parent Task object is initialized and the child class initialization is delayed until ``task._init()`` is called. The half initialization is only needed internally to create a task object without directly executing the ``__init__`` method of the actual task implemented by the user. Returns: A task object (fully or half initialized). """ # convert task config values to MetaDict task_spec.config = MetaDict(task_spec.config) if inspect.isclass(task_spec.task): if half_initialize: # create a new user task object without initialization task = task_spec.task.__new__(task_spec.task) # only init the inherited base task Task.__init__(task) else: # normal initialization task = task_spec.task(**task_spec.config) task.config = task_spec.config # make sure the user provided config does not contain first level keys that are used in the Task class Task._check_no_internally_used_config_keys(task=task, config=task.config) elif inspect.isfunction(task_spec.task): # create an artificial wrapper task object to support functional tasks task = _TaskFromCallable( task=task_spec.task, config=task_spec.config, ) else: # cannot be reached, check has been made in TaskSpec. raise TypeError task.name = task_spec.name task.unique_name = task_spec.unique_name task.unique_config = task_spec.unique_config task.reduce = task_spec.reduce task.expects = task_spec.expects task.predecessors = task_spec.predecessors task.successors = task_spec.successors return task
class _TaskFromCallable(Task): """A wrapper class that wraps a callable as a Task.""" def __init__(self, task: Callable, config: MetaDict): super().__init__() self.task = task self.config = config def run(self, **results: Dict[str, Any]): self.task(**results, **self.config, task=self) def _init(self): pass