dae.task_graph package
Subpackages
Submodules
dae.task_graph.cache module
- class dae.task_graph.cache.CacheRecord(type: CacheRecordType, result_or_error: Any = None)[source]
Bases:
object
Encapsulate information about a task in the cache.
- property error: Any
- property result: Any
- result_or_error: Any = None
- type: CacheRecordType
- class dae.task_graph.cache.CacheRecordType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
Enum
- COMPUTED = 1
- ERROR = 2
- NEEDS_COMPUTE = 0
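The relationship between CacheRecord and CacheRecordType can be illustrated with a minimal, self-contained sketch. The classes below are stand-ins re-declared for illustration, not the library's source. The enum values and field names follow the signatures above; the assumption that result and error return result_or_error only for the matching record type, and raise otherwise, is a guess at the behaviour that this reference does not confirm.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any


class CacheRecordType(Enum):
    """Stand-in mirroring the documented enum values."""

    NEEDS_COMPUTE = 0
    COMPUTED = 1
    ERROR = 2


@dataclass(frozen=True)
class CacheRecord:
    """Stand-in: a single payload slot interpreted according to ``type``."""

    type: CacheRecordType
    result_or_error: Any = None

    @property
    def result(self) -> Any:
        # Assumption: only meaningful for COMPUTED records.
        if self.type is not CacheRecordType.COMPUTED:
            raise ValueError("record holds no computed result")
        return self.result_or_error

    @property
    def error(self) -> Any:
        # Assumption: only meaningful for ERROR records.
        if self.type is not CacheRecordType.ERROR:
            raise ValueError("record holds no error")
        return self.result_or_error


done = CacheRecord(CacheRecordType.COMPUTED, 42)
failed = CacheRecord(CacheRecordType.ERROR, RuntimeError("boom"))
pending = CacheRecord(CacheRecordType.NEEDS_COMPUTE)
```

Storing both outcomes in one field keeps the record small; the type tag tells the caller which of the two properties is safe to read.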
- class dae.task_graph.cache.FileTaskCache(cache_dir: str, *, force: bool = False)[source]
Bases:
TaskCache
Use file modification timestamps to determine if a task needs to run.
- cache(task_node: Task, *, is_error: bool, result: Any) None [source]
Cache the result or exception of a task.
- load(graph: TaskGraph) Generator[tuple[Task, CacheRecord], None, None] [source]
For each task in the graph, load and yield its cache record.
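The idea of using file modification timestamps to decide whether a task needs to run can be sketched as follows. This is not FileTaskCache's implementation: the helper name needs_compute, the notion of a single output file per task, and the file names are all hypothetical and serve only to show the timestamp comparison.

```python
import os
import tempfile


def needs_compute(output_path: str, input_paths: list[str]) -> bool:
    """Hypothetical helper: True if the output is missing or older than any input."""
    if not os.path.exists(output_path):
        return True
    output_mtime = os.path.getmtime(output_path)
    return any(os.path.getmtime(path) > output_mtime for path in input_paths)


with tempfile.TemporaryDirectory() as tmp:
    source = os.path.join(tmp, "input.txt")
    flag = os.path.join(tmp, "task.flag")
    for path in (source, flag):
        with open(path, "w") as handle:
            handle.write("x")

    os.utime(source, (100, 100))  # input last modified at t=100
    os.utime(flag, (200, 200))    # cached output written at t=200
    fresh = needs_compute(flag, [source])

    os.utime(source, (300, 300))  # input modified after the cached output
    stale = needs_compute(flag, [source])

    missing = needs_compute(os.path.join(tmp, "absent.flag"), [source])
```

Here fresh is False because the output is newer than its input, while stale and missing are both True.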
- class dae.task_graph.cache.NoTaskCache[source]
Bases:
dict, TaskCache
Don’t check any conditions and just run any task.
- cache(task_node: Task, *, is_error: bool, result: Any) None [source]
Cache the result or exception of a task.
- load(graph: TaskGraph) Generator[tuple[Task, CacheRecord], None, None] [source]
For each task in the graph, load and yield its cache record.
- class dae.task_graph.cache.TaskCache[source]
Bases:
object
Store the result of a task in a file and reuse it if possible.
- abstract cache(task_node: Task, *, is_error: bool, result: Any) None [source]
Cache the result or exception of a task.
- static create(*, force: bool | None = None, cache_dir: str | None = None, no_cache: bool = False) TaskCache [source]
Create the appropriate task cache.
- abstract load(graph: TaskGraph) Iterator[tuple[Task, CacheRecord]] [source]
For each task in the graph, load and yield its cache record.
dae.task_graph.cli_tools module
- class dae.task_graph.cli_tools.TaskGraphCli[source]
Bases:
object
Takes care of creating a task graph executor and executing a graph.
- static add_arguments(parser: ArgumentParser, force_mode: str = 'optional', default_task_status_dir: str | None = './.task-progress', *, use_commands: bool = True, never_cache: bool = False) None [source]
Add arguments needed to execute a task graph.
- static create_executor(task_cache: TaskCache | None = None, **kwargs: Any) TaskGraphExecutor [source]
Create a task graph executor according to the args specified.
dae.task_graph.demo_graphs_cli module
dae.task_graph.executor module
- class dae.task_graph.executor.AbstractTaskGraphExecutor(task_cache: TaskCache = {})[source]
Bases:
TaskGraphExecutor
Executor that walks the graph in an order that satisfies dependencies.
- execute(task_graph: TaskGraph) Iterator[tuple[Task, Any]] [source]
Start executing the graph.
Return an iterator that yields the tasks in the graph after they are executed.
This is not necessarily in DFS or BFS order. It is not even the order in which the tasks are executed.
The only guarantee is that when a task is returned, its execution has already finished.
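Walking a graph in an order that satisfies dependencies can be sketched with a small, self-contained example. The Task class below is a simplified stand-in and execute_in_dependency_order is a hypothetical function name; neither is the library's code. The sketch also assumes that a Task appearing in args is replaced by that task's result before the function is called, which this reference does not state.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Iterator


@dataclass(eq=False)
class Task:
    """Simplified stand-in for a graph node (compared by identity)."""

    task_id: str
    func: Callable[..., Any]
    args: list[Any]
    deps: list["Task"] = field(default_factory=list)


def execute_in_dependency_order(tasks: list[Task]) -> Iterator[tuple[Task, Any]]:
    """Yield (task, result) pairs; a task runs only after all of its deps."""
    results: dict[str, Any] = {}
    pending = list(tasks)
    while pending:
        ready = [t for t in pending if all(d.task_id in results for d in t.deps)]
        if not ready:
            raise ValueError("cycle or missing dependency in task graph")
        for task in ready:
            # Assumption: Task arguments are replaced by their results.
            args = [results[a.task_id] if isinstance(a, Task) else a for a in task.args]
            results[task.task_id] = task.func(*args)
            pending.remove(task)
            yield task, results[task.task_id]


load = Task("load", lambda: [3, 1, 2], [])
sort = Task("sort", sorted, [load], deps=[load])
total = Task("total", sum, [sort], deps=[sort])

# The tasks are deliberately listed out of order.
order = [(t.task_id, r) for t, r in execute_in_dependency_order([total, sort, load])]
```

This sequential sketch happens to yield tasks in the order they ran. A parallel executor need not, which is why the documented contract only promises that a yielded task has finished.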
- class dae.task_graph.executor.DaskExecutor(client: Client, task_cache: TaskCache = {}, **kwargs: Any)[source]
Bases:
AbstractTaskGraphExecutor
Execute tasks in parallel using Dask to do the heavy lifting.
- MIN_QUEUE_SIZE = 700
- class dae.task_graph.executor.SequentialExecutor(task_cache: TaskCache = {}, **kwargs: Any)[source]
Bases:
AbstractTaskGraphExecutor
A task graph executor that executes tasks in sequential order.
- class dae.task_graph.executor.TaskGraphExecutor[source]
Bases:
object
Class that executes a task graph.
- abstract execute(task_graph: TaskGraph) Iterator[tuple[Task, Any]] [source]
Start executing the graph.
Return an iterator that yields the tasks in the graph after they are executed.
This is not necessarily in DFS or BFS order. It is not even the order in which the tasks are executed.
The only guarantee is that when a task is returned, its execution has already finished.
- dae.task_graph.executor.task_graph_all_done(task_graph: TaskGraph, task_cache: TaskCache) bool [source]
Check if the task graph is fully executed.
When all tasks are already computed, the function returns True. If any tasks still need to run, the function returns False.
- dae.task_graph.executor.task_graph_run(task_graph: TaskGraph, executor: TaskGraphExecutor | None = None, *, keep_going: bool = False) bool [source]
Execute (run) the task_graph with the given executor.
- dae.task_graph.executor.task_graph_run_with_results(task_graph: TaskGraph, executor: TaskGraphExecutor) Generator[Any, None, None] [source]
Run a task graph, yielding the results from each task.
dae.task_graph.graph module
- class dae.task_graph.graph.Task(task_id: str, func: Callable, args: list[Any], deps: list[Task], input_files: list[str])[source]
Bases:
object
Represent one node in a TaskGraph together with its dependencies.
- args: list[Any]
- func: Callable
- input_files: list[str]
- task_id: str
- class dae.task_graph.graph.TaskGraph[source]
Bases:
object
An object representing a graph of tasks.
- create_task(task_id: str, func: Callable[[...], Any], args: list, deps: list[Task], input_files: list[str] | None = None) Task [source]
Create a new task and add it to the graph.
- Parameters:
task_id – Name of the task (used for debugging purposes)
func – Function to execute
args – Arguments to that function
deps – List of Task objects on which the current task depends
input_files – Files that were used to build the graph itself
- Returns:
The newly created task in the graph
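How create_task links tasks through deps can be shown with a stand-in that mirrors the documented signature. The classes below are simplified re-declarations for illustration only: the tasks attribute and the treatment of a missing input_files as an empty list are assumptions, and the real TaskGraph may validate or store tasks differently.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(eq=False)
class Task:
    """Simplified stand-in with the documented constructor fields."""

    task_id: str
    func: Callable[..., Any]
    args: list[Any]
    deps: list["Task"]
    input_files: list[str]


class TaskGraph:
    """Stand-in that only records tasks in creation order."""

    def __init__(self) -> None:
        self.tasks: list[Task] = []  # hypothetical attribute name

    def create_task(
        self,
        task_id: str,
        func: Callable[..., Any],
        args: list,
        deps: list[Task],
        input_files: Optional[list[str]] = None,
    ) -> Task:
        # Assumption: a missing input_files list is treated as empty.
        task = Task(task_id, func, list(args), list(deps), list(input_files or []))
        self.tasks.append(task)
        return task


graph = TaskGraph()
first = graph.create_task("square", pow, [7, 2], deps=[])
second = graph.create_task(
    "report", print, ["done"], deps=[first], input_files=["config.yaml"]
)
```

Passing the Task returned by an earlier create_task call in deps is what records the dependency edge; nothing is executed at graph-construction time.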
dae.task_graph.logging module
- class dae.task_graph.logging.FsspecHandler(logfile: str)[source]
Bases:
StreamHandler
Class to create an fsspec-based logging handler.