Source code for dae.task_graph.demo_graphs_cli

import argparse
import sys
import time

from dae.task_graph import TaskGraphCli
from dae.task_graph.graph import TaskGraph
from dae.utils.verbosity_configuration import VerbosityConfiguration


def _build_graph_a(graph_params: list[str] | None) -> TaskGraph:
    task_graph = TaskGraph()

    num_of_parts, parts_sleep, summary_sleep = "10", "2", "2"

    if graph_params:
        if len(graph_params) != 3:
            raise ValueError(
                "The graph A needs three parameters: "
                "<number of parts>, <seconds for parts>, "
                "<seconds for summary>")
        num_of_parts, parts_sleep, summary_sleep = graph_params
    print(
        f"Bulding graph A with {num_of_parts} parts, "
        f"{parts_sleep} seconds for each parts, and "
        f"{summary_sleep} secoconds for the summary")

    def task_part() -> None:
        time.sleep(float(parts_sleep))

    def task_summary() -> None:
        time.sleep(float(summary_sleep))

    parts = [task_graph.create_task(
        f"part {p}", task_part, [], [])
        for p in range(int(num_of_parts))]
    task_graph.create_task("summary", task_summary, [], parts)
    return task_graph


def _build_graph_d(graph_params: list[str] | None) -> TaskGraph:
    task_graph = TaskGraph()

    num_of_clicks, num_of_parts, parts_sleep, summary_sleep = \
        "5", "10", "2", "2"

    if graph_params:
        if len(graph_params) != 4:
            raise ValueError(
                "The graph D needs three parameters: "
                "<number of parts>, <seconds for parts>, "
                "<seconds for summary>")
        num_of_clicks, num_of_parts, parts_sleep, summary_sleep = graph_params
    print(
        f"Bulding graph D with {num_of_clicks} clicks, {num_of_parts} parts, "
        f"{parts_sleep} seconds for each parts, and "
        f"{summary_sleep} secoconds for the summary")

    def task_part() -> None:
        time.sleep(float(parts_sleep))

    def task_summary() -> None:
        time.sleep(float(summary_sleep))

    for click in range(int(num_of_clicks)):
        parts = [task_graph.create_task(
            f"part {click}:{p}", task_part, [], [])
            for p in range(int(num_of_parts))]
        task_graph.create_task(f"summary {click}", task_summary, [], parts)
    return task_graph


def _build_graph_b(graph_params: list[str] | None) -> TaskGraph:
    task_graph = TaskGraph()

    num_of_parts, parts_sleep, summary_sleep = "2", "5", "10"

    if graph_params:
        if len(graph_params) != 3:
            raise ValueError(
                "The graph B needs three parameters: "
                "<number of parts>, <seconds for parts>, "
                "<seconds for summary>")
        num_of_parts, parts_sleep, summary_sleep = graph_params
    print(
        f"Bulding graph A with {num_of_parts} parts, "
        f"{parts_sleep} seconds for each parts, and "
        f"{summary_sleep} secoconds for the summary")

    def task_part() -> str:
        time.sleep(float(parts_sleep))
        return 1000 * "B"

    def task_summary(*args: str) -> str:
        time.sleep(float(summary_sleep))
        if len(args) <= 5:
            return "b".join(args)
        return "c".join(args[:5])

    parts = [task_graph.create_task(
        f"part {p}", task_part, [], [])
        for p in range(int(num_of_parts))]
    task_graph.create_task("summary", task_summary, parts, parts)
    return task_graph


def _build_graph_c(graph_params: list[str] | None) -> TaskGraph:
    task_graph = TaskGraph()

    num_of_parts, parts_sleep, summary_sleep = "2", "5", "10"

    if graph_params:
        if len(graph_params) != 3:
            raise ValueError(
                "The graph C needs three parameters: "
                "<number of parts>, <sleep for parts>, "
                "<sleep for summary>")
        num_of_parts, parts_sleep, summary_sleep = graph_params
    print(
        f"Bulding graph C with {num_of_parts} parts, "
        f"{parts_sleep} seconds sleep for each parts, and "
        f"{summary_sleep} secoconds sleep for the summary")

    def task_part(*_args: str) -> str:
        time.sleep(float(parts_sleep))
        return 1000 * "B"

    def task_summary(*args: str) -> str:
        time.sleep(float(summary_sleep))
        if len(args) <= 5:
            return "b".join(args)
        return "c".join(args[:5])

    parts = [
        task_graph.create_task(f"part {p}", task_part, [], [])
        for p in range(int(num_of_parts))
    ]
    summary = task_graph.create_task("summary", task_summary, parts, parts)
    parts2 = [
        task_graph.create_task(f"part2 {p}", task_part, [summary], [summary])
        for p in range(int(num_of_parts))
    ]
    task_graph.create_task("summary2", task_summary, parts2, parts2)
    return task_graph


[docs] def build_demo_graph(graph_type: str, graph_params: list[str] | None) -> TaskGraph: """Build a demo graph.""" if graph_type == "A": return _build_graph_a(graph_params) if graph_type == "B": return _build_graph_b(graph_params) if graph_type == "C": return _build_graph_c(graph_params) if graph_type == "D": return _build_graph_d(graph_params) raise ValueError(f"Unknown graph <{graph_type}>")
[docs] def main(argv: list[str] | None = None) -> None: """Entry point for the demo script.""" parser = argparse.ArgumentParser(description="test_basic") parser.add_argument("graph", type=str, help="Demo graph", default="A", nargs="?") VerbosityConfiguration.set_arguments(parser) parser.add_argument("--graph-params", "-gp", type=str, nargs="+") TaskGraphCli.add_arguments(parser) if argv is None: argv = sys.argv[1:] args = parser.parse_args(argv) VerbosityConfiguration.set(args) graph = build_demo_graph(args.graph, args.graph_params) TaskGraphCli.process_graph(graph, **vars(args))