gain.task_graph package

Submodules

gain.task_graph.base_executor module

class gain.task_graph.base_executor.TaskGraphExecutorBase(task_cache: TaskCache = {}, *, force: bool = False, **kwargs: Any)[source]

Bases: TaskGraphExecutor

Executor that walks the graph in an order that satisfies dependencies.

execute(graph: TaskGraph) Iterator[tuple[Task, Any]][source]

Start executing the graph.

Return an iterator that yields the tasks in the graph after they are executed.

This is not necessarily in DFS or BFS order. This is not even the order in which these tasks are executed.

The only guarantee is that when a task is returned its execution is already finished.
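This completion-order contract can be illustrated with a small self-contained sketch (using `concurrent.futures`, not the gain API): tasks are yielded as they finish, regardless of submission order.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def run_all(tasks):
    """Yield (name, result) pairs as each task finishes.

    Mirrors the execute() contract: the only guarantee is that a
    yielded task has already finished running.
    """
    with ThreadPoolExecutor() as pool:
        futures = {pool.submit(fn): name for name, fn in tasks}
        for fut in as_completed(futures):
            yield futures[fut], fut.result()

tasks = [
    ("slow", lambda: time.sleep(0.2) or "slow-done"),
    ("fast", lambda: "fast-done"),
]
# Both tasks complete; "fast" typically finishes first,
# but no particular yield order is promised.
results = dict(run_all(tasks))
```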

get_completed_tasks(graph: TaskGraph) Generator[tuple[Task, Any], None, None][source]

Return cached tasks and their results.

All tasks that depend on uncomputed tasks are invalidated and will not be returned, even if they have a cached result.

All returned tasks are preprocessed and removed from the graph internally, so that they are not executed again.

Does nothing if the executor is in force mode.

gain.task_graph.cache module

class gain.task_graph.cache.CacheRecord(type: CacheRecordType, result_or_error: Any = None)[source]

Bases: object

Encapsulate information about a task in the cache.

property error: Any
invalidate() CacheRecord[source]

Return a new instance that needs to be recomputed.

property result: Any
result_or_error: Any = None
type: CacheRecordType
class gain.task_graph.cache.CacheRecordType(*values)[source]

Bases: Enum

COMPUTED = 1
ERROR = 2
NEEDS_COMPUTE = 0
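A minimal sketch of how `CacheRecord`, `CacheRecordType`, and `invalidate()` might fit together (a hypothetical reimplementation for illustration, not the actual gain source):

```python
from dataclasses import dataclass, replace
from enum import Enum
from typing import Any

class CacheRecordType(Enum):
    NEEDS_COMPUTE = 0
    COMPUTED = 1
    ERROR = 2

@dataclass(frozen=True)
class CacheRecord:
    type: CacheRecordType
    result_or_error: Any = None

    def invalidate(self) -> "CacheRecord":
        # Return a new instance marked as needing recomputation;
        # the original record is left untouched.
        return replace(self, type=CacheRecordType.NEEDS_COMPUTE)

rec = CacheRecord(CacheRecordType.COMPUTED, result_or_error=42)
stale = rec.invalidate()
```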
class gain.task_graph.cache.FileTaskCache(cache_dir: str)[source]

Bases: TaskCache

Use file modification timestamps to determine if a task needs to run.

cache(task: Task, *, is_error: bool, result: Any) None[source]

Cache the result or exception of a task.

get_record(task_desc: TaskDesc) CacheRecord[source]

Get the cache record for a task.
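The timestamp idea behind FileTaskCache resembles `make`: a task is stale when an output file is missing or older than its newest input. A self-contained sketch of that check (assumed behaviour, not the actual implementation):

```python
import os
import tempfile
import time

def needs_run(input_files, output_files):
    """Return True if the task must run: an output is missing or
    older than the newest input (make-style staleness check)."""
    if not all(os.path.exists(p) for p in output_files):
        return True
    newest_input = max(os.path.getmtime(p) for p in input_files)
    oldest_output = min(os.path.getmtime(p) for p in output_files)
    return newest_input > oldest_output

with tempfile.TemporaryDirectory() as d:
    src = os.path.join(d, "in.txt")
    out = os.path.join(d, "out.txt")
    open(src, "w").close()
    stale_before = needs_run([src], [out])   # output missing -> must run
    open(out, "w").close()
    os.utime(out, (time.time() + 10,) * 2)   # force output mtime past input
    stale_after = needs_run([src], [out])    # output up to date -> skip
```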

class gain.task_graph.cache.NoTaskCache[source]

Bases: dict[Any, Any], TaskCache

Don’t check any conditions and just run any task.

cache(task: Task, *, is_error: bool, result: Any) None[source]

Cache the result or exception of a task.

get_record(task_desc: TaskDesc) CacheRecord[source]

Load and return the cache record for the given task.

class gain.task_graph.cache.TaskCache[source]

Bases: object

Store the result of a task in a file and reuse it if possible.

abstractmethod cache(task: Task, *, is_error: bool, result: Any) None[source]

Cache the result or exception of a task.

static create(*, force: bool = False, task_progress_mode: bool = True, cache_dir: str | None = None) TaskCache[source]

Create the appropriate task cache.

abstractmethod get_record(task_desc: TaskDesc) CacheRecord[source]

Load and return the cache record for the given task.

gain.task_graph.cli_tools module

class gain.task_graph.cli_tools.TaskGraphCli[source]

Bases: object

Takes care of creating a task graph executor and executing a graph.

static add_arguments(parser: ArgumentParser, *, task_progress_mode: bool = True, default_task_status_dir: str | None = './.task-progress', use_commands: bool = True) None[source]

Add arguments needed to execute a task graph.

static create_executor(task_cache: TaskCache | None = None, **kwargs: Any) TaskGraphExecutor[source]

Create a task graph executor according to the args specified.

static process_graph(task_graph: TaskGraph, *, task_progress_mode: bool = True, **kwargs: Any) bool[source]

Process task_graph in accordance with the arguments in args.

Return True if the graph is successfully processed.

gain.task_graph.cli_tools.task_graph_all_done(task_graph: TaskGraph, task_cache: TaskCache) bool[source]

Check if the task graph is fully executed.

When all tasks are already computed, the function returns True. If there are tasks that still need to run, it returns False.

gain.task_graph.cli_tools.task_graph_run(task_graph: TaskGraph, executor: TaskGraphExecutor | None = None, *, keep_going: bool = False) bool[source]

Execute (run) the task_graph with the given executor.

gain.task_graph.cli_tools.task_graph_run_with_results(task_graph: TaskGraph, executor: TaskGraphExecutor | None = None, *, keep_going: bool = False) Generator[Any, None, None][source]

Run a task graph, yielding the results from each task.

gain.task_graph.cli_tools.task_graph_status(task_graph: TaskGraph, task_cache: TaskCache, verbose: int | None) bool[source]

Show the status of each task from the task graph.

gain.task_graph.dask_executor module

class gain.task_graph.dask_executor.DaskExecutor(dask_client: Client, task_cache: TaskCache = {}, **kwargs: Any)[source]

Bases: TaskGraphExecutorBase

Dask-based task graph executor.

MAX_RUNNING_TASKS = 700
close() None[source]

Close the Dask executor.

gain.task_graph.demo_graphs_cli module

gain.task_graph.demo_graphs_cli.build_demo_graph(graph_type: str, graph_params: list[str] | None) TaskGraph[source]

Build a demo graph.

gain.task_graph.demo_graphs_cli.main(argv: list[str] | None = None) None[source]

Entry point for the demo script.

gain.task_graph.demo_graphs_cli.task_part(seconds: str) str[source]
gain.task_graph.demo_graphs_cli.task_part_b(seconds: str) str[source]
gain.task_graph.demo_graphs_cli.task_part_c(seconds: str, *_args: str) str[source]
gain.task_graph.demo_graphs_cli.task_summary(seconds: str) None[source]
gain.task_graph.demo_graphs_cli.task_summary_b(seconds: str, *args: str) str[source]
gain.task_graph.demo_graphs_cli.task_summary_c(seconds: str, *args: str) str[source]
gain.task_graph.demo_graphs_cli.timeout(seconds: str) float[source]

gain.task_graph.executor module

class gain.task_graph.executor.TaskGraphExecutor[source]

Bases: object

Class that executes a task graph.

abstractmethod close() None[source]

Clean-up any resources used by the executor.

abstractmethod execute(graph: TaskGraph) Iterator[tuple[Task, Any]][source]

Start executing the graph.

Return an iterator that yields the tasks in the graph after they are executed.

This is not necessarily in DFS or BFS order. This is not even the order in which these tasks are executed.

The only guarantee is that when a task is returned its execution is already finished.

abstractmethod get_completed_tasks(graph: TaskGraph) Generator[tuple[Task, Any], None, None][source]

Return an iterator that yields already completed tasks in the graph.

This is not necessarily in DFS or BFS order.

gain.task_graph.graph module

class gain.task_graph.graph.Task(task_id: str)[source]

Bases: object

Represent one node in a TaskGraph together with its dependencies.

task_id: str
class gain.task_graph.graph.TaskDesc(task: Task, func: Callable[[...], Any], args: list[Any], kwargs: dict[str, Any], deps: list[Task], input_files: list[str], output_files: list[str], intermediate_output_files: list[str])[source]

Bases: object

Represent an immutable full task description with all its properties.

args: list[Any]
deps: list[Task]
func: Callable[[...], Any]
input_files: list[str]
intermediate_output_files: list[str]
kwargs: dict[str, Any]
output_files: list[str]
task: Task
class gain.task_graph.graph.TaskGraph[source]

Bases: object

An object representing a graph of tasks.

add_task(task_desc: TaskDesc) Task[source]

Add a task to the graph.

add_tasks(task_descs: Sequence[TaskDesc]) list[Task][source]

Add multiple tasks to the graph.

as_directed_graph() DiGraph[source]

Return the task graph as a networkx directed graph.

create_task(task_id: str, func: Callable[[...], Any], *, args: Sequence[Any], kwargs: dict[str, Any] | None = None, deps: Sequence[Task] | None = None, input_files: Sequence[str] | None = None, output_files: Sequence[str] | None = None, intermediate_output_files: Sequence[str] | None = None) Task[source]

Create a new task and add it to the graph.

Parameters:
  • task_id – ID of the task (used for debugging purposes)

  • func – Function to execute

  • args – Arguments to that function

  • deps – List of TaskNodes on which the current task depends

  • input_files – Files that were used to build the graph itself

  • output_files – Final output files; if missing the task recomputes

  • intermediate_output_files – Pipeline-consumed outputs; if missing falls through to the flag-file check instead of forcing recompute

Return Task:

The newly created task node ID in the graph
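The create_task/deps flow can be sketched with a toy graph runner (self-contained, using the standard-library `graphlib` rather than the gain API): each task names its dependencies, and each function receives its dependencies' results once they are available.

```python
from graphlib import TopologicalSorter

def run_graph(tasks):
    """tasks: {task_id: (func, [dep_ids])}. Run each func with its
    dependencies' results as positional arguments, in topological order."""
    order = TopologicalSorter({tid: deps for tid, (_, deps) in tasks.items()})
    results = {}
    for tid in order.static_order():
        func, deps = tasks[tid]
        results[tid] = func(*(results[d] for d in deps))
    return results

# A three-task pipeline: load -> double -> total.
graph = {
    "load": (lambda: [1, 2, 3], []),
    "double": (lambda xs: [x * 2 for x in xs], ["load"]),
    "total": (lambda xs: sum(xs), ["double"]),
}
results = run_graph(graph)
```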

empty() bool[source]

Check if the graph is empty.

extract_tasks(selected_tasks: Sequence[Task]) Sequence[TaskDesc][source]

Collect the selected tasks from the task graph and remove them.

get_task_deps(task: Task) list[Task][source]

Get dependencies of a task in a form suitable for the Dask executor.

get_task_desc(task: Task) TaskDesc[source]

Get full task description for a given task.

has_task(task: Task) bool[source]

Check if the graph contains a task.

static make_task(task_id: str, func: Callable[[...], Any], *, args: Sequence[Any], kwargs: dict[str, Any] | None = None, deps: Sequence[Task] | None = None, input_files: Sequence[str] | None = None, output_files: Sequence[str] | None = None, intermediate_output_files: Sequence[str] | None = None) TaskDesc[source]

Build a task with the given id and function.

process_completed_tasks(task_result: Sequence[tuple[Task, Any]]) None[source]

Process the given completed tasks.

Parameters:

task_result – Sequence of (task, result) pairs for the completed tasks

prune(tasks_to_keep: Sequence[Task | str]) None[source]

Prune to keep the specified tasks and their dependencies.

ready_tasks(limit: int = 0) Sequence[Task][source]

Return tasks which have no dependencies.

property tasks: Sequence[Task]

Return all tasks in the graph.

topological_order() Sequence[Task][source]

Return tasks in topological order.

gain.task_graph.graph.chain_tasks(*tasks: TaskDesc) TaskDesc[source]

Chain tasks together so that they execute sequentially.
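Chaining amounts to adding each task's predecessor to its dependency list, so the tasks run strictly in sequence. A hypothetical sketch over plain dicts (the real function operates on TaskDesc objects):

```python
def chain_tasks(*tasks):
    """Add each task's predecessor as a dependency so the tasks
    run sequentially; return the last task (the chain's tail)."""
    for prev, cur in zip(tasks, tasks[1:]):
        cur.setdefault("deps", []).append(prev["task_id"])
    return tasks[-1]

a = {"task_id": "a", "deps": []}
b = {"task_id": "b", "deps": []}
c = {"task_id": "c", "deps": []}
tail = chain_tasks(a, b, c)
# b now depends on a, c on b; only the tail needs to be added downstream.
```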

gain.task_graph.graph.sync_tasks() None[source]

gain.task_graph.logging module

class gain.task_graph.logging.FsspecHandler(logfile: str)[source]

Bases: StreamHandler[Any]

Class to create fsspec based logging handler.

close() None[source]

Close the stream.

Copied from logging.FileHandler.close().

gain.task_graph.logging.configure_task_logging(log_dir: str | None, task_id: str, verbosity: int) Handler[source]

Configure and return a task logging handler.

gain.task_graph.logging.ensure_log_dir(**kwargs: Any) str[source]

Ensure logging directory exists.

gain.task_graph.logging.safe_task_id(task_id: str) str[source]

gain.task_graph.process_pool_executor module

class gain.task_graph.process_pool_executor.ProcessPoolTaskExecutor(**kwargs: Any)[source]

Bases: TaskGraphExecutorBase

Execute tasks in parallel using Dask to do the heavy lifting.

close() None[source]

Clean-up any resources used by the executor.

gain.task_graph.sequential_executor module

class gain.task_graph.sequential_executor.SequentialExecutor(task_cache: TaskCache = {}, *, force: bool = False, **kwargs: Any)[source]

Bases: TaskGraphExecutorBase

A task graph executor that executes tasks in sequential order.

close() None[source]

Close the executor and release resources.

Module contents