fluidml
- class fluidml.Flow(tasks, config_ignore_prefix=None, config_group_prefix=None)
Flow implements the core logic of building and expanding task graphs from task specifications.
It automatically expands the tasks based on task specifications and task configs.
It extends the registered dependencies to the expanded tasks.
It provides the task graph objects for introspection and visualization.
Finally, it executes the task graph using multiprocessing if necessary.
- Parameters:
config_ignore_prefix (Optional[str]) – A config key prefix, e.g. “_”. Prefixed keys are not included in the “unique_config”, which is used to determine whether a run has already been executed.
config_group_prefix (Optional[str]) – A config grouping prefix that marks parameters to be grouped and expanded using the “zip” method. The grouping prefix enables “zip” expansion of specific parameters, while all remaining grid parameters are expanded via “product”. Example:
cfg = {"a": [1, 2, "@x"], "b": [1, 2, 3], "c": [1, 2, "@x"]}
Without grouping, “product” expansion would yield 2 * 2 * 3 = 12 configs. With grouping, “product” expansion yields 2 * 3 = 6 configs, since the grouped parameters “a” and “c” are “zip” expanded.
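The difference between the two expansion modes can be sketched with the standard library alone. The snippet below is a minimal illustration, not fluidml's implementation: the helper name expand_config is hypothetical, and it assumes that the grouping marker is the last element of a parameter list and that "@" is the value passed as config_group_prefix.

```python
from itertools import product


def expand_config(cfg, group_prefix="@"):
    """Expand a grid config (hypothetical helper, not part of fluidml).

    Lists whose last element is a string starting with `group_prefix`
    are zipped together per marker; everything else is product-expanded.
    """
    groups = {}  # group marker -> list of (key, values)
    axes = []    # each axis is a list of partial config dicts
    for key, values in cfg.items():
        is_grouped = (
            isinstance(values, list)
            and values
            and isinstance(values[-1], str)
            and values[-1].startswith(group_prefix)
        )
        if is_grouped:
            groups.setdefault(values[-1], []).append((key, values[:-1]))
        else:
            options = values if isinstance(values, list) else [values]
            axes.append([{key: option} for option in options])

    # Every group contributes a single axis built by zipping its members.
    for members in groups.values():
        keys = [key for key, _ in members]
        zipped = zip(*(values for _, values in members))
        axes.append([dict(zip(keys, combo)) for combo in zipped])

    # The cartesian product over all axes yields the final configs.
    configs = []
    for combo in product(*axes):
        merged = {}
        for part in combo:
            merged.update(part)
        configs.append(merged)
    return configs


cfg = {"a": [1, 2, "@x"], "b": [1, 2, 3], "c": [1, 2, "@x"]}
grouped = expand_config(cfg)
ungrouped = expand_config({"a": [1, 2], "b": [1, 2, 3], "c": [1, 2]})
print(len(grouped), len(ungrouped))  # 6 12
```

In the grouped result, “a” and “c” always carry the same index of their lists, which is why only two of the four possible (a, c) pairs appear.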
- run(num_workers=None, resources=None, start_method='spawn', exit_on_error=True, log_to_tmux=False, max_panes_per_window=4, force=None, project_name='uncategorized', run_name=None, results_store=None, return_results=None)
Runs the expanded task graph sequentially or in parallel using multiprocessing and returns the results.
- Parameters:
num_workers (Optional[int]) – Number of parallel processes (dolphins) used. Internally, the optimal number of processes optimal_num_workers is inferred from the task graph. If num_workers is None or num_workers > optimal_num_workers, num_workers is overwritten to optimal_num_workers. Defaults to None.
resources (Optional[Union[Any, List[Any]]]) – A single resource object or a list of resources that are assigned to workers. Resources can hold arbitrary data, e.g. gpu or cpu device information, making sure that each worker has access to a dedicated device. If num_workers > 1 and len(resources) == num_workers, resources are assigned to each worker. If len(resources) < num_workers, resources are assigned randomly to workers. If num_workers == 1, the first resource resources[0] is assigned to all tasks (not workers). Defaults to None.
start_method (str) – Start method for multiprocessing. Defaults to "spawn". Only used when num_workers > 1.
exit_on_error (bool) – When an error happens, all workers finish their current tasks and exit gracefully. Defaults to True. Only used when num_workers > 1.
log_to_tmux (bool) – If True, a new tmux session is created (given tmux is installed) and each worker (process) logs to a dedicated pane to avoid garbled logging output. The session’s name equals the combined f"{project_name}--{run_name}". Defaults to False. Only used when num_workers > 1.
max_panes_per_window (int) – Max number of panes per tmux window. Requires log_to_tmux being set to True. Defaults to 4. Only used when num_workers > 1.
force (Optional[Union[str, List[str]]]) – Forcefully re-run tasks. Possible options are: 1) "all" - All the tasks are re-run. 2) A task name (e.g. “PreProcessTask”) or list of task names (e.g. ["PreProcessTask1", "PreProcessTask2"]). Additionally, each task name can have the suffix “+” to also re-run its successors (e.g. “PreProcessTask+”).
project_name (str) – Name of project. Defaults to "uncategorized".
results_store (Optional[ResultsStore]) – An instance of results store for results management. If nothing is provided, a non-persistent InMemoryStore store is used.
return_results (Optional[str]) – Return results-dictionary after run(). Defaults to "all". Choices: "all", "latest", None.
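The rules for num_workers and resources above can be restated as a small standalone sketch. This is not fluidml's code: the function names resolve_num_workers and assign_resources are hypothetical, and the branch for more resources than workers is an assumption, since the description does not cover that case.

```python
import random


def resolve_num_workers(num_workers, optimal_num_workers):
    """Cap the requested worker count at the optimum inferred from the graph."""
    if num_workers is None or num_workers > optimal_num_workers:
        return optimal_num_workers
    return num_workers


def assign_resources(resources, num_workers, rng=None):
    """Return one resource per worker, following the documented rules."""
    if resources is None:
        return [None] * num_workers
    if not isinstance(resources, list):
        resources = [resources]  # a single resource object is allowed
    if num_workers == 1:
        return [resources[0]]  # the first resource is used for all tasks
    if len(resources) == num_workers:
        return list(resources)  # one dedicated resource per worker
    if len(resources) < num_workers:
        rng = rng or random.Random()
        return [rng.choice(resources) for _ in range(num_workers)]
    # Assumption: with surplus resources, use the first num_workers entries.
    return list(resources[:num_workers])


workers = resolve_num_workers(num_workers=8, optimal_num_workers=2)
per_worker = assign_resources(["cuda:0", "cuda:1"], workers)
print(workers, per_worker)  # 2 ['cuda:0', 'cuda:1']
```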
- Returns:
- A results dictionary with the following schema.
{"task_name_1": List[TaskResults], "task_name_2": List[TaskResults]}
- Return type:
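As a rough illustration of the force option of run(), the sketch below resolves a force value into the set of task names that would be re-run. It is an assumption-laden example rather than the library's logic: the helper tasks_to_force and the successors mapping (task name to direct successor names) are hypothetical.

```python
def tasks_to_force(force, successors):
    """Resolve a `force` value into a set of task names (hypothetical helper).

    successors maps each task name to the names of its direct successors.
    """
    if force is None:
        return set()
    if force == "all":
        return set(successors)
    names = [force] if isinstance(force, str) else list(force)
    selected = set()
    for name in names:
        if name.endswith("+"):
            # The "+" suffix also selects every downstream task.
            stack, seen = [name[:-1]], set()
            while stack:
                current = stack.pop()
                if current in seen:
                    continue
                seen.add(current)
                stack.extend(successors.get(current, []))
            selected |= seen
        else:
            selected.add(name)
    return selected


graph = {
    "Load": ["PreProcess"],
    "PreProcess": ["Train"],
    "Train": ["Evaluate"],
    "Evaluate": [],
}
print(sorted(tasks_to_force("PreProcess+", graph)))
# ['Evaluate', 'PreProcess', 'Train']
```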
- class fluidml.Task(name=None, unique_name=None, project_name=None, run_name=None, state=None, started=None, ended=None, run_history=None, sweep_counter=None, unique_config_hash=None, unique_config=None, results_store=None, resource=None, force=None, expects=None, reduce=None, config=None)
The base Task that provides the user with additional features such as result store and run name access. When implementing a task as a Class
- Parameters:
unique_name (Optional[str]) – Unique name of the task if a run contains multiple instances of the same task, e.g. “Processing-3”.
state (Optional[TaskState]) – The current state of the task, e.g. “running”, “finished”, etc.
run_history (Optional[Dict]) – Holds the task ids of all predecessor tasks, including the task itself.
sweep_counter (Optional[str]) – A dynamically created counter to distinguish different task instances with the same run name in the results store. E.g. used in the LocalFileStore to name run directories.
unique_config_hash (Optional[str]) – An 8-character hash of the unique run config.
unique_config (Optional[Union[Dict, MetaDict]]) – Unique config of the task. Includes all predecessor task configs as well to uniquely define a task.
results_store (Optional[ResultsStore]) – An instance of results store for results management. If nothing is provided, a non-persistent InMemoryStore store is used.
resource (Optional[Any]) – A resource object that can hold arbitrary data, e.g. gpu or cpu device information. Resource objects can be assigned in a multiprocessing context, so that each worker process uses a dedicated resource, e.g. cuda device.
force (Optional[bool]) – Indicator if the task is force executed or not.
expects (Optional[Dict[str, Parameter]]) – A dict of expected input arguments and their inspect.Parameter() objects of the task’s run method signature.
reduce (Optional[bool]) – A boolean indicating whether this is a reduce-task. Defaults to None.
config (Optional[Dict[str, Any]]) – The config dictionary of the task. Only contains the config parameters of the task (no predecessors).
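To make the relationship between config, unique_config and unique_config_hash more concrete, here is a minimal sketch. The hashing scheme used by fluidml is not stated in this reference, so SHA-256 over sorted JSON is purely an assumption, as are the helper names strip_ignored and short_hash and the "_" ignore prefix.

```python
import hashlib
import json


def strip_ignored(config, ignore_prefix="_"):
    """Recursively drop keys that start with the ignore prefix."""
    if not isinstance(config, dict):
        return config
    return {
        key: strip_ignored(value, ignore_prefix)
        for key, value in config.items()
        if not str(key).startswith(ignore_prefix)
    }


def short_hash(unique_config, length=8):
    """Derive a short, deterministic hash from a config dict (assumed scheme)."""
    payload = json.dumps(unique_config, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:length]


run_a = {"lr": 0.1, "model": {"layers": 2, "_debug": True}, "_verbose": True}
run_b = {"lr": 0.1, "model": {"layers": 2, "_debug": False}, "_verbose": False}

unique_a = strip_ignored(run_a)
unique_b = strip_ignored(run_b)
# Both runs differ only in ignored keys, so they share one hash.
print(unique_a == unique_b, short_hash(unique_a) == short_hash(unique_b))  # True True
```

Because ignored keys never reach the hash, changing them would not by itself cause a run to be treated as new.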
- delete(name, task_name=None, task_unique_config=None)
Deletes the object with the specified name from the results store.
- delete_run(task_name=None, task_unique_config=None)
Deletes the run with the specified name from the results store.
- classmethod from_spec(task_spec, half_initialize=False)
Initializes a Task object from a TaskSpec object.
- Parameters:
task_spec (TaskSpec) – A task specification object.
half_initialize (bool) – A boolean to indicate whether only the parent Task object is initialized and the child class initialization is delayed until task._init() is called. The half initialization is only needed internally to create a task object without directly executing the __init__ method of the actual task implemented by the user.
- Returns:
A task object (fully or half initialized).
- Return type:
- get_store_context(task_name=None, task_unique_config=None)
Wrapper to get the store-specific storage context, e.g. the current run directory for the Local File Store.
- property id: str
A combination of the run-name and the sweep counter or unique config hash if the former does not exist.
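One way to read this description is the fallback shown below. The sketch is an interpretation only: the function name build_task_id and the "-" separator are assumptions, and the actual format of the id is not specified here.

```python
from typing import Optional


def build_task_id(
    run_name: str,
    sweep_counter: Optional[str] = None,
    unique_config_hash: Optional[str] = None,
) -> str:
    """Combine the run name with the sweep counter, else with the config hash."""
    suffix = sweep_counter if sweep_counter is not None else unique_config_hash
    if suffix is None:
        return run_name
    return f"{run_name}-{suffix}"  # separator is an assumption


print(build_task_id("baseline", sweep_counter="3", unique_config_hash="a1b2c3d4"))
print(build_task_id("baseline", unique_config_hash="a1b2c3d4"))
```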
- property info: TaskInfo
A TaskInfo object that summarizes the important task information for serialization.
- load(name, task_name=None, task_unique_config=None, **kwargs)
Loads the given object from results store.
- Parameters:
- Returns:
The loaded object.
- Return type:
- open(name=None, task_name=None, task_unique_config=None, mode=None, promise=None, type_=None, sub_dir=None, **open_kwargs)
Wrapper to open a file from Local File Store (only available for Local File Store).
- Parameters:
task_name (Optional[str]) – Task name which saved the object.
task_unique_config (Optional[Union[Dict, MetaDict]]) – Unique config which specifies the run of the object.
promise (Optional[Promise]) – An optional Promise object used for creating the returned file-like object.
type_ (Optional[str]) – Additional type specification (e.g. json), which is passed to the results store.
sub_dir (Optional[str]) – A path of a subdirectory used for opening the file.
**open_kwargs – Additional keyword arguments passed to the registered open() function.
- Returns:
A File object that behaves like a file-like object.
- Return type:
- property predecessors: List[Union[TaskSpec, Task]]
A List of predecessor task/task spec objects, registered to this task/task spec.
- required_by(successor)
Registers a successor task/task spec object to the current task/task spec.
- Parameters:
successor (Any) – A successor task/task spec object.
- requires(*predecessors)
Registers one or more predecessor task/task spec objects to the current task/task spec.
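The relationship between predecessors, requires() and required_by() can be pictured with a small stand-in class. The Node class below is hypothetical and only mimics the three members listed above; in particular, implementing required_by() by calling requires() on the successor is an assumption about the semantics.

```python
class Node:
    """Stand-in for a task/task spec that tracks its predecessors."""

    def __init__(self, name):
        self.name = name
        self._predecessors = []

    @property
    def predecessors(self):
        # Predecessor objects registered to this node.
        return self._predecessors

    def requires(self, *predecessors):
        # Register one or more predecessors of this node.
        self._predecessors.extend(predecessors)

    def required_by(self, successor):
        # Register this node as a predecessor of the successor.
        successor.requires(self)


load, prep, train = Node("load"), Node("prep"), Node("train")
prep.requires(load)      # prep depends on load
prep.required_by(train)  # train depends on prep

print([n.name for n in prep.predecessors])   # ['load']
print([n.name for n in train.predecessors])  # ['prep']
```

Both calls describe the same kind of edge, just from opposite ends, so either can be used to wire up a graph.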
- abstract run(**results)
Implementation of core logic of task.
- Parameters:
**results – results from predecessors (automatically passed)
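The idea of passing predecessor results by name can be illustrated with inspect.signature, which is also what the expects parameter of Task refers to. The snippet is a sketch under assumptions: TrainTask, expected_inputs and call_run are hypothetical names, and the real dispatch inside fluidml may differ.

```python
import inspect


class TrainTask:
    """Hypothetical task whose run() names the results it needs."""

    def run(self, features, labels):
        return {"n_samples": len(features), "n_classes": len(set(labels))}


def expected_inputs(task):
    # Map argument names of run() to their inspect.Parameter objects.
    return dict(inspect.signature(task.run).parameters)


def call_run(task, available_results):
    # Pass only those predecessor results that run() actually asks for.
    expects = expected_inputs(task)
    kwargs = {
        name: available_results[name]
        for name in expects
        if name in available_results
    }
    return task.run(**kwargs)


predecessor_results = {
    "features": [[0.1], [0.2], [0.3]],
    "labels": ["a", "b", "a"],
    "vocabulary": ["unused"],
}
print(call_run(TrainTask(), predecessor_results))
# {'n_samples': 3, 'n_classes': 2}
```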
- class fluidml.TaskSpec(task, config=None, additional_kwargs=None, name=None, reduce=None, expand=None, config_ignore_prefix=None, config_group_prefix=None)
- Parameters:
- expand()
Expands a task specification based on the provided config.
This function is called internally in Flow when building the task graph.
- property predecessors: List[Union[TaskSpec, Task]]
A List of predecessor task/task spec objects, registered to this task/task spec.
- required_by(successor)
Registers a successor task/task spec object to the current task/task spec.
- Parameters:
successor (Any) – A successor task/task spec object.
- requires(*predecessors)
Registers one or more predecessor task/task spec objects to the current task/task spec.