fluidml

class fluidml.Flow(tasks, config_ignore_prefix=None, config_group_prefix=None)

Flow implements the core logic of building and expanding task graphs from task specifications.

  • It automatically expands the tasks based on task specifications and task configs.

  • It extends the registered dependencies to the expanded tasks.

  • It provides the task graph objects for introspection and visualization.

  • Finally, it executes the task graph using multiprocessing if necessary.

Parameters:
  • tasks (List[TaskSpec]) – List of task specifications.

  • config_ignore_prefix (Optional[str]) – A config key prefix, e.g. “_”. Prefixed keys will not be included in the “unique_config”, which is used to determine whether a run has already been executed.

  • config_group_prefix (Optional[str]) – A config grouping prefix, used to indicate that parameters are grouped and expanded using the “zip” method. The grouping prefix enables the “zip” expansion of specific parameters, while all remaining grid parameters are expanded via “product”. Example (group prefix “@”): cfg = {"a": [1, 2, "@x"], "b": [1, 2, 3], "c": [1, 2, "@x"]}. Without grouping, “product” expansion would yield 2 * 2 * 3 = 12 configs. With grouping, expansion yields only 2 * 3 = 6 configs, since the grouped parameters “a” and “c” are “zip” expanded.
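The difference between the two expansion modes can be illustrated with a small, self-contained sketch. It is not fluidml's implementation: the function expand_config is hypothetical, and it assumes that every list value is a grid parameter and that a trailing string starting with the group prefix (here "@") marks the group a list belongs to, as in the example above.

```python
import itertools
from typing import Any, Dict, List


def expand_config(cfg: Dict[str, Any], group_prefix: str = "@") -> List[Dict[str, Any]]:
    """Hypothetical sketch: zip-expand grouped lists, product-expand the rest."""
    fixed: Dict[str, Any] = {}              # scalar values, copied into every config
    axes: List[List[Dict[str, Any]]] = []   # each axis is a list of partial configs
    groups: Dict[str, List[str]] = {}       # group marker -> member keys

    for key, value in cfg.items():
        if not isinstance(value, list):
            fixed[key] = value
        elif value and isinstance(value[-1], str) and value[-1].startswith(group_prefix):
            groups.setdefault(value[-1], []).append(key)
        else:
            axes.append([{key: v} for v in value])

    # Each group forms a single axis: its lists are zipped position-wise.
    for members in groups.values():
        columns = [cfg[key][:-1] for key in members]  # drop the trailing group marker
        axes.append([dict(zip(members, row)) for row in zip(*columns)])

    # All axes (ungrouped lists and zipped groups) are combined via the Cartesian product.
    configs = []
    for combo in itertools.product(*axes):
        merged = dict(fixed)
        for partial in combo:
            merged.update(partial)
        configs.append(merged)
    return configs


cfg = {"a": [1, 2, "@x"], "b": [1, 2, 3], "c": [1, 2, "@x"]}
print(len(expand_config(cfg)))                                         # 6
print(len(expand_config({"a": [1, 2], "b": [1, 2, 3], "c": [1, 2]})))  # 12
```

Grouped lists are zipped position-wise, so in this sketch they should have equal length; Python's zip silently truncates to the shortest list otherwise.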

property num_tasks: int

The total number of expanded tasks in the task graph.

run(num_workers=None, resources=None, start_method='spawn', exit_on_error=True, log_to_tmux=False, max_panes_per_window=4, force=None, project_name='uncategorized', run_name=None, results_store=None, return_results=None)

Runs the expanded task graph sequentially or in parallel using multiprocessing and returns the results.

Parameters:
  • num_workers (Optional[int]) – Number of parallel processes (dolphins) used. Internally, the optimal number of processes, optimal_num_workers, is inferred from the task graph. If num_workers is None or num_workers > optimal_num_workers, num_workers is set to optimal_num_workers. Defaults to None.

  • resources (Optional[Union[Any, List[Any]]]) – A single resource object or a list of resources that are assigned to workers. Resources can hold arbitrary data, e.g. GPU or CPU device information, making sure that each worker has access to a dedicated device. If num_workers > 1 and len(resources) == num_workers, each worker is assigned its own resource. If len(resources) < num_workers, resources are assigned randomly to workers. If num_workers == 1, the first resource resources[0] is assigned to all tasks (not workers). Defaults to None.

  • start_method (str) – Start method for multiprocessing. Defaults to "spawn". Only used when num_workers > 1.

  • exit_on_error (bool) – If True, when an error occurs, all workers finish their current tasks and exit gracefully. Defaults to True. Only used when num_workers > 1.

  • log_to_tmux (bool) – If True, a new tmux session is created (provided tmux is installed) and each worker (process) logs to a dedicated pane to avoid garbled logging output. The session’s name is f"{project_name}--{run_name}". Defaults to False. Only used when num_workers > 1.

  • max_panes_per_window (int) – Maximum number of panes per tmux window. Requires log_to_tmux to be set to True. Defaults to 4. Only used when num_workers > 1.

  • force (Optional[Union[str, List[str]]]) – Forcefully re-run tasks. Possible options are: 1) "all" - All tasks are re-run. 2) A task name (e.g. “PreProcessTask”) or a list of task names (e.g. ["PreProcessTask1", "PreProcessTask2"]). Additionally, each task name can have the suffix “+” to also re-run its successors (e.g. “PreProcessTask+”).

  • project_name (str) – Name of project. Defaults to "uncategorized".

  • run_name (Optional[str]) – Name of run.

  • results_store (Optional[ResultsStore]) – An instance of a results store for results management. If nothing is provided, a non-persistent InMemoryStore is used.

  • return_results (Optional[str]) – Whether and which results to return as a dictionary after run(). Choices: "all", "latest", None. Defaults to "all".
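The num_workers and resources rules above can be summarized in a short sketch. Both functions are hypothetical helpers, not part of fluidml. The case len(resources) > num_workers is not specified above; the sketch simply assumes that the first num_workers resources are used.

```python
import random
from typing import Any, List, Optional


def resolve_num_workers(num_workers: Optional[int], optimal_num_workers: int) -> int:
    """Fall back to (or cap at) the optimal worker count inferred from the task graph."""
    if num_workers is None or num_workers > optimal_num_workers:
        return optimal_num_workers
    return num_workers


def assign_resources(resources: Any, num_workers: int) -> List[Any]:
    """Hypothetical sketch: return the resource each worker receives."""
    if resources is None:
        resources = []
    elif not isinstance(resources, list):
        resources = [resources]          # a single resource object
    if not resources:
        return [None] * num_workers      # no resources configured
    if num_workers == 1:
        return [resources[0]]            # the single worker uses resources[0] for all tasks
    if len(resources) >= num_workers:
        return resources[:num_workers]   # one dedicated resource per worker
    # Fewer resources than workers: each worker draws a resource at random.
    return [random.choice(resources) for _ in range(num_workers)]


workers = resolve_num_workers(None, optimal_num_workers=4)
print(assign_resources(["cuda:0", "cuda:1", "cuda:2", "cuda:3"], workers))
# ['cuda:0', 'cuda:1', 'cuda:2', 'cuda:3']
```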

Returns:

A results dictionary with the following schema.

{"task_name_1": List[TaskResults], "task_name_2": List[TaskResults]}

Return type:

Optional[Dict[str, List[TaskResults]]]
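The semantics of the force argument of run() can be sketched as a small graph traversal. The helper resolve_forced_tasks and its successors mapping (task name to the names of its direct successors) are hypothetical; fluidml resolves force internally on its own task graph.

```python
from typing import Dict, List, Optional, Set, Union


def resolve_forced_tasks(
    force: Optional[Union[str, List[str]]],
    successors: Dict[str, List[str]],
) -> Set[str]:
    """Hypothetical sketch: translate the force argument into the task names to re-run."""
    if force is None:
        return set()
    if force == "all":
        return set(successors)               # every task in the graph
    entries = [force] if isinstance(force, str) else list(force)

    forced: Set[str] = set()
    for entry in entries:
        name = entry[:-1] if entry.endswith("+") else entry
        forced.add(name)
        if entry.endswith("+"):
            # Depth-first walk collecting all transitive successors of the task.
            visited: Set[str] = set()
            stack = list(successors.get(name, []))
            while stack:
                current = stack.pop()
                if current not in visited:
                    visited.add(current)
                    stack.extend(successors.get(current, []))
            forced |= visited
    return forced


graph = {"Load": ["PreProcess"], "PreProcess": ["Train"], "Train": ["Evaluate"], "Evaluate": []}
print(sorted(resolve_forced_tasks("PreProcess+", graph)))  # ['Evaluate', 'PreProcess', 'Train']
```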

class fluidml.Task(name=None, unique_name=None, project_name=None, run_name=None, state=None, started=None, ended=None, run_history=None, sweep_counter=None, unique_config_hash=None, unique_config=None, results_store=None, resource=None, force=None, expects=None, reduce=None, config=None)

The base Task class, which provides the user with additional features such as results store and run name access.

When implementing a task as a class, inherit from Task and implement its abstract run() method.

Parameters:
  • name (Optional[str]) – Name of the task, e.g. “Processing”.

  • unique_name (Optional[str]) – Unique name of the task if a run contains multiple instances of the same task, e.g. “Processing-3”.

  • project_name (Optional[str]) – Name of the project.

  • run_name (Optional[str]) – Name of the task’s current run.

  • state (Optional[TaskState]) – The current state of the task, e.g. “running”, “finished”, etc.

  • started (Optional[datetime]) – The start time of the task.

  • ended (Optional[datetime]) – The end time of the task.

  • run_history (Optional[Dict]) – Holds the task ids of all predecessor tasks, including the task itself.

  • sweep_counter (Optional[str]) – A dynamically created counter to distinguish different task instances with the same run name in the results store. E.g. used in the LocalFileStore to name run directories.

  • unique_config_hash (Optional[str]) – An 8-character hash of the unique run config.

  • unique_config (Optional[Union[Dict, MetaDict]]) – Unique config of the task. Includes all predecessor task configs as well to uniquely define a task.

  • results_store (Optional[ResultsStore]) – An instance of a results store for results management. If nothing is provided, a non-persistent InMemoryStore is used.

  • resource (Optional[Any]) – A resource object that can hold arbitrary data, e.g. gpu or cpu device information. Resource objects can be assigned in a multiprocessing context, so that each worker process uses a dedicated resource, e.g. cuda device.

  • force (Optional[bool]) – Whether the task is force-executed.

  • expects (Optional[Dict[str, Parameter]]) – A dict of expected input arguments and their inspect.Parameter() objects of the task’s run method signature.

  • reduce (Optional[bool]) – A boolean indicating whether this is a reduce-task. Defaults to None.

  • config (Optional[Dict[str, Any]]) – The config dictionary of the task. Only contains the config parameters of the task (no predecessors).
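unique_config_hash is described as an 8-character hash of the unique config, and the Flow parameter config_ignore_prefix excludes prefixed keys from that config. Both ideas can be sketched as follows, assuming a JSON-serializable config. The helpers strip_ignored and config_hash are hypothetical, and hashing canonical JSON with SHA-256 is an assumption, so these hashes will not match the ones fluidml produces.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def strip_ignored(config: Any, ignore_prefix: str) -> Any:
    """Recursively drop dict keys that start with the ignore prefix."""
    if isinstance(config, dict):
        return {
            key: strip_ignored(value, ignore_prefix)
            for key, value in config.items()
            if not (isinstance(key, str) and key.startswith(ignore_prefix))
        }
    return config


def config_hash(unique_config: Dict[str, Any], ignore_prefix: Optional[str] = None,
                length: int = 8) -> str:
    """Hypothetical sketch: a short hash that ignores key order and ignored keys."""
    if ignore_prefix is not None:
        unique_config = strip_ignored(unique_config, ignore_prefix)
    # sort_keys makes the serialization independent of dict insertion order.
    canonical = json.dumps(unique_config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:length]


print(config_hash({"lr": 0.1, "_debug": True}, ignore_prefix="_") == config_hash({"lr": 0.1}))  # True
```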

delete(name, task_name=None, task_unique_config=None)

Deletes the object with the specified name from the results store.

Parameters:
  • name (str) – A unique name given to this object.

  • task_name (Optional[str]) – Task name which saved the object.

  • task_unique_config (Optional[Union[Dict, MetaDict]]) – Unique config which specifies the run of the object.

delete_run(task_name=None, task_unique_config=None)

Deletes the run specified by the task name and unique config from the results store.

Parameters:
  • task_name (Optional[str]) – Name of the task whose run is deleted.

  • task_unique_config (Optional[Union[Dict, MetaDict]]) – Unique config which specifies the run to delete.

property duration: Optional[timedelta]

The current running time of the task.
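A sketch of how the duration can be derived from the started and ended attributes, assuming that a finished task reports ended - started and a running task reports the time elapsed so far; task_duration is a hypothetical helper, not the property's actual code.

```python
from datetime import datetime, timedelta
from typing import Optional


def task_duration(started: Optional[datetime], ended: Optional[datetime],
                  now: Optional[datetime] = None) -> Optional[timedelta]:
    """Hypothetical sketch of the duration property."""
    if started is None:
        return None                                         # the task has not started yet
    if ended is None:
        ended = now if now is not None else datetime.now()  # still running: time so far
    return ended - started


print(task_duration(datetime(2024, 1, 1, 12, 0), datetime(2024, 1, 1, 12, 5)))  # 0:05:00
```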

classmethod from_spec(task_spec, half_initialize=False)

Initializes a Task object from a TaskSpec object.

Parameters:
  • task_spec (TaskSpec) – A task specification object.

  • half_initialize (bool) – A boolean to indicate whether only the parent Task object is initialized and the child class initialization is delayed until task._init() is called. The half initialization is only needed internally to create a task object without directly executing the __init__ method of the actual task implemented by the user.

Returns:

A task object (fully or half initialized).

Return type:

Task

get_store_context(task_name=None, task_unique_config=None)

Wrapper to get the store-specific storage context, e.g. the current run directory for the LocalFileStore.

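Parameters:
  • task_name (Optional[str]) – Name of the task whose run context is retrieved.

  • task_unique_config (Optional[Union[Dict, MetaDict]]) – Unique config which specifies the run.

Returns: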

The store context object holding information like the current run dir.

Return type:

StoreContext

property id: str

A combination of the run name with the sweep counter or, if no sweep counter exists, the unique config hash.

property info: TaskInfo

A TaskInfo object that summarizes the important task information for serialization.

load(name, task_name=None, task_unique_config=None, **kwargs)

Loads the given object from the results store.

Parameters:
  • name (str) – A unique name given to this object.

  • task_name (Optional[str]) – Task name which saved the loaded object.

  • task_unique_config (Optional[Union[Dict, MetaDict]]) – Unique config which specifies the run of the loaded object.

  • **kwargs – Additional keyword args.

Returns:

The loaded object.

Return type:

Any

open(name=None, task_name=None, task_unique_config=None, mode=None, promise=None, type_=None, sub_dir=None, **open_kwargs)

Wrapper to open a file from the results store (only available for the LocalFileStore).

Parameters:
  • name (Optional[str]) – A unique name given to this object.

  • task_name (Optional[str]) – Task name which saved the object.

  • task_unique_config (Optional[Union[Dict, MetaDict]]) – Unique config which specifies the run of the object.

  • mode (Optional[str]) – The open mode, e.g. “r”, “w”, etc.

  • promise (Optional[Promise]) – An optional Promise object used for creating the returned file-like object.

  • type_ (Optional[str]) – Additional type specification (e.g. json), which is passed to the results store.

  • sub_dir (Optional[str]) – A path of a subdirectory used for opening the file.

  • **open_kwargs – Additional keyword arguments passed to the registered open() function.

Returns:

A File object that behaves like a file-like object.

Return type:

Optional[File]

property predecessors: List[Union[TaskSpec, Task]]

A list of predecessor task/task spec objects registered to this task/task spec.

required_by(successor)

Registers a successor task/task spec object to the current task/task spec.

Parameters:

successor (Any) – A successor task/task spec object.

requires(*predecessors)

Registers one or more predecessor task/task spec objects to the current task/task spec.

Parameters:

*predecessors (Union[TaskSpec, List[TaskSpec], Task, List[Task]]) – A sequence of predecessor task/task spec objects.
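requires() and required_by() register the same dependency edge from opposite ends. The hypothetical Node class below is a stand-in, not fluidml's Task or TaskSpec; it shows one way to keep the predecessors and successors lists consistent while accepting single objects or lists, as in the documented signature.

```python
from typing import List, Union


class Node:
    """Hypothetical stand-in for a task/task spec with two-way dependency links."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.predecessors: List["Node"] = []
        self.successors: List["Node"] = []

    def requires(self, *predecessors: Union["Node", List["Node"]]) -> None:
        for item in predecessors:
            # Accept single nodes as well as lists of nodes.
            for pred in (item if isinstance(item, list) else [item]):
                if pred not in self.predecessors:
                    self.predecessors.append(pred)
                if self not in pred.successors:
                    pred.successors.append(self)  # keep the reverse edge in sync

    def required_by(self, successor: "Node") -> None:
        successor.requires(self)                  # same edge, registered from the other end


load, prep, train = Node("Load"), Node("PreProcess"), Node("Train")
prep.requires(load)
train.requires([prep])
print([n.name for n in prep.successors])  # ['Train']
```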

abstract run(**results)

Implements the core logic of the task.

Parameters:

**results – Results from predecessor tasks (passed automatically).
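Because run() receives predecessor results as keyword arguments, and the expects attribute holds the inspect.Parameter objects of run()'s signature, matching results to inputs can be sketched with the standard inspect module. The helpers expected_inputs and select_results are hypothetical, and matching results by parameter name is an assumption of this sketch.

```python
import inspect
from typing import Any, Callable, Dict


def expected_inputs(run: Callable[..., Any]) -> Dict[str, inspect.Parameter]:
    """Named parameters of run(), similar in spirit to the Task.expects attribute."""
    return {
        name: param
        for name, param in inspect.signature(run).parameters.items()
        if param.kind not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    }


def select_results(run: Callable[..., Any], available: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical sketch: pick the predecessor results that run() asks for by name."""
    params = inspect.signature(run).parameters.values()
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        return dict(available)  # a run(**results) signature accepts everything
    expects = expected_inputs(run)
    missing = [name for name, p in expects.items()
               if name not in available and p.default is inspect.Parameter.empty]
    if missing:
        raise KeyError(f"missing required inputs: {missing}")
    return {name: available[name] for name in expects if name in available}


def run_example(train_data, vocab, max_len=128):
    return len(train_data), len(vocab), max_len


results = {"train_data": [1, 2, 3], "vocab": {"a": 0}, "unused": "ignored"}
print(run_example(**select_results(run_example, results)))  # (3, 1, 128)
```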

save(obj, name, type_=None, **kwargs)

Saves the given object to the results store.

Parameters:
  • obj (Any) – Any object that is to be saved.

  • name (str) – A unique name given to this object.

  • type_ (Optional[str]) – Additional type specification (e.g. json), which is passed to the results store. Defaults to None.

  • **kwargs – Additional keyword args.
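save(), load(), delete() and delete_run() address objects by object name plus the task name and unique config of the run that produced them. A minimal in-memory sketch of that addressing scheme follows; DictStore is hypothetical and is not fluidml's InMemoryStore. In the Task methods, task_name and task_unique_config are optional; here they are required, and we assume the real methods fall back to the calling task's own run when they are omitted.

```python
import json
from typing import Any, Dict, Optional, Tuple


class DictStore:
    """Hypothetical in-memory store: objects keyed by (task name, run config, object name)."""

    def __init__(self) -> None:
        self._runs: Dict[Tuple[str, str], Dict[str, Any]] = {}

    @staticmethod
    def _run_key(task_name: str, unique_config: Dict[str, Any]) -> Tuple[str, str]:
        # Canonical JSON makes the config usable as part of a dict key.
        return task_name, json.dumps(unique_config, sort_keys=True)

    def save(self, obj: Any, name: str, task_name: str, unique_config: Dict[str, Any]) -> None:
        self._runs.setdefault(self._run_key(task_name, unique_config), {})[name] = obj

    def load(self, name: str, task_name: str, unique_config: Dict[str, Any]) -> Optional[Any]:
        return self._runs.get(self._run_key(task_name, unique_config), {}).get(name)

    def delete(self, name: str, task_name: str, unique_config: Dict[str, Any]) -> None:
        self._runs.get(self._run_key(task_name, unique_config), {}).pop(name, None)

    def delete_run(self, task_name: str, unique_config: Dict[str, Any]) -> None:
        self._runs.pop(self._run_key(task_name, unique_config), None)


store = DictStore()
store.save([1, 2], "vocab", task_name="PreProcess", unique_config={"lr": 0.1})
print(store.load("vocab", task_name="PreProcess", unique_config={"lr": 0.1}))  # [1, 2]
```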

property successors: List[Union[TaskSpec, Task]]

A list of successor task/task spec objects registered to this task/task spec.

class fluidml.TaskSpec(task, config=None, additional_kwargs=None, name=None, reduce=None, expand=None, config_ignore_prefix=None, config_group_prefix=None)

expand()

Expands a task specification based on the provided config.

This function is called internally in Flow when building the task graph.

Returns:

A list of expanded Task objects.

Return type:

List[Task]

property predecessors: List[Union[TaskSpec, Task]]

A list of predecessor task/task spec objects registered to this task/task spec.

required_by(successor)

Registers a successor task/task spec object to the current task/task spec.

Parameters:

successor (Any) – A successor task/task spec object.

requires(*predecessors)

Registers one or more predecessor task/task spec objects to the current task/task spec.

Parameters:

*predecessors (Union[TaskSpec, List[TaskSpec], Task, List[Task]]) – A sequence of predecessor task/task spec objects.

property successors: List[Union[TaskSpec, Task]]

A list of successor task/task spec objects registered to this task/task spec.

fluidml.configure_logging(level='INFO', rich_logging=True, rich_traceback=True)

A convenience function to initialize and configure logging in the application.

Parameters:
  • level (Union[str, int]) – Logging level to use, e.g. “DEBUG”, “INFO”, etc.

  • rich_logging (bool) – Whether to use the rich library to prettify logging.

  • rich_traceback (bool) – Whether to use the rich library to prettify error tracebacks.
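For environments without the rich library, a rough standard-library counterpart of configure_logging might look like this. configure_logging_sketch is hypothetical and only covers the level argument; the rich_logging and rich_traceback options have no equivalent here.

```python
import logging
import sys
from typing import Union


def configure_logging_sketch(level: Union[str, int] = "INFO") -> None:
    """Hypothetical stdlib-only stand-in for configure_logging (no rich output)."""
    logging.basicConfig(
        level=level,  # accepts names such as "DEBUG" or numeric levels such as logging.DEBUG
        format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
        handlers=[logging.StreamHandler(sys.stderr)],
        force=True,   # replace handlers installed by earlier calls (Python 3.8+)
    )


configure_logging_sketch("DEBUG")
logging.getLogger(__name__).debug("debug messages are now emitted")
```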