# Source code for dae.task_graph.executor

from __future__ import annotations

import logging
from abc import abstractmethod
from collections.abc import Iterator
from types import TracebackType
from typing import Any

from dae.task_graph.graph import Task, TaskGraph

logger = logging.getLogger(__name__)


class TaskGraphExecutor:
    """Class that executes a task graph.

    Subclasses are expected to provide :meth:`execute` and :meth:`close`.
    Instances can be used as context managers: entering yields the executor
    itself and leaving the ``with`` block calls :meth:`close`.

    NOTE: this class does not use ``abc.ABCMeta`` as its metaclass, so the
    ``@abstractmethod`` markers below document intent but are not enforced
    when the class (or a subclass) is instantiated.
    """

    @abstractmethod
    def execute(self, graph: TaskGraph) -> Iterator[tuple[Task, Any]]:
        """Start executing the graph.

        Return an iterator that yields the tasks in the graph after they
        are executed.

        This is not necessarily in DFS or BFS order.
        This is not even the order in which these tasks are executed.

        The only guarantee is that when a task is returned its execution
        is already finished.

        Args:
            graph: The task graph to execute.

        Returns:
            An iterator of ``(task, value)`` pairs; the second element is
            presumably the result produced by the task -- confirm against
            the concrete executors.
        """

    def __enter__(self) -> TaskGraphExecutor:
        """Enter the runtime context and return this executor."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """Close the executor when leaving the ``with`` block.

        Args:
            exc_type: Type of the exception raised inside the ``with``
                block, or ``None`` if the block finished normally.
            exc_value: The exception instance, or ``None``.
            exc_tb: The traceback of the exception, or ``None``.

        Returns:
            ``True`` when no exception was raised and ``False`` otherwise,
            so an exception coming from the ``with`` block is never
            suppressed.
        """
        self.close()
        # A falsy result lets an in-flight exception propagate; the truthy
        # result on the clean path has nothing to suppress.
        return exc_type is None

    @abstractmethod
    def close(self) -> None:
        """Clean-up any resources used by the executor."""