queenbee.recipe.dag module

Queenbee DAG.

A DAG defines a single step in a Recipe. Each DAG is a collection of tasks. Each task indicates which function template should be used and maps the inputs and outputs for that task.

class queenbee.recipe.dag.DAG(*, type: typing.Literal['DAG'] = 'DAG', annotations: typing.Dict[str, typing.Any] | None = <factory>, inputs: typing.List[queenbee.io.inputs.dag.DAGStringInput | queenbee.io.inputs.dag.DAGIntegerInput | queenbee.io.inputs.dag.DAGNumberInput | queenbee.io.inputs.dag.DAGBooleanInput | queenbee.io.inputs.dag.DAGFolderInput | queenbee.io.inputs.dag.DAGFileInput | queenbee.io.inputs.dag.DAGPathInput | queenbee.io.inputs.dag.DAGArrayInput | queenbee.io.inputs.dag.DAGJSONObjectInput | queenbee.io.inputs.dag.DAGGenericInput] = <factory>, outputs: typing.List[queenbee.io.outputs.dag.DAGGenericOutput | queenbee.io.outputs.dag.DAGStringOutput | queenbee.io.outputs.dag.DAGIntegerOutput | queenbee.io.outputs.dag.DAGNumberOutput | queenbee.io.outputs.dag.DAGBooleanOutput | queenbee.io.outputs.dag.DAGFolderOutput | queenbee.io.outputs.dag.DAGFileOutput | queenbee.io.outputs.dag.DAGPathOutput | queenbee.io.outputs.dag.DAGArrayOutput | queenbee.io.outputs.dag.DAGJSONObjectOutput] = <factory>, name: str, fail_fast: bool = True, tasks: typing.List[queenbee.recipe.task.DAGTask])

Bases: IOBase

A Directed Acyclic Graph containing a list of tasks.

check_dag_references() → DAG

Check references within the DAG.

classmethod check_dependencies(v: List[DAGTask]) → List[DAGTask]

Check that all task dependencies exist.
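The kind of check described above can be sketched as follows. This is a minimal illustration, not queenbee's implementation: the Task class is a hypothetical stand-in for DAGTask, and the needs field name is an assumption about how a task lists its dependencies.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Task:
    """Hypothetical stand-in for DAGTask with only the fields used here."""
    name: str
    needs: List[str] = field(default_factory=list)  # assumed field name


def check_dependencies(tasks: List[Task]) -> List[Task]:
    """Raise ValueError if a task depends on a name that is not in the list."""
    known = {task.name for task in tasks}
    for task in tasks:
        missing = [dep for dep in task.needs if dep not in known]
        if missing:
            raise ValueError(
                f"Task '{task.name}' depends on unknown task(s): {missing}"
            )
    # Validators conventionally return the validated value unchanged.
    return tasks
```

Returning the input list unchanged mirrors the List[DAGTask] return type in the signature above.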

classmethod check_unique_names(v: List[DAGTask]) → List[DAGTask]

Check that all tasks have unique names.
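A uniqueness check of this kind might look like the sketch below. It works on plain name strings rather than DAGTask objects, and the function name and error message are illustrative assumptions, not the library's own.

```python
from collections import Counter
from typing import List


def check_unique_names(names: List[str]) -> List[str]:
    """Raise ValueError if any name appears more than once."""
    counts = Counter(names)
    duplicates = sorted(name for name, n in counts.items() if n > 1)
    if duplicates:
        raise ValueError(f"Duplicate task names: {duplicates}")
    return names
```

Reporting every duplicate at once, rather than stopping at the first, makes the error easier to act on when a DAG has many tasks.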

fail_fast: bool
static find_task_return(tasks: List[DAGTask], reference: TaskReference | TaskFileReference | TaskFolderReference | TaskPathReference) → TaskReturn | TaskPathReturn

Find a task output within the DAG from a reference.

Parameters:
  • tasks (List[DAGTask]) – A list of DAG Tasks

  • reference (Union[TaskReference, TaskFileReference, TaskFolderReference, TaskPathReference]) – A reference to a DAG Task output

Raises:
  • ValueError – The task name cannot be found.

  • ValueError – The task return reference cannot be found.

Returns:

Union[TaskReturn, TaskPathReturn] – A task output parameter or artifact
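The lookup and its two error cases can be sketched as below. The Task, Return and Reference classes are hypothetical stand-ins, and the field names name, variable and returns are assumptions made for illustration; this is not the library's code.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Return:
    """Hypothetical stand-in for a task return."""
    name: str


@dataclass
class Task:
    """Hypothetical stand-in for DAGTask."""
    name: str
    returns: List[Return] = field(default_factory=list)


@dataclass
class Reference:
    """Hypothetical stand-in for a task reference."""
    name: str      # assumed: name of the referenced task
    variable: str  # assumed: name of the return on that task


def find_task_return(tasks: List[Task], reference: Reference) -> Return:
    """Resolve a reference to the matching return of the matching task."""
    matching = [task for task in tasks if task.name == reference.name]
    if not matching:
        raise ValueError(f"Task '{reference.name}' not found")
    for ret in matching[0].returns:
        if ret.name == reference.variable:
            return ret
    raise ValueError(
        f"Task '{reference.name}' has no return named '{reference.variable}'"
    )
```

The two ValueError branches correspond to the two cases listed under Raises: an unknown task name and an unknown return on a known task.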

get_task(name)

Get task by name.

Parameters:

name (str) – The name of a task

Raises:

ValueError – The task name does not exist

Returns:

DAGTask – A DAG Task

inputs: List[DAGStringInput | DAGIntegerInput | DAGNumberInput | DAGBooleanInput | DAGFolderInput | DAGFileInput | DAGPathInput | DAGArrayInput | DAGJSONObjectInput | DAGGenericInput]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

name: str
outputs: List[DAGGenericOutput | DAGStringOutput | DAGIntegerOutput | DAGNumberOutput | DAGBooleanOutput | DAGFolderOutput | DAGFileOutput | DAGPathOutput | DAGArrayOutput | DAGJSONObjectOutput]
classmethod sort_list(v: list) → list

Sort the list of tasks, inputs and outputs by name

tasks: List[DAGTask]
property templates: Set[str]

The set of unique templates referred to in the DAG.

Returns:

Set[str] – A set of template names
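Collecting the templates referenced by a list of tasks can be sketched with a set comprehension. The Task class is a hypothetical stand-in for DAGTask, the template field name is an assumption, and the task and template names in the usage note are made up for illustration.

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass
class Task:
    """Hypothetical stand-in for DAGTask with only the fields used here."""
    name: str
    template: str  # assumed field name


def unique_templates(tasks: List[Task]) -> Set[str]:
    """Return each template name once, however many tasks refer to it."""
    return {task.template for task in tasks}
```

For example, two tasks that both use a template called "ray-tracing" and a third that uses "post-process" would yield a set with two entries.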

type: Literal['DAG']