dae.task_graph package

Subpackages

Submodules

dae.task_graph.cache module

class dae.task_graph.cache.CacheRecord(type: CacheRecordType, result_or_error: Any = None)[source]

Bases: object

Encapsulate information about a task in the cache.

property error: Any
property result: Any
result_or_error: Any = None
type: CacheRecordType
class dae.task_graph.cache.CacheRecordType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

COMPUTED = 1
ERROR = 2
NEEDS_COMPUTE = 0
class dae.task_graph.cache.FileTaskCache(cache_dir: str, *, force: bool = False)[source]

Bases: TaskCache

Use file modification timestamps to determine if a task needs to run.

cache(task_node: Task, *, is_error: bool, result: Any) None[source]

Cache the result or exception of a task.

load(graph: TaskGraph) Generator[tuple[Task, CacheRecord], None, None][source]

For each task in the graph, load and yield its cache record.

class dae.task_graph.cache.NoTaskCache[source]

Bases: dict, TaskCache

Don’t check any conditions and just run any task.

cache(task_node: Task, *, is_error: bool, result: Any) None[source]

Cache the result or exception of a task.

load(graph: TaskGraph) Generator[tuple[Task, CacheRecord], None, None][source]

For each task in the graph, load and yield its cache record.

class dae.task_graph.cache.TaskCache[source]

Bases: object

Store the result of a task in a file and reuse it if possible.

abstract cache(task_node: Task, *, is_error: bool, result: Any) None[source]

Cache the result or exception of a task.

static create(*, force: bool | None = None, cache_dir: str | None = None, no_cache: bool = False) TaskCache[source]

Create the appropriate task cache.

abstract load(graph: TaskGraph) Iterator[tuple[Task, CacheRecord]][source]

For each task in the graph, load and yield its cache record.
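A minimal usage sketch based only on the signatures above: create a cache with TaskCache.create and inspect the cache records for a graph. The example task, the cache directory, and the assumption that create picks FileTaskCache when cache_dir is given are illustrative.

    from dae.task_graph.cache import CacheRecordType, TaskCache
    from dae.task_graph.graph import TaskGraph

    # Build a trivial graph with one illustrative task.
    graph = TaskGraph()
    graph.create_task("example", lambda: 42, args=[], deps=[])

    # Presumably selects FileTaskCache when cache_dir is given and
    # NoTaskCache when no_cache=True (an assumption, not stated above).
    task_cache = TaskCache.create(cache_dir="./.task-progress")

    # Inspect what the cache already knows about each task.
    for task, record in task_cache.load(graph):
        if record.type == CacheRecordType.COMPUTED:
            print(task.task_id, "cached result:", record.result)
        elif record.type == CacheRecordType.ERROR:
            print(task.task_id, "cached error:", record.error)
        else:  # CacheRecordType.NEEDS_COMPUTE
            print(task.task_id, "still needs to run")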

dae.task_graph.cli_tools module

class dae.task_graph.cli_tools.TaskGraphCli[source]

Bases: object

Takes care of creating a task graph executor and executing a graph.

static add_arguments(parser: ArgumentParser, force_mode: str = 'optional', default_task_status_dir: str | None = './.task-progress', *, use_commands: bool = True, never_cache: bool = False) None[source]

Add arguments needed to execute a task graph.

static create_executor(task_cache: TaskCache | None = None, **kwargs: Any) TaskGraphExecutor[source]

Create a task graph executor according to the specified arguments.

static process_graph(task_graph: TaskGraph, force_mode: str = 'optional', **kwargs: Any) bool[source]

Process task_graph in accordance with the arguments in args.

Return True if the graph is successfully processed.
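A hedged sketch of the wiring these signatures suggest: register the CLI arguments on an ArgumentParser, build a graph, and hand the parsed arguments to process_graph. The example task and the assumption that the parsed namespace can be forwarded as keyword arguments are illustrative, not part of the documented API.

    import argparse

    from dae.task_graph.cli_tools import TaskGraphCli
    from dae.task_graph.graph import TaskGraph

    parser = argparse.ArgumentParser(description="run a tiny task graph")
    TaskGraphCli.add_arguments(parser)
    args = parser.parse_args()

    graph = TaskGraph()
    graph.create_task("hello", print, args=["hello from the task graph"], deps=[])

    # Forwarding the parsed namespace as keyword arguments is an assumption
    # based on the **kwargs parameter of process_graph.
    if not TaskGraphCli.process_graph(graph, **vars(args)):
        raise SystemExit(1)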

dae.task_graph.demo_graphs_cli module

dae.task_graph.demo_graphs_cli.build_demo_graph(graph_type: str, graph_params: list[str] | None) TaskGraph[source]

Build a demo graph.

dae.task_graph.demo_graphs_cli.main(argv: list[str] | None = None) None[source]

Entry point for the demo script.

dae.task_graph.executor module

class dae.task_graph.executor.AbstractTaskGraphExecutor(task_cache: TaskCache = {})[source]

Bases: TaskGraphExecutor

Executor that walks the graph in an order that satisfies dependencies.

close() None[source]

Clean-up any resources used by the executor.

execute(task_graph: TaskGraph) Iterator[tuple[Task, Any]][source]

Start executing the graph.

Return an iterator that yields the tasks in the graph after they are executed.

This is not necessarily DFS or BFS order, and not even the order in which the tasks are executed.

The only guarantee is that when a task is returned, its execution has already finished.

class dae.task_graph.executor.DaskExecutor(client: Client, task_cache: TaskCache = {}, **kwargs: Any)[source]

Bases: AbstractTaskGraphExecutor

Execute tasks in parallel using Dask to do the heavy lifting.

MIN_QUEUE_SIZE = 700
close() None[source]

Clean-up any resources used by the executor.

class dae.task_graph.executor.SequentialExecutor(task_cache: TaskCache = {}, **kwargs: Any)[source]

Bases: AbstractTaskGraphExecutor

A task graph executor that executes tasks in sequential order.

class dae.task_graph.executor.TaskGraphExecutor[source]

Bases: object

Class that executes a task graph.

abstract close() None[source]

Clean-up any resources used by the executor.

abstract execute(task_graph: TaskGraph) Iterator[tuple[Task, Any]][source]

Start executing the graph.

Return an iterator that yields the tasks in the graph after they are executed.

This is not necessarily DFS or BFS order, and not even the order in which the tasks are executed.

The only guarantee is that when a task is returned, its execution has already finished.

dae.task_graph.executor.task_graph_all_done(task_graph: TaskGraph, task_cache: TaskCache) bool[source]

Check if the task graph is fully executed.

When all tasks are already computed, the function returns True. If there are tasks that still need to run, it returns False.

dae.task_graph.executor.task_graph_run(task_graph: TaskGraph, executor: TaskGraphExecutor | None = None, *, keep_going: bool = False) bool[source]

Execute (run) the task_graph with the given executor.

dae.task_graph.executor.task_graph_run_with_results(task_graph: TaskGraph, executor: TaskGraphExecutor) Generator[Any, None, None][source]

Run a task graph, yielding the results from each task.

dae.task_graph.executor.task_graph_status(task_graph: TaskGraph, task_cache: TaskCache, verbose: int | None) bool[source]

Show the status of each task from the task graph.
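A small sketch of running a two-task graph with the SequentialExecutor via task_graph_run; the task functions are placeholders and their return values are not used by the dependent task.

    from dae.task_graph.executor import SequentialExecutor, task_graph_run
    from dae.task_graph.graph import TaskGraph

    graph = TaskGraph()
    first = graph.create_task("first", lambda: "first done", args=[], deps=[])
    graph.create_task("second", lambda: "second done", args=[], deps=[first])

    # Run the tasks one after another, respecting the dependency order.
    executor = SequentialExecutor()
    ok = task_graph_run(graph, executor)
    print("graph processed successfully:", ok)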

dae.task_graph.graph module

class dae.task_graph.graph.Task(task_id: str, func: Callable, args: list[Any], deps: list[Task], input_files: list[str])[source]

Bases: object

Represent one node in a TaskGraph together with its dependencies.

args: list[Any]
deps: list[Task]
func: Callable
input_files: list[str]
task_id: str
class dae.task_graph.graph.TaskGraph[source]

Bases: object

An object representing a graph of tasks.

create_task(task_id: str, func: Callable[[...], Any], args: list, deps: list[Task], input_files: list[str] | None = None) Task[source]

Create a new task and add it to the graph.

Parameters:
  • task_id – Name of the task (used for debugging purposes)

  • func – Function to execute

  • args – Arguments to that function

  • deps – List of Tasks on which the current task depends

  • input_files – Files that were used to build the graph itself

Returns:

The newly created task node in the graph

prune(ids_to_keep: Iterable[str]) TaskGraph[source]

Prune tasks that are not in ids_to_keep and are not dependencies of the tasks that are kept.

Task ids that appear in ids_to_keep but are not in the graph are assumed to have already been removed; no error is raised.
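A brief sketch of building a graph with create_task and pruning it to a subset of task ids; the task ids and functions are illustrative.

    from dae.task_graph.graph import TaskGraph

    def add(x: int, y: int) -> int:
        return x + y

    graph = TaskGraph()
    sum_task = graph.create_task("sum", add, args=[1, 2], deps=[])
    graph.create_task("report", print, args=["sum finished"], deps=[sum_task])

    # Keep only the "sum" task (and any tasks it depends on).
    pruned = graph.prune(ids_to_keep=["sum"])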

dae.task_graph.logging module

class dae.task_graph.logging.FsspecHandler(logfile: str)[source]

Bases: StreamHandler

Class to create fsspec based logging handler.

close() None[source]

Close the stream.

Copied from logging.FileHandler.close().

dae.task_graph.logging.configure_task_logging(log_dir: str | None, task_id: str, verbosity: int) Handler[source]

Configure and return a task logging handler.

dae.task_graph.logging.ensure_log_dir(**kwargs: Any) str[source]

Ensure logging directory exists.

dae.task_graph.logging.safe_task_id(task_id: str) str[source]
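A hedged sketch of attaching a per-task logging handler; the log directory, the raw task id, and the assumption that verbosity follows the usual 0/1/2 convention are illustrative.

    import logging

    from dae.task_graph.logging import configure_task_logging, safe_task_id

    # safe_task_id is assumed to sanitize the id for use in file names.
    task_id = safe_task_id("my task / step 1")
    handler = configure_task_logging("./logs", task_id, verbosity=1)

    logger = logging.getLogger("dae.task_graph.demo")
    logger.addHandler(handler)
    logger.info("task %s started", task_id)
    handler.close()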

Module contents