dae.task_graph package
Submodules
dae.task_graph.base_executor module
- class dae.task_graph.base_executor.TaskGraphExecutorBase(task_cache: TaskCache = {}, **kwargs: Any)[source]
Bases: TaskGraphExecutor
Executor that walks the graph in an order that satisfies dependencies.
- execute(graph: TaskGraph) Iterator[tuple[Task, Any]][source]
Start executing the graph.
Return an iterator that yields the tasks in the graph after they have been executed.
Tasks are not necessarily yielded in DFS or BFS order, nor even in the order in which they were executed.
The only guarantee is that when a task is yielded, its execution has already finished.
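The "yielded only after execution has finished" contract can be illustrated with a toy stand-in. This is a minimal sketch using plain `(task_id, func, deps)` tuples and the standard library's `graphlib`, not the real `dae.task_graph` classes:

```python
# Minimal sketch of the "yield after execution" contract. The toy graph
# format (task_id -> (func, deps)) is an illustration, not the dae API.
from graphlib import TopologicalSorter
from typing import Any, Callable, Iterator


def execute_in_order(
    tasks: dict[str, tuple[Callable[[], Any], list[str]]],
) -> Iterator[tuple[str, Any]]:
    """Yield (task_id, result) pairs; a task is yielded only after it ran."""
    deps = {tid: spec[1] for tid, spec in tasks.items()}
    for tid in TopologicalSorter(deps).static_order():
        result = tasks[tid][0]()  # run the task after all its deps
        yield tid, result


# Tiny diamond graph: a -> (b, c) -> d
results = dict(execute_in_order({
    "a": (lambda: 1, []),
    "b": (lambda: 2, ["a"]),
    "c": (lambda: 3, ["a"]),
    "d": (lambda: 4, ["b", "c"]),
}))
```

Note that any topological order is acceptable here; the consumer may only rely on a task's dependencies having finished before the task itself is yielded.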
dae.task_graph.cache module
- class dae.task_graph.cache.CacheRecord(type: CacheRecordType, result_or_error: Any = None)[source]
Bases: object
Encapsulate information about a task in the cache.
- property error: Any
- property result: Any
- result_or_error: Any = None
- type: CacheRecordType
- class dae.task_graph.cache.CacheRecordType(*values)[source]
Bases: Enum
- COMPUTED = 1
- ERROR = 2
- NEEDS_COMPUTE = 0
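A plausible reading of how `CacheRecord` and `CacheRecordType` fit together is that the record's `result` and `error` properties are only meaningful for the matching record type. The classes below are stand-ins mirroring the documented fields; the property behavior is an assumption, not confirmed by the source:

```python
# Hedged stand-in for CacheRecord/CacheRecordType: only the field and
# member names come from the documentation; the property guards are an
# assumption about how result vs. error is exposed.
from dataclasses import dataclass
from enum import Enum
from typing import Any


class CacheRecordType(Enum):
    NEEDS_COMPUTE = 0
    COMPUTED = 1
    ERROR = 2


@dataclass
class CacheRecord:
    type: CacheRecordType
    result_or_error: Any = None

    @property
    def result(self) -> Any:
        # Only meaningful when the task completed successfully.
        assert self.type is CacheRecordType.COMPUTED
        return self.result_or_error

    @property
    def error(self) -> Any:
        # Only meaningful when the task raised an exception.
        assert self.type is CacheRecordType.ERROR
        return self.result_or_error
```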
- class dae.task_graph.cache.FileTaskCache(cache_dir: str, *, force: bool = False)[source]
Bases: TaskCache
Use file modification timestamps to determine if a task needs to run.
- cache(task_node: Task, *, is_error: bool, result: Any) None[source]
Cache the result or exception of a task.
- load(graph: TaskGraph) Generator[tuple[Task, CacheRecord], None, None][source]
For each task in the graph, load and yield its cache record.
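The timestamp rule that `FileTaskCache` is described as using can be sketched as: a cached result is stale when the cache file is missing or older than any input file. The helper below is illustrative only and not part of the dae API:

```python
# Illustration of mtime-based staleness checking (not the dae
# implementation): recompute when the cache file is missing or any
# input file is newer than it.
import os
import tempfile


def needs_compute(cache_file: str, input_files: list[str]) -> bool:
    """Stale if the cache file is missing or older than any input."""
    if not os.path.exists(cache_file):
        return True
    cache_mtime = os.path.getmtime(cache_file)
    return any(os.path.getmtime(f) > cache_mtime for f in input_files)


# Demonstrate the rule with controlled timestamps.
work_dir = tempfile.mkdtemp()
input_file = os.path.join(work_dir, "in.txt")
cache_file = os.path.join(work_dir, "cache.bin")
for path in (input_file, cache_file):
    open(path, "w").close()
os.utime(input_file, (100, 100))
os.utime(cache_file, (200, 200))
fresh = needs_compute(cache_file, [input_file])  # cache newer: reuse
os.utime(input_file, (300, 300))
stale = needs_compute(cache_file, [input_file])  # input newer: recompute
```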
- class dae.task_graph.cache.NoTaskCache[source]
Bases: dict, TaskCache
Don't check any conditions; just run every task.
- cache(task_node: Task, *, is_error: bool, result: Any) None[source]
Cache the result or exception of a task.
- load(graph: TaskGraph) Generator[tuple[Task, CacheRecord], None, None][source]
For each task in the graph, load and yield its cache record.
- class dae.task_graph.cache.TaskCache[source]
Bases: object
Store the result of a task in a file and reuse it if possible.
- abstractmethod cache(task_node: Task, *, is_error: bool, result: Any) None[source]
Cache the result or exception of a task.
- static create(*, force: bool = False, task_progress_mode: bool = True, cache_dir: str | None = None) TaskCache[source]
Create the appropriate task cache.
- abstractmethod load(graph: TaskGraph) Iterator[tuple[Task, CacheRecord]][source]
For each task in the graph, load and yield its cache record.
dae.task_graph.cli_tools module
- class dae.task_graph.cli_tools.TaskGraphCli[source]
Bases: object
Takes care of creating a task graph executor and executing a graph.
- static add_arguments(parser: ArgumentParser, *, task_progress_mode: bool = True, default_task_status_dir: str | None = './.task-progress', use_commands: bool = True) None[source]
Add arguments needed to execute a task graph.
- static create_executor(task_cache: TaskCache | None = None, **kwargs: Any) TaskGraphExecutor[source]
Create a task graph executor according to the args specified.
- dae.task_graph.cli_tools.task_graph_all_done(task_graph: TaskGraph, task_cache: TaskCache) bool[source]
Check if the task graph is fully executed.
When all tasks are already computed, the function returns True. If there are tasks that still need to run, it returns False.
- dae.task_graph.cli_tools.task_graph_run(task_graph: TaskGraph, executor: TaskGraphExecutor | None = None, *, keep_going: bool = False) bool[source]
Execute the task_graph with the given executor.
- dae.task_graph.cli_tools.task_graph_run_with_results(task_graph: TaskGraph, executor: TaskGraphExecutor | None = None, *, keep_going: bool = False) Generator[Any, None, None][source]
Run a task graph, yielding the results from each task.
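The `keep_going` flag that both run helpers accept presumably controls whether a failing task aborts the whole run or lets the remaining tasks execute. A toy stand-in (not the dae implementation) makes the contract concrete:

```python
# Illustration of the keep_going contract: without it the first failure
# propagates and stops the run; with it the remaining tasks still run.
# Toy stand-in over a flat task list, not the dae implementation.
from typing import Any, Callable, Iterator


def run_with_results(
    tasks: list[Callable[[], Any]], *, keep_going: bool = False,
) -> Iterator[Any]:
    for task in tasks:
        try:
            yield task()
        except Exception:
            if not keep_going:
                raise  # abort on first failure


def boom() -> None:
    raise RuntimeError("task failed")


# Default: the failure aborts the run after the first result.
partial = []
try:
    for result in run_with_results([lambda: 1, boom, lambda: 3]):
        partial.append(result)
except RuntimeError:
    pass

# keep_going=True: the failing task is skipped, the rest still run.
survivors = list(run_with_results([lambda: 1, boom, lambda: 3],
                                  keep_going=True))
```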
dae.task_graph.dask_executor module
dae.task_graph.demo_graphs_cli module
- dae.task_graph.demo_graphs_cli.build_demo_graph(graph_type: str, graph_params: list[str] | None) TaskGraph[source]
Build a demo graph.
dae.task_graph.executor module
- class dae.task_graph.executor.TaskGraphExecutor[source]
Bases: object
Class that executes a task graph.
- abstractmethod execute(graph: TaskGraph) Iterator[tuple[Task, Any]][source]
Start executing the graph.
Return an iterator that yields the tasks in the graph after they have been executed.
Tasks are not necessarily yielded in DFS or BFS order, nor even in the order in which they were executed.
The only guarantee is that when a task is yielded, its execution has already finished.
dae.task_graph.graph module
- class dae.task_graph.graph.Task(task_id: str)[source]
Bases: object
Represent one node in a TaskGraph together with its dependencies.
- task_id: str
- class dae.task_graph.graph.TaskDesc(task: Task, func: Callable, args: list[Any], deps: list[Task], input_files: list[str])[source]
Bases: object
Represent an immutable full task description with all its properties.
- args: list[Any]
- func: Callable
- input_files: list[str]
- class dae.task_graph.graph.TaskGraph[source]
Bases: object
An object representing a graph of tasks.
- create_task(task_id: str, func: Callable[[...], Any], *, args: Sequence, deps: Sequence[Task], input_files: Sequence[str] | None = None) Task[source]
Create a new task and add it to the graph.
- Parameters:
task_id – ID of the task (used for debugging purposes)
func – Function to execute
args – Arguments to that function
deps – List of TaskNodes on which the current task depends
input_files – Files that were used to build the graph itself
- Return TaskNode:
The newly created task node in the graph
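Assembling a graph with `create_task` can be sketched with a minimal stand-in that mirrors the documented signature; the real `TaskGraph` does considerably more (cycle checks, task IDs, input files for cache invalidation):

```python
# Minimal stand-in mirroring the documented create_task signature, to
# show how a graph is assembled. Not the real dae.task_graph.TaskGraph.
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Sequence


@dataclass(frozen=True, eq=False)
class Task:
    task_id: str


class TaskGraph:
    def __init__(self) -> None:
        self.tasks: list[Task] = []
        self.deps: dict[Task, list[Task]] = {}

    def create_task(
        self, task_id: str, func: Callable[..., Any], *,
        args: Sequence, deps: Sequence[Task],
        input_files: Sequence[str] | None = None,
    ) -> Task:
        """Create a new task node and record its dependencies."""
        task = Task(task_id)
        self.tasks.append(task)
        self.deps[task] = list(deps)
        return task


# Usage: a two-task chain (the funcs and args are placeholders).
graph = TaskGraph()
parse = graph.create_task("parse", print, args=["in.txt"], deps=[])
merge = graph.create_task("merge", print, args=[], deps=[parse])
```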
- extract_tasks(selected_tasks: Sequence[Task]) Sequence[TaskDesc][source]
Collect the selected tasks from the task graph and remove them.
- get_task_deps(task: Task) list[Task][source]
Get the dependencies of a task in a form suitable for the Dask executor.
- process_completed_tasks(task_result: Sequence[tuple[Task, Any]]) None[source]
Process completed tasks.
- Parameters:
task_result – Sequence of (task, result) pairs for completed tasks
dae.task_graph.logging module
- class dae.task_graph.logging.FsspecHandler(logfile: str)[source]
Bases: StreamHandler
Create an fsspec-based logging handler.
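Since the class derives from `StreamHandler`, a reasonable reading is that it wires the standard logging machinery to a file-like object obtained via `fsspec.open(logfile)`, so logs can land on any fsspec-supported filesystem. The sketch below uses `io.StringIO` as a stand-in for that remote stream; the fsspec wiring itself is an assumption:

```python
# Sketch of a StreamHandler over an arbitrary file-like object. The
# real FsspecHandler presumably obtains its stream from
# fsspec.open(logfile); StringIO stands in for that here.
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

logger = logging.getLogger("task_graph_demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info("task finished")
handler.flush()

log_text = stream.getvalue()
```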
dae.task_graph.process_pool_executor module
- class dae.task_graph.process_pool_executor.ProcessPoolTaskExecutor(**kwargs: Any)[source]
Bases: TaskGraphExecutorBase
Execute tasks in parallel using Dask to do the heavy lifting.