import logging
from collections.abc import Iterator
from typing import Any
from dae.task_graph.base_executor import TaskGraphExecutorBase
from dae.task_graph.cache import NoTaskCache
from dae.task_graph.graph import Task, TaskGraph
NO_TASK_CACHE = NoTaskCache()
logger = logging.getLogger(__name__)
[docs]
class SequentialExecutor(TaskGraphExecutorBase):
"""A Task Graph Executor that executes task in sequential order."""
def _execute(self, graph: TaskGraph) -> Iterator[tuple[Task, Any]]:
finished_tasks = 0
initial_task_count = len(graph)
while not graph.empty():
ready_tasks = graph.extract_tasks(graph.ready_tasks())
for ftask in ready_tasks:
# handle tasks that use the output of other tasks
result = self._exec(ftask, self._params)
graph.process_completed_tasks([(ftask.task, result)])
finished_tasks += 1
logger.debug("clean up task %s", ftask)
logger.info(
"finished %s/%s", finished_tasks,
initial_task_count)
yield ftask.task, result
# all tasks have already executed. Let's clean the state.
assert len(graph) == 0
[docs]
def close(self) -> None:
"""Close the executor and release resources."""