dae.task_graph package

Submodules

dae.task_graph.base_executor module

class dae.task_graph.base_executor.TaskGraphExecutorBase(task_cache: TaskCache = {}, **kwargs: Any)[source]

Bases: TaskGraphExecutor

Executor that walks the graph in an order that satisfies its dependencies.

execute(graph: TaskGraph) Iterator[tuple[Task, Any]][source]

Start executing the graph.

Return an iterator that yields each task in the graph after it has been executed.

Tasks are not necessarily yielded in DFS or BFS order, nor even in the order in which they were executed.

The only guarantee is that when a task is yielded, its execution has already finished.
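
A minimal consumption sketch (assuming graph is an already built TaskGraph and using the SequentialExecutor subclass documented below; the printed message is illustrative):

    from dae.task_graph.sequential_executor import SequentialExecutor

    executor = SequentialExecutor()
    try:
        # (task, result) pairs arrive in completion order, not submission order.
        for task, result in executor.execute(graph):
            print(f"{task.task_id} finished with result {result!r}")
    finally:
        executor.close()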

dae.task_graph.cache module

class dae.task_graph.cache.CacheRecord(type: CacheRecordType, result_or_error: Any = None)[source]

Bases: object

Encapsulate information about a task in the cache.

property error: Any
property result: Any
result_or_error: Any = None
type: CacheRecordType
class dae.task_graph.cache.CacheRecordType(*values)[source]

Bases: Enum

COMPUTED = 1
ERROR = 2
NEEDS_COMPUTE = 0
class dae.task_graph.cache.FileTaskCache(cache_dir: str, *, force: bool = False)[source]

Bases: TaskCache

Use file modification timestamps to determine if a task needs to run.

cache(task_node: Task, *, is_error: bool, result: Any) None[source]

Cache the result or exception of a task.

load(graph: TaskGraph) Generator[tuple[Task, CacheRecord], None, None][source]

For each task in the graph, load and yield its cache record.

class dae.task_graph.cache.NoTaskCache[source]

Bases: dict, TaskCache

Don’t check any conditions; just run every task.

cache(task_node: Task, *, is_error: bool, result: Any) None[source]

Cache the result or exception of a task.

load(graph: TaskGraph) Generator[tuple[Task, CacheRecord], None, None][source]

For each task in the graph, load and yield its cache record.

class dae.task_graph.cache.TaskCache[source]

Bases: object

Store the result of a task in a file and reuse it if possible.

abstractmethod cache(task_node: Task, *, is_error: bool, result: Any) None[source]

Cache the result or exception of a task.

static create(*, force: bool = False, task_progress_mode: bool = True, cache_dir: str | None = None) TaskCache[source]

Create the appropriate task cache.

abstractmethod load(graph: TaskGraph) Iterator[tuple[Task, CacheRecord]][source]

For each task in the graph, load and yield its cache record.
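
A short sketch of obtaining a cache via the factory (the directory name is illustrative; the assumption is that passing a cache_dir selects a FileTaskCache while cache_dir=None selects a NoTaskCache):

    from dae.task_graph.cache import TaskCache

    # Persistent, file-backed cache (assumed to be a FileTaskCache).
    file_cache = TaskCache.create(cache_dir="./.task-progress")

    # No caching at all: every task is rerun (assumed to be a NoTaskCache).
    no_cache = TaskCache.create(cache_dir=None)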

dae.task_graph.cli_tools module

class dae.task_graph.cli_tools.TaskGraphCli[source]

Bases: object

Takes care of creating a task graph executor and executing a graph.

static add_arguments(parser: ArgumentParser, *, task_progress_mode: bool = True, default_task_status_dir: str | None = './.task-progress', use_commands: bool = True) None[source]

Add arguments needed to execute a task graph.

static create_executor(task_cache: TaskCache | None = None, **kwargs: Any) TaskGraphExecutor[source]

Create a task graph executor according to the specified arguments.

static process_graph(task_graph: TaskGraph, *, task_progress_mode: bool = True, **kwargs: Any) bool[source]

Process task_graph in accordance with the arguments in args.

Return True if the graph is processed successfully.
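
A sketch of the intended CLI wiring, assuming the parsed argparse namespace can be forwarded to process_graph as keyword arguments (the script description and the single task are illustrative):

    import argparse

    from dae.task_graph.cli_tools import TaskGraphCli
    from dae.task_graph.graph import TaskGraph

    parser = argparse.ArgumentParser(description="run my pipeline")
    TaskGraphCli.add_arguments(parser)
    args = parser.parse_args()

    graph = TaskGraph()
    graph.create_task("hello", print, args=["hello"], deps=[])

    # Assumption: process_graph consumes the parsed options as keyword arguments.
    if not TaskGraphCli.process_graph(graph, **vars(args)):
        raise SystemExit(1)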

dae.task_graph.cli_tools.task_graph_all_done(task_graph: TaskGraph, task_cache: TaskCache) bool[source]

Check if the task graph is fully executed.

When all tasks are already computed, the function returns True. If there are tasks that still need to run, it returns False.
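
A sketch of using this check to short-circuit a run (the cache directory is illustrative and graph is assumed to be a populated TaskGraph):

    from dae.task_graph.cache import TaskCache
    from dae.task_graph.cli_tools import task_graph_all_done

    cache = TaskCache.create(cache_dir="./.task-progress")
    if task_graph_all_done(graph, cache):
        print("all tasks are already computed; nothing to run")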

dae.task_graph.cli_tools.task_graph_run(task_graph: TaskGraph, executor: TaskGraphExecutor | None = None, *, keep_going: bool = False) bool[source]

Execute (run) the task_graph with the given executor.

dae.task_graph.cli_tools.task_graph_run_with_results(task_graph: TaskGraph, executor: TaskGraphExecutor | None = None, *, keep_going: bool = False) Generator[Any, None, None][source]

Run a task graph, yielding the results from each task.
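
A sketch of collecting per-task results (graph is assumed to be a populated TaskGraph; the executor choice is illustrative):

    from dae.task_graph.cli_tools import task_graph_run_with_results
    from dae.task_graph.sequential_executor import SequentialExecutor

    executor = SequentialExecutor()
    # Results are yielded as tasks complete; keep_going continues past failures.
    for result in task_graph_run_with_results(graph, executor, keep_going=True):
        print(result)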

dae.task_graph.cli_tools.task_graph_status(task_graph: TaskGraph, task_cache: TaskCache, verbose: int | None) bool[source]

Show the status of each task from the task graph.

dae.task_graph.dask_executor module

class dae.task_graph.dask_executor.DaskExecutor(dask_client: Client, task_cache: TaskCache = {}, **kwargs: Any)[source]

Bases: TaskGraphExecutorBase

Dask-based task graph executor.

close() None[source]

Close the Dask executor.
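
A minimal sketch of wiring a Dask client into the executor (requires dask.distributed; the cluster settings and the graph are illustrative):

    from dask.distributed import Client

    from dae.task_graph.dask_executor import DaskExecutor

    client = Client(n_workers=2, threads_per_worker=1)
    executor = DaskExecutor(client)
    try:
        for task, result in executor.execute(graph):
            print(task.task_id, result)
    finally:
        executor.close()
        client.close()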

dae.task_graph.demo_graphs_cli module

dae.task_graph.demo_graphs_cli.build_demo_graph(graph_type: str, graph_params: list[str] | None) TaskGraph[source]

Build a demo graph.

dae.task_graph.demo_graphs_cli.main(argv: list[str] | None = None) None[source]

Entry point for the demo script.

dae.task_graph.demo_graphs_cli.task_part(seconds: str) str[source]
dae.task_graph.demo_graphs_cli.task_part_b(seconds: str) str[source]
dae.task_graph.demo_graphs_cli.task_part_c(seconds: str, *_args: str) str[source]
dae.task_graph.demo_graphs_cli.task_summary(seconds: str) None[source]
dae.task_graph.demo_graphs_cli.task_summary_b(seconds: str, *args: str) str[source]
dae.task_graph.demo_graphs_cli.task_summary_c(seconds: str, *args: str) str[source]
dae.task_graph.demo_graphs_cli.timeout(seconds: str) float[source]

dae.task_graph.executor module

class dae.task_graph.executor.TaskGraphExecutor[source]

Bases: object

Class that executes a task graph.

abstractmethod close() None[source]

Clean-up any resources used by the executor.

abstractmethod execute(graph: TaskGraph) Iterator[tuple[Task, Any]][source]

Start executing the graph.

Return an iterator that yields each task in the graph after it has been executed.

Tasks are not necessarily yielded in DFS or BFS order, nor even in the order in which they were executed.

The only guarantee is that when a task is yielded, its execution has already finished.

dae.task_graph.graph module

class dae.task_graph.graph.Task(task_id: str)[source]

Bases: object

Represent one node in a TaskGraph together with its dependencies.

task_id: str
class dae.task_graph.graph.TaskDesc(task: Task, func: Callable, args: list[Any], deps: list[Task], input_files: list[str])[source]

Bases: object

Represent an immutable full task description with all its properties.

args: list[Any]
deps: list[Task]
func: Callable
input_files: list[str]
task: Task
class dae.task_graph.graph.TaskGraph[source]

Bases: object

An object representing a graph of tasks.

create_task(task_id: str, func: Callable[[...], Any], *, args: Sequence, deps: Sequence[Task], input_files: Sequence[str] | None = None) Task[source]

Create a new task and add it to the graph.

Parameters:
  • task_id – Name of the task (used for debugging purposes)

  • func – Function to execute

  • args – Arguments to that function

  • deps – List of Task objects on which the current task depends

  • input_files – Files that were used to build the graph itself

Return TaskNode:

The newly created task node in the graph
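
A small illustration of building a two-task graph (the task ids and the use of print as the task function are made up for the example):

    from dae.task_graph.graph import TaskGraph

    graph = TaskGraph()
    load = graph.create_task("load", print, args=["loading data"], deps=[])
    graph.create_task(
        "summarize",
        print,
        args=["summarizing"],
        deps=[load],  # scheduled only after "load" has finished
    )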

empty() bool[source]

Check if the graph is empty.

extract_tasks(selected_tasks: Sequence[Task]) Sequence[TaskDesc][source]

Collect the selected tasks from the task graph and remove them.

get_task_deps(task: Task) list[Task][source]

Get the dependencies of a task in a form suitable for the Dask executor.

get_task_desc(task: Task) TaskDesc[source]

Get full task description for a given task.

process_completed_tasks(task_result: Sequence[tuple[Task, Any]]) None[source]

Process completed tasks.

Parameters:

task_result – Sequence of (task, result) pairs for the completed tasks

prune(tasks_to_keep: Sequence[Task | str]) None[source]

Prune to keep the specified tasks and their dependencies.

ready_tasks() Sequence[Task][source]

Return tasks which have no dependencies.

property tasks: Sequence[Task]

Return all tasks in the graph.

topological_order() Sequence[Task][source]

Return tasks in topological order.

dae.task_graph.graph.sync_tasks() None[source]

dae.task_graph.logging module

class dae.task_graph.logging.FsspecHandler(logfile: str)[source]

Bases: StreamHandler

Class implementing an fsspec-based logging handler.

close() None[source]

Close the stream.

Copied from logging.FileHandler.close().

dae.task_graph.logging.configure_task_logging(log_dir: str | None, task_id: str, verbosity: int) Handler[source]

Configure and return a task logging handler.
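
A sketch of attaching the returned handler to a logger (the directory, task id, and verbosity are illustrative):

    import logging

    from dae.task_graph.logging import configure_task_logging

    handler = configure_task_logging("./logs", task_id="my_task", verbosity=1)
    logger = logging.getLogger("my_task")
    logger.addHandler(handler)
    logger.info("task started")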

dae.task_graph.logging.ensure_log_dir(**kwargs: Any) str[source]

Ensure logging directory exists.

dae.task_graph.logging.safe_task_id(task_id: str) str[source]

dae.task_graph.process_pool_executor module

class dae.task_graph.process_pool_executor.ProcessPoolTaskExecutor(**kwargs: Any)[source]

Bases: TaskGraphExecutorBase

Execute tasks in parallel using Dask to do the heavy lifting.

close() None[source]

Clean-up any resources used by the executor.

dae.task_graph.sequential_executor module

class dae.task_graph.sequential_executor.SequentialExecutor(task_cache: TaskCache = {}, **kwargs: Any)[source]

Bases: TaskGraphExecutorBase

A task graph executor that executes tasks in sequential order.

close() None[source]

Close the executor and release resources.

Module contents