gain.task_graph package
Submodules
gain.task_graph.base_executor module
- class gain.task_graph.base_executor.TaskGraphExecutorBase(task_cache: TaskCache = {}, *, force: bool = False, **kwargs: Any)
Bases: TaskGraphExecutor
Executor that walks the graph in an order that satisfies dependencies.
- execute(graph: TaskGraph) → Iterator[tuple[Task, Any]]
Start executing the graph.
Return an iterator that yields each task in the graph, paired with its result, after the task has been executed.
The tasks are not necessarily yielded in DFS or BFS order, nor even in the order in which they were executed.
The only guarantee is that a task is yielded only after its execution has finished.
- get_completed_tasks(graph: TaskGraph) → Generator[tuple[Task, Any], None, None]
Return cached tasks and their results.
All tasks that depend on uncomputed tasks are invalidated and are not returned, even if they have a cached result.
All returned tasks are processed and removed from the graph internally, so that they are not executed again.
Does nothing if the executor is in force mode.
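The invalidation rule of get_completed_tasks can be illustrated with a small, self-contained sketch. It is not the library's implementation: the `completed_tasks` function, the dict-based dependency graph and the `cached` mapping are hypothetical stand-ins for TaskGraph and TaskCache, and the graph is assumed to be acyclic.

```python
from graphlib import TopologicalSorter
from typing import Any


def completed_tasks(
    deps: dict[str, list[str]],
    cached: dict[str, Any],
    *,
    force: bool = False,
) -> dict[str, Any]:
    """Return the cached tasks that can be reused, with their results.

    A task is reusable only if it has a cached result AND every task it
    depends on is reusable too; otherwise it is invalidated.
    """
    if force:
        # In force mode nothing is reused.
        return {}
    reusable: dict[str, Any] = {}
    # static_order() lists every task after all of its dependencies.
    for task in TopologicalSorter(deps).static_order():
        if task in cached and all(dep in reusable for dep in deps.get(task, [])):
            reusable[task] = cached[task]
    return reusable


# "c" has a cached result, but it depends on "b", which was never computed.
print(completed_tasks({"a": [], "b": ["a"], "c": ["b"]}, {"a": 1, "c": 3}))
```

Walking the tasks in topological order means each dependency is classified before any task that uses it, so a single pass is enough to propagate the invalidation down the graph.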
gain.task_graph.cache module
- class gain.task_graph.cache.CacheRecord(type: CacheRecordType, result_or_error: Any = None)
Bases: object
Encapsulate information about a task in the cache.
- property error: Any
- invalidate() → CacheRecord
Return a new instance that needs to be recomputed.
- property result: Any
- result_or_error: Any = None
- type: CacheRecordType
- class gain.task_graph.cache.CacheRecordType(*values)
Bases: Enum
- COMPUTED = 1
- ERROR = 2
- NEEDS_COMPUTE = 0
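One way CacheRecord and CacheRecordType could fit together is sketched below. This is a hypothetical re-implementation for illustration, not the library's source: it assumes that `result` and `error` both expose `result_or_error` depending on the record type, and the `ValueError` guards and the fact that `invalidate()` drops the stored value are assumptions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any


class CacheRecordType(Enum):
    NEEDS_COMPUTE = 0
    COMPUTED = 1
    ERROR = 2


@dataclass(frozen=True)
class CacheRecord:
    type: CacheRecordType
    result_or_error: Any = None

    @property
    def result(self) -> Any:
        # Assumption: a result is only meaningful for COMPUTED records.
        if self.type is not CacheRecordType.COMPUTED:
            raise ValueError(f"no result for a {self.type.name} record")
        return self.result_or_error

    @property
    def error(self) -> Any:
        # Assumption: an error is only meaningful for ERROR records.
        if self.type is not CacheRecordType.ERROR:
            raise ValueError(f"no error for a {self.type.name} record")
        return self.result_or_error

    def invalidate(self) -> "CacheRecord":
        # Return a new record that needs recomputing; self is left unchanged.
        return CacheRecord(CacheRecordType.NEEDS_COMPUTE)


record = CacheRecord(CacheRecordType.COMPUTED, 42)
print(record.result, record.invalidate().type)
```

Making the record frozen keeps `invalidate()` in line with the documented behavior of returning a new instance rather than modifying an existing one.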
- class gain.task_graph.cache.FileTaskCache(cache_dir: str)
Bases: TaskCache
Use file modification timestamps to determine whether a task needs to run.
- cache(task: Task, *, is_error: bool, result: Any) → None
Cache the result or exception of a task.
- get_record(task_desc: TaskDesc) → CacheRecord
Get the cache record for a task.
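The timestamp idea behind FileTaskCache can be illustrated with the common make-style rule: a task is stale when any output file is missing or older than its newest input file. The `needs_compute` function below is a hypothetical sketch of that rule only; the real FileTaskCache may compare different files (for example its own records under `cache_dir`), and treating a task without declared outputs as stale is an assumption.

```python
import os
import tempfile
from collections.abc import Sequence


def needs_compute(input_files: Sequence[str], output_files: Sequence[str]) -> bool:
    """Return True if any output is missing or older than the newest input."""
    if not output_files:
        # Assumption: with no declared outputs there is nothing to compare.
        return True
    if any(not os.path.exists(path) for path in output_files):
        return True
    if not input_files:
        return False
    newest_input = max(os.path.getmtime(path) for path in input_files)
    oldest_output = min(os.path.getmtime(path) for path in output_files)
    return oldest_output < newest_input


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "input.txt")
        out = os.path.join(tmp, "output.txt")
        open(src, "w").close()
        print(needs_compute([src], [out]))  # True: output is missing
        open(out, "w").close()
        os.utime(src, (1_000, 1_000))  # set (atime, mtime) explicitly
        os.utime(out, (2_000, 2_000))
        print(needs_compute([src], [out]))  # False: output is newer
        os.utime(src, (3_000, 3_000))
        print(needs_compute([src], [out]))  # True: input changed
```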
- class gain.task_graph.cache.NoTaskCache
Bases: dict[Any, Any], TaskCache
Don’t check any conditions and just run every task.
- cache(task: Task, *, is_error: bool, result: Any) → None
Cache the result or exception of a task.
- get_record(task_desc: TaskDesc) → CacheRecord
Load and return the cache record for a task in the graph.
- class gain.task_graph.cache.TaskCache
Bases: object
Store the result of a task in a file and reuse it if possible.
- abstractmethod cache(task: Task, *, is_error: bool, result: Any) → None
Cache the result or exception of a task.
- static create(*, force: bool = False, task_progress_mode: bool = True, cache_dir: str | None = None) → TaskCache
Create the appropriate task cache.
- abstractmethod get_record(task_desc: TaskDesc) → CacheRecord
Load and return the cache record for a task in the graph.
gain.task_graph.cli_tools module
- class gain.task_graph.cli_tools.TaskGraphCli
Bases: object
Takes care of creating a task graph executor and executing a graph.
- static add_arguments(parser: ArgumentParser, *, task_progress_mode: bool = True, default_task_status_dir: str | None = './.task-progress', use_commands: bool = True) → None
Add arguments needed to execute a task graph.
- static create_executor(task_cache: TaskCache | None = None, **kwargs: Any) → TaskGraphExecutor
Create a task graph executor according to the specified arguments.
- gain.task_graph.cli_tools.task_graph_all_done(task_graph: TaskGraph, task_cache: TaskCache) → bool
Check whether the task graph is fully executed.
Return True when all tasks have already been computed, and False if any task still needs to run.
- gain.task_graph.cli_tools.task_graph_run(task_graph: TaskGraph, executor: TaskGraphExecutor | None = None, *, keep_going: bool = False) → bool
Execute the task graph with the given executor.
- gain.task_graph.cli_tools.task_graph_run_with_results(task_graph: TaskGraph, executor: TaskGraphExecutor | None = None, *, keep_going: bool = False) → Generator[Any, None, None]
Run a task graph, yielding the results from each task.
gain.task_graph.dask_executor module
gain.task_graph.demo_graphs_cli module
- gain.task_graph.demo_graphs_cli.build_demo_graph(graph_type: str, graph_params: list[str] | None) → TaskGraph
Build a demo graph.
gain.task_graph.executor module
- class gain.task_graph.executor.TaskGraphExecutor
Bases: object
Class that executes a task graph.
- abstractmethod execute(graph: TaskGraph) → Iterator[tuple[Task, Any]]
Start executing the graph.
Return an iterator that yields each task in the graph, paired with its result, after the task has been executed.
The tasks are not necessarily yielded in DFS or BFS order, nor even in the order in which they were executed.
The only guarantee is that a task is yielded only after its execution has finished.
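The ordering guarantee of execute() can be illustrated with a small thread-pool executor: each task is submitted as soon as all of its dependencies have finished and is yielded as soon as it completes, so the yielded order follows completion rather than a DFS or BFS traversal. The `execute_graph` function, its dict-based graph, and the assumption that task functions take no arguments are all hypothetical; the sketch only mirrors the documented contract and is not the TaskGraphExecutor API.

```python
from collections.abc import Callable, Iterator
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Any


def execute_graph(
    funcs: dict[str, Callable[[], Any]],
    deps: dict[str, list[str]],
    max_workers: int = 4,
) -> Iterator[tuple[str, Any]]:
    """Yield (task_id, result) pairs, each only after that task has finished."""
    # For every task, the set of dependencies it is still waiting on.
    remaining = {task: set(deps.get(task, [])) for task in funcs}
    running: dict[Future[Any], str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while remaining or running:
            # Submit every task whose dependencies have all finished.
            ready = [task for task, waiting in remaining.items() if not waiting]
            for task in ready:
                del remaining[task]
                running[pool.submit(funcs[task])] = task
            if not running:
                raise ValueError("cycle or missing dependency in the graph")
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in done:
                task = running.pop(future)
                # Unblock the tasks that were waiting on this one.
                for waiting in remaining.values():
                    waiting.discard(task)
                yield task, future.result()


funcs = {"a": lambda: 1, "b": lambda: 2, "c": lambda: 3}
for task_id, result in execute_graph(funcs, {"c": ["a", "b"]}):
    print(task_id, result)  # "c" always comes after "a" and "b"
```

Whether "a" or "b" is printed first depends on thread scheduling, which is exactly why callers should rely only on the "finished before yielded" guarantee.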
gain.task_graph.graph module
- class gain.task_graph.graph.Task(task_id: str)
Bases: object
Represent one node in a TaskGraph together with its dependencies.
- task_id: str
- class gain.task_graph.graph.TaskDesc(task: Task, func: Callable[[...], Any], args: list[Any], kwargs: dict[str, Any], deps: list[Task], input_files: list[str], output_files: list[str], intermediate_output_files: list[str])
Bases: object
Represent an immutable full task description with all its properties.
- args: list[Any]
- func: Callable[[...], Any]
- input_files: list[str]
- intermediate_output_files: list[str]
- kwargs: dict[str, Any]
- output_files: list[str]
- class gain.task_graph.graph.TaskGraph
Bases: object
An object representing a graph of tasks.
- create_task(task_id: str, func: Callable[[...], Any], *, args: Sequence[Any], kwargs: dict[str, Any] | None = None, deps: Sequence[Task] | None = None, input_files: Sequence[str] | None = None, output_files: Sequence[str] | None = None, intermediate_output_files: Sequence[str] | None = None) → Task
Create a new task and add it to the graph.
- Parameters:
task_id – ID of the task (used for debugging purposes)
func – Function to execute
args – Positional arguments to that function
kwargs – Keyword arguments to that function
deps – List of tasks on which the current task depends
input_files – Files that were used to build the graph itself
output_files – Final output files; if any is missing, the task is recomputed
intermediate_output_files – Outputs consumed by the pipeline; if any is missing, the decision falls through to the flag-file check instead of forcing a recompute
- Returns:
The newly created task node in the graph
- extract_tasks(selected_tasks: Sequence[Task]) → Sequence[TaskDesc]
Collect the selected tasks from the task graph and remove them from it.
- get_task_deps(task: Task) → list[Task]
Get the dependencies of a task in a form suitable for the Dask executor.
- static make_task(task_id: str, func: Callable[[...], Any], *, args: Sequence[Any], kwargs: dict[str, Any] | None = None, deps: Sequence[Task] | None = None, input_files: Sequence[str] | None = None, output_files: Sequence[str] | None = None, intermediate_output_files: Sequence[str] | None = None) → TaskDesc
Build a task with the given id and function.
- process_completed_tasks(task_result: Sequence[tuple[Task, Any]]) → None
Process completed tasks.
- Parameters:
task_result – Completed tasks, each paired with its result
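The difference between output_files and intermediate_output_files in TaskGraph.create_task can be summarized as a decision function: a missing final output always forces a recompute, while a missing intermediate output defers to a flag-file check. The `must_recompute` function below is a hypothetical sketch; the `flag_file` argument, the rule "the task counts as done if its flag file exists", and the rule "all outputs present means up to date" are assumptions, since this reference does not describe the flag-file check itself.

```python
import os
import tempfile
from collections.abc import Sequence


def must_recompute(
    output_files: Sequence[str],
    intermediate_output_files: Sequence[str],
    flag_file: str,
) -> bool:
    """Decide whether a task must run again (hypothetical sketch)."""
    # A missing final output always forces a recompute.
    if any(not os.path.exists(path) for path in output_files):
        return True
    # A missing intermediate output is tolerated: defer to the flag-file
    # check (assumed: the task counts as done if its flag file exists).
    if any(not os.path.exists(path) for path in intermediate_output_files):
        return not os.path.exists(flag_file)
    # Assumption: every declared output exists, so the task is up to date.
    return False


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        final = os.path.join(tmp, "final.txt")
        inter = os.path.join(tmp, "intermediate.txt")
        flag = os.path.join(tmp, "task.flag")
        print(must_recompute([final], [inter], flag))  # True: final output missing
        open(final, "w").close()
        open(flag, "w").close()
        print(must_recompute([final], [inter], flag))  # False: flag file present
        os.remove(flag)
        print(must_recompute([final], [inter], flag))  # True: no flag, no intermediate
```

The point of the split is that a downstream step may delete or consume intermediate files; declaring them as intermediate avoids rerunning the whole task just because they are gone.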
gain.task_graph.logging module
- class gain.task_graph.logging.FsspecHandler(logfile: str)
Bases: StreamHandler[Any]
An fsspec-based logging handler.
gain.task_graph.process_pool_executor module
- class gain.task_graph.process_pool_executor.ProcessPoolTaskExecutor(**kwargs: Any)
Bases: TaskGraphExecutorBase
Execute tasks in parallel using Dask to do the heavy lifting.