Source code for fluidml.flow

import logging
import multiprocessing
import sys
from collections import defaultdict
from itertools import product
from typing import Any, Dict, List, Optional, Set, Union

import networkx as nx
from metadict import MetaDict
from networkx import DiGraph
from networkx.algorithms.dag import topological_sort

from fluidml.exception import CyclicGraphError, NoTasksError, TaskNameError
from fluidml.storage import InMemoryStore, ResultsStore
from fluidml.swarm import Swarm
from fluidml.task import Task, TaskResults
from fluidml.task_spec import TaskSpec
from fluidml.utils import generate_run_name, reformat_config, update_merge

logger = logging.getLogger(__name__)


class Flow:
    """Flow implements the core logic of building and expanding task graphs from task specifications.

    * It automatically expands the tasks based on task specifications and task configs.
    * It extends the registered dependencies to the expanded tasks.
    * It provides the task graph objects for introspection and visualization.
    * Finally, it executes the task graph using multiprocessing if necessary.

    Args:
        tasks: List of task specifications.
        config_ignore_prefix: A config key prefix, e.g. "_".
            Prefixed keys will not be included in the "unique_config", which is used to determine whether
            a run has been executed or not.
        config_group_prefix: A config grouping prefix, to indicate that two parameters are grouped and expanded
            using the "zip" method. The grouping prefix enables the "zip" expansion of specific parameters,
            while all remaining grid parameters are expanded via "product".
            Example: ``cfg = {"a": [1, 2, "@x"], "b": [1, 2, 3], "c": [1, 2, "@x"]}``
            Without grouping "product" expansion would yield: `2 * 2 * 3 = 12` configs.
            With grouping "product" expansion yields : `2 * 3 = 6` configs, since the grouped parameters
            are "zip" expanded.
    """

    def __init__(
        self,
        tasks: List[TaskSpec],
        config_ignore_prefix: Optional[str] = None,
        config_group_prefix: Optional[str] = None,
    ):
        # assign config_ignore_prefix and config_group_prefix to all tasks if the task has no prefix assigned, yet
        # (a prefix that was set explicitly on a task spec always wins over the flow-wide default)
        for task_spec in tasks:
            if config_group_prefix is not None and task_spec.config_group_prefix is None:
                task_spec.config_group_prefix = config_group_prefix
            if config_ignore_prefix is not None and task_spec.config_ignore_prefix is None:
                task_spec.config_ignore_prefix = config_ignore_prefix

        # contains the expanded graph as list of Task objects -> used internally in swarm
        self._expanded_tasks: Optional[List[Task]] = None

        # contains the expanded graph as networkx DiGraph
        # -> accessible to the user for introspection and visualization
        self.task_graph: Optional[DiGraph] = None

        # contains the original, user defined task spec graph as networkx DiGraph
        # -> accessible to the user for introspection and visualization
        self.task_spec_graph: Optional[DiGraph] = None

        # create expanded graph and set above attributes
        self._create(task_specs=tasks)

    @property
    def num_tasks(self) -> int:
        """The total number of expanded tasks in the task graph."""
        return len(self._expanded_tasks)

    def _create(self, task_specs: List[TaskSpec]):
        """Validates the task specs, expands them and populates the graph attributes.

        Sets ``self.task_spec_graph`` (via ``_order_task_specs``), ``self._expanded_tasks`` and ``self.task_graph``.

        Raises:
            NoTasksError: If ``task_specs`` is empty.
            TaskNameError: If a task name clashes with another object in the task's module.
            CyclicGraphError: If the task spec graph contains a cycle.
        """
        if not task_specs:
            raise NoTasksError("There are no tasks to run.")

        Flow._check_no_task_name_clash(task_specs=task_specs)

        ordered_task_specs = self._order_task_specs(task_specs=task_specs)
        self._expanded_tasks = Flow._expand_and_link_tasks(ordered_task_specs)
        self.task_graph = Flow._create_graph_from_task_spec_list(
            task_specs=self._expanded_tasks, name="task graph"
        )

    def _infer_optimal_number_of_workers_from_graph(self) -> int:
        """Analyzes the graphs layout and finds the maximal number of tasks executed in parallel.

        Every node gets a level: ``0`` for nodes without predecessors, otherwise one more than the highest
        level among its predecessors (i.e. the length of the longest path from any root node).

        Returns:
            The largest number of nodes that share the same level.
        """
        # Walking the nodes in topological order guarantees that all predecessors of a node already have
        # their final level, so one pass suffices and no recursion (limited by the interpreter's
        # recursion depth) is needed.
        node_to_lvl: Dict[str, int] = {}
        for node in topological_sort(self.task_graph):
            node_to_lvl[node] = max(
                (node_to_lvl[predecessor] + 1 for predecessor in self.task_graph.predecessors(node)),
                default=0,
            )

        # get the amount of nodes per lvl in the graph
        lvl_to_num = defaultdict(int)
        for lvl in node_to_lvl.values():
            lvl_to_num[lvl] += 1

        # return the maximum amount of nodes on the same level
        # if possible this equals the optimal amount of processes for parallelization
        return max(lvl_to_num.values())

    @staticmethod
    def _assign_lvl_to_node_recursively(node: str, graph: nx.DiGraph, lvl: int, node_to_lvl: Dict[str, int]):
        """Recursively assigns ``lvl + 1`` to all successors of ``node`` and descends into them.

        A successor's level is only overwritten (and its sub-graph revisited) if the new level is higher than
        the one stored in ``node_to_lvl``, which is updated in place.

        Note: ``_infer_optimal_number_of_workers_from_graph`` no longer calls this helper; it is kept
        so that existing callers continue to work.
        """
        lvl += 1
        for successor in graph.successors(node):
            if successor not in node_to_lvl or node_to_lvl[successor] < lvl:
                node_to_lvl[successor] = lvl
                Flow._assign_lvl_to_node_recursively(successor, graph, lvl, node_to_lvl)

    @staticmethod
    def _check_acyclic(graph: DiGraph) -> None:
        """Raises a ``CyclicGraphError`` naming the involved nodes if ``graph`` contains a cycle."""
        # try to find cycles in graph -> return None if None are found
        try:
            edges = nx.find_cycle(graph, orientation="original")
        except nx.NetworkXNoCycle:
            return

        # gather nodes involved in cycle and raise an error
        nodes_containing_cycle: Set[str] = set()
        for from_node, to_node, _ in edges:
            nodes_containing_cycle.add(from_node)
            nodes_containing_cycle.add(to_node)

        # sorted -> the error message is deterministic across runs
        msg = f'Pipeline has a cycle involving: {", ".join(sorted(nodes_containing_cycle))}.'
        raise CyclicGraphError(msg)

    @staticmethod
    def _check_no_task_name_clash(task_specs: List[TaskSpec]) -> None:
        """Checks that each task's name resolves to the very same object in the task's module.

        Raises:
            TaskNameError: If the attribute found under the task's ``__name__`` in its module is a different
                object than the registered task.
        """
        from importlib import import_module

        for task_spec in task_specs:
            task_obj = task_spec.task
            task_name = task_spec.task.__name__
            module_name = getattr(task_obj, "__module__")

            # make sure the module is imported before looking it up in sys.modules
            import_module(module_name)
            module = sys.modules[module_name]

            obj = getattr(module, task_name)
            if obj is not task_obj:
                raise TaskNameError(
                    f"Task names have to be unique. "
                    f'A second object different from task "{task_name}" was found with the same name: \n'
                    f"{obj} in {module}."
                )

    @staticmethod
    def _create_graph_from_task_spec_list(
        task_specs: List[Union[TaskSpec, Task]], name: Optional[str] = None
    ) -> DiGraph:
        """Creates nx.DiGraph object of the list of defined tasks with registered dependencies.

        Nodes are keyed by ``unique_name`` and carry the task (spec) object in their ``"task"`` attribute.
        Edges point from a predecessor to the task that depends on it.
        """
        graph = DiGraph()
        for task_spec in task_specs:
            graph.add_node(task_spec.unique_name, task=task_spec)
            for predecessor in task_spec.predecessors:
                graph.add_node(predecessor.unique_name, task=predecessor)
                graph.add_edge(predecessor.unique_name, task_spec.unique_name)
        if name is not None:
            graph.name = name
        return graph

    def _create_task_spec_graph(self, task_specs: List[TaskSpec]) -> DiGraph:
        """Builds the user defined task spec graph, verifies it is acyclic and stores it on the instance.

        Raises:
            CyclicGraphError: If the task spec graph contains a cycle.
        """
        task_spec_graph = Flow._create_graph_from_task_spec_list(task_specs=task_specs, name="task spec graph")

        # assure that task spec graph contains no cyclic dependencies
        Flow._check_acyclic(graph=task_spec_graph)

        self.task_spec_graph = task_spec_graph
        return task_spec_graph

    def _register_tasks_to_force_execute(self, force: Union[str, List[str]]) -> None:
        """Sets ``task.force = True`` on all expanded tasks selected by ``force``.

        Args:
            force: Either ``"all"`` (as str or as list of length 1), a task name or a list of task names.
                A task name with the suffix ``"+"`` additionally selects all of its successors in the
                task spec graph.

        Raises:
            TypeError: If ``"all"`` is mixed with other names or ``force`` is not a str or a list of str.
            ValueError: If a provided task name does not exist in the task spec graph.
        """
        # make sure that "all" is provided in the correct way, either as str or as str in a list of length 1
        if isinstance(force, list):
            if len(force) == 1 and force[0] == "all":
                force = "all"
            elif "all" in force:
                raise TypeError('"all" must be provided as str or list of length 1 to the "force" argument.')

        # if force == 'all' set force to True for all tasks
        if force == "all":
            for task in self._expanded_tasks:
                task.force = True
            return

        # convert to list if force is of type str
        if isinstance(force, str):
            force = [force]

        if not isinstance(force, list) or not all(isinstance(task_name, str) for task_name in force):
            raise TypeError('"force" argument has to be of type str or list of str.')

        # split every entry into (task name, include successors?)
        # endswith() is used instead of indexing so that an empty string is reported as unknown name below
        # rather than failing with an IndexError
        requested = [
            (task_name[:-1], True) if task_name.endswith("+") else (task_name, False) for task_name in force
        ]

        # find all user provided task names to force execute that don't exist in the task spec graph
        known_task_names = set(self.task_spec_graph.nodes)
        unknown_task_names = sorted({task_name for task_name, _ in requested} - known_task_names)

        # if any unknown names are found, raise a ValueError
        if unknown_task_names:
            raise ValueError(
                f'The following task names provided to "force" are unknown: ' f'{", ".join(unknown_task_names)}'
            )

        # create a set of unique task names to force execute
        # if the user adds "+" to a task name we find all successor tasks in the graph and add them as well
        tasks_to_force_execute: Set[str] = set()
        for task_name, include_successors in requested:
            tasks_to_force_execute.add(task_name)
            if include_successors:
                tasks_to_force_execute.update(nx.descendants(self.task_spec_graph, task_name))

        # set force == True for all tasks in the created tasks_to_force_execute set
        for task in self._expanded_tasks:
            if task.name in tasks_to_force_execute:
                task.force = True

    def _order_task_specs(
        self,
        task_specs: List[TaskSpec],
    ) -> List[TaskSpec]:
        """Returns the task specs in a topological order of the task spec graph.

        Raises:
            CyclicGraphError: If the task spec graph contains a cycle.
        """
        # task spec graph holding the user defined dependency structure
        task_spec_graph: DiGraph = self._create_task_spec_graph(task_specs=task_specs)

        # topological ordering of tasks in graph
        return [task_spec_graph.nodes[task_name]["task"] for task_name in topological_sort(task_spec_graph)]

    @staticmethod
    def _validate_task_combination(task_combination: List[Task]) -> bool:
        """Checks that the tasks of a combination do not carry contradicting configs.

        We validate the task combinations based on their path in the task graph: if two tasks share a
        task name in their ``unique_config`` and the configs stored under that name differ, the combination
        is not valid.

        Returns:
            ``True`` if the combination is valid, ``False`` otherwise.
        """
        task_names_in_path = {name for task in task_combination for name in task.unique_config}

        # for each defined task name in path config
        for name in task_names_in_path:
            # we collect configs for this task name from each predecessor task in the task combination list
            # if a predecessor task is of type reduce or its config doesn't contain the above task name we skip
            task_configs = [
                task.unique_config[name]
                for task in task_combination
                if not task.reduce and name in task.unique_config.keys()
            ]

            # an empty list trivially matches; otherwise all configs have to equal the first one
            # (i.e. the predecessor tasks come from the same sweep)
            if task_configs:
                reference = task_configs[0]
                if not all(config == reference for config in task_configs):
                    return False
        return True

    @staticmethod
    def _get_predecessor_product(expanded_tasks_by_name: Dict[str, List[Task]], spec: TaskSpec) -> List[List[Task]]:
        """Returns all valid combinations of already expanded predecessor tasks of ``spec``.

        For a spec without predecessors a single empty combination ``[[]]`` is returned.
        """
        predecessor_tasks = [expanded_tasks_by_name[predecessor.name] for predecessor in spec.predecessors]
        task_combinations = [list(item) for item in product(*predecessor_tasks)] if predecessor_tasks else [[]]
        return [
            combination for combination in task_combinations if Flow._validate_task_combination(combination)
        ]

    @staticmethod
    def _combine_task_config(task_combination: List[Task]) -> Dict:
        """Merges the top-level ``unique_config`` entries of all tasks; later tasks overwrite earlier ones."""
        config = {}
        for task in task_combination:
            config = {**config, **task.unique_config}
        return config

    @staticmethod
    def _merge_task_combination_configs(task_combinations: List[List[Task]], task_specs: List[TaskSpec]) -> Dict:
        """Merges the unique configs of all tasks in all combinations into a single config.

        Args:
            task_combinations: Predecessor task combinations whose configs are merged.
            task_specs: All task specs; specs with ``expand_fn`` set mark the config sections that are
                passed through ``reformat_config`` after merging.

        Returns:
            The merged config. An empty dict if the combinations contain no task at all, e.g. for a
            reduce task without predecessors.
        """
        task_configs = [task.unique_config for combination in task_combinations for task in combination]

        # nothing to merge -> avoid popping from an empty list
        if not task_configs:
            return {}

        merged_config, *remaining_configs = task_configs
        for config in remaining_configs:
            merged_config = update_merge(merged_config, config)

        # a single config is returned as is, only actually merged configs are reformatted
        if remaining_configs:
            # get all task names that were specified as GridTaskSpec
            grid_task_names = {spec.name for spec in task_specs if spec.expand_fn is not None}

            # split merged_config in grid_task_config and normal_task_config
            grid_task_config = {key: value for key, value in merged_config.items() if key in grid_task_names}
            normal_task_config = {
                key: value for key, value in merged_config.items() if key not in grid_task_names
            }

            # reformat only grid_task_config (replace tuples by lists)
            grid_task_config = reformat_config(grid_task_config)

            # merge back the normal_task_config with the formatted grid_task_config
            merged_config = {**normal_task_config, **grid_task_config}
        return merged_config

    @staticmethod
    def _expand_and_link_tasks(specs: List[TaskSpec]) -> List[Task]:
        """Expands all task specs to tasks and links every task to its expanded predecessor tasks.

        Args:
            specs: Task specs in topological order, so that predecessors are expanded before their successors.

        Returns:
            All expanded tasks. Tasks whose name occurs more than once get a ``unique_name`` of the form
            ``"{name}-{expansion_id}"``.
        """
        # keep track of expanded task_specs by their names
        expanded_tasks_by_name = defaultdict(list)

        # for each spec to expand
        for spec in specs:
            # get predecessor task combinations
            task_combinations = Flow._get_predecessor_product(expanded_tasks_by_name, spec)

            if spec.reduce:
                # if it is a reduce task, just add the predecessor task combinations as parents
                expanded_tasks = spec.expand()

                for task in expanded_tasks:
                    # merge configs from all predecessors to get a single reduced predecessor config
                    # note: this config might differ from the original grid config since original grid lists
                    #  containing dicts can not be recovered when merging expanded configs
                    predecessor_config = Flow._merge_task_combination_configs(task_combinations, specs)

                    # add dependencies
                    for task_combination in task_combinations:
                        task.requires(task_combination)

                    expanded_tasks_by_name[task.name].append(task)
                    task.unique_config = MetaDict(
                        {
                            **predecessor_config,
                            **{task.name: task.unique_config},
                        }
                    )
            else:
                # for each combination, create a new task
                for task_combination in task_combinations:
                    expanded_tasks = spec.expand()

                    # shared predecessor config
                    predecessor_config = Flow._combine_task_config(task_combination)

                    # for each task that is created, add ids and dependencies
                    for task in expanded_tasks:
                        task.requires(task_combination)
                        task.unique_config = MetaDict(
                            {
                                **predecessor_config,
                                **{task.name: task.unique_config},
                            }
                        )
                        expanded_tasks_by_name[task.name].append(task)

        # create final list of linked task specs and set expansion id for expanded specs
        all_tasks = []
        for expanded_tasks in expanded_tasks_by_name.values():
            # a task that was expanded only once keeps its unique name
            if len(expanded_tasks) != 1:
                for expansion_id, task in enumerate(expanded_tasks, 1):
                    task.unique_name = f"{task.name}-{expansion_id}"
            all_tasks.extend(expanded_tasks)
        return all_tasks

    def _get_number_of_used_workers(self, num_workers: Optional[int] = None) -> int:
        """Get the number of workers, given the optimal, maximum and user-defined number.

        1. get maximum number of available workers.
        2. infer optimal number of workers given the expanded graph to process
        3. define num_workers based on optimal_num_workers

        Args:
            num_workers: Number of workers set by the user.

        Returns:
            Number of workers used.
        """
        max_num_workers = multiprocessing.cpu_count()

        # never use more workers than cpus are available
        optimal_num_workers = min(self._infer_optimal_number_of_workers_from_graph(), max_num_workers)

        if num_workers is None or num_workers > optimal_num_workers:
            num_workers = optimal_num_workers
        return num_workers

    @staticmethod
    def _process_resources(num_workers: int, resources: Optional[Any] = None) -> Optional[List[Any]]:
        """Normalizes ``resources`` to a list that holds at most ``num_workers`` entries.

        Returns:
            ``None`` if no resources were provided, the first ``num_workers`` entries if a list was provided,
            otherwise a list containing the single provided resource.
        """
        if resources is None:
            return None
        if isinstance(resources, list):
            return resources[:num_workers]
        return [resources]

    def run(
        self,
        num_workers: Optional[int] = None,
        resources: Optional[Union[Any, List[Any]]] = None,
        start_method: str = "spawn",
        exit_on_error: bool = True,
        log_to_tmux: bool = False,
        max_panes_per_window: int = 4,
        force: Optional[Union[str, List[str]]] = None,
        project_name: str = "uncategorized",
        run_name: Optional[str] = None,
        results_store: Optional[ResultsStore] = None,
        return_results: Optional[str] = None,
    ) -> Optional[Dict[str, List[TaskResults]]]:
        """Runs the expanded task graph sequentially or in parallel using multiprocessing and returns the results.

        Args:
            num_workers: Number of parallel processes (dolphins) used.
                Internally, the optimal number of processes ``optimal_num_workers`` is inferred from the task graph.
                If ``num_workers is None`` or ``num_workers > optimal_num_workers``, ``num_workers`` is overwritten
                to ``optimal_num_workers``.
                Defaults to ``None``.
            resources: A single resource object or a list of resources that are assigned to workers.
                Resources can hold arbitrary data, e.g. gpu or cpu device information, making sure that each
                worker has access to a dedicated device.
                If ``num_workers > 1`` and ``len(resources) == num_workers`` resources are assigned to each worker.
                If ``len(resources) < num_workers`` resources are assigned randomly to workers.
                If ``num_workers == 1`` the first resource ``resources[0]`` is assigned to all tasks (not workers).
                Defaults to ``None``.
            start_method: Start method for multiprocessing. Defaults to ``"spawn"``.
                Only used when ``num_workers > 1``.
            exit_on_error: When an error happens all workers finish their current tasks and exit gracefully.
                Defaults to True. Only used when ``num_workers > 1``.
            log_to_tmux: If ``True`` a new tmux session is created (given tmux is installed) and each worker
                (process) logs to a dedicated pane to avoid garbled logging output.
                The session's name equals the combined ``f"{project_name}--{run_name}"``.
                Defaults to ``False``. Only used when ``num_workers > 1``.
            max_panes_per_window: Max number of panes per tmux window.
                Requires ``log_to_tmux`` being set to ``True``.
                Defaults to ``4``. Only used when ``num_workers > 1``.
            force: Forcefully re-run tasks. Possible options are:
                1) ``"all"`` - All the tasks are re-run.
                2) A task name (e.g. "PreProcessTask")
                or list of task names (e.g. ``["PreProcessTask1", "PreProcessTask2"]``).
                Additionally, each task name can have the suffix "+" to re-run also its successors
                (e.g. "PreProcessTask+").
            project_name: Name of project. Defaults to ``"uncategorized"``.
            run_name: Name of run. If ``None``, a run name is generated.
            results_store: An instance of results store for results management.
                If nothing is provided, a non-persistent InMemoryStore store is used.
            return_results: Return results-dictionary after ``run()``.
                Choices: ``"all", "latest", None``. Defaults to ``None``.
                If ``None`` and no ``results_store`` or an ``InMemoryStore`` is provided, it is set to ``"latest"``.

        Returns:
            A results dictionary with the following schema.
            ``{"task_name_1": List[TaskResults], "task_name_2": List[TaskResults]}``
        """
        if force is not None:
            self._register_tasks_to_force_execute(force=force)

        # generate random run name in the form of "adjective-noun"
        if run_name is None:
            run_name = generate_run_name()

        # if InMemoryStore is used, return the latest pipeline results
        if return_results is None and (results_store is None or isinstance(results_store, InMemoryStore)):
            return_results = "latest"

        # get maximum number of available workers
        # and infer optimal number of workers given the expanded graph to process
        num_workers = self._get_number_of_used_workers(num_workers)

        # convert resources to list if a single resource was provided
        # and trim list of resources to actual number of used workers
        resources = self._process_resources(num_workers, resources)

        with Swarm(
            n_dolphins=num_workers,
            resources=resources,
            start_method=start_method,
            exit_on_error=exit_on_error,
            log_to_tmux=log_to_tmux,
            max_panes_per_window=max_panes_per_window,
        ) as swarm:
            results = swarm.work(
                tasks=self._expanded_tasks,
                run_name=run_name,
                project_name=project_name,
                results_store=results_store,
                return_results=return_results,
            )
        return results