queenbee.recipe.dag module
Queenbee DAG.
A DAG defines a single step in a Recipe. Each DAG is a collection of tasks (steps). Each task indicates which function template should be used and maps the inputs and outputs for that specific task.
- class queenbee.recipe.dag.DAG(*, type: Literal['DAG'] = 'DAG', annotations: Dict[str, Any] | None = <factory>, inputs: List[DAGStringInput | DAGIntegerInput | DAGNumberInput | DAGBooleanInput | DAGFolderInput | DAGFileInput | DAGPathInput | DAGArrayInput | DAGJSONObjectInput | DAGGenericInput] = <factory>, outputs: List[DAGGenericOutput | DAGStringOutput | DAGIntegerOutput | DAGNumberOutput | DAGBooleanOutput | DAGFolderOutput | DAGFileOutput | DAGPathOutput | DAGArrayOutput | DAGJSONObjectOutput] = <factory>, name: str, fail_fast: bool = True, tasks: List[DAGTask])
Bases: IOBase
A Directed Acyclic Graph containing a list of tasks.
- classmethod check_dependencies(v: List[DAGTask]) → List[DAGTask]
Check that all task dependencies exist.
- classmethod check_unique_names(v: List[DAGTask]) → List[DAGTask]
Check that all tasks have unique names.
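The two validators above can be illustrated with a minimal, self-contained sketch. It does not use queenbee itself: `Task` is a hypothetical stand-in for `DAGTask`, and the `needs` field name for dependencies is an assumption made for illustration. The functions only show the kind of check each validator describes.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Task:
    """Hypothetical stand-in for DAGTask; not the queenbee class."""
    name: str
    needs: List[str] = field(default_factory=list)  # assumed field name


def check_unique_names(tasks: List[Task]) -> List[Task]:
    """Raise ValueError if two tasks share a name; otherwise return the list."""
    names = [t.name for t in tasks]
    duplicates = sorted({n for n in names if names.count(n) > 1})
    if duplicates:
        raise ValueError(f"Duplicate task names: {duplicates}")
    return tasks


def check_dependencies(tasks: List[Task]) -> List[Task]:
    """Raise ValueError if a task depends on a name that is not in the list."""
    known = {t.name for t in tasks}
    for task in tasks:
        missing = [n for n in task.needs if n not in known]
        if missing:
            raise ValueError(f"Task '{task.name}' needs unknown tasks: {missing}")
    return tasks
```

Both functions return the list unchanged when it is valid, which mirrors the `List[DAGTask]` in, `List[DAGTask]` out shape of the signatures above.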
- fail_fast: bool
- static find_task_return(tasks: List[DAGTask], reference: TaskReference | TaskFileReference | TaskFolderReference | TaskPathReference) → TaskReturn | TaskPathReturn
Find a task output within the DAG from a reference.
- Parameters:
tasks (List[DAGTask]) – The DAG tasks to search
reference (Union[TaskReference, TaskFileReference, TaskFolderReference, TaskPathReference]) – A reference to a DAG Task output
- Raises:
ValueError – The task name cannot be found.
ValueError – The task return reference cannot be found.
- Returns:
Union[TaskReturn, TaskPathReturn] – A task output parameter or artifact
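The lookup described above, including its two error cases, can be sketched without queenbee as follows. `Task`, `Return` and `Reference` are hypothetical stand-ins, and the field names `returns`, `name` and `variable` are assumptions for illustration, not the library's confirmed schema.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Return:
    """Hypothetical stand-in for a task return; not a queenbee class."""
    name: str


@dataclass
class Task:
    """Hypothetical stand-in for DAGTask."""
    name: str
    returns: List[Return] = field(default_factory=list)  # assumed field name


@dataclass
class Reference:
    """Hypothetical stand-in for a task reference."""
    name: str      # assumed: name of the task being referenced
    variable: str  # assumed: name of the return on that task


def find_task_return(tasks: List[Task], reference: Reference) -> Return:
    """Resolve a reference to the matching return of the matching task."""
    task = next((t for t in tasks if t.name == reference.name), None)
    if task is None:
        raise ValueError(f"Task '{reference.name}' cannot be found.")
    ret = next((r for r in task.returns if r.name == reference.variable), None)
    if ret is None:
        raise ValueError(
            f"Return '{reference.variable}' cannot be found on task '{task.name}'."
        )
    return ret


tasks = [Task("sky", returns=[Return("sky-file")])]
found = find_task_return(tasks, Reference(name="sky", variable="sky-file"))
```

The two `ValueError` branches correspond to the two documented failure modes: an unknown task name and an unknown return on a known task.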
- get_task(name)
Get task by name.
- Parameters:
name (str) – The name of a task
- Raises:
ValueError – The task name does not exist
- Returns:
DAGTask – A DAG Task
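As a rough illustration of the behaviour described above, the sketch below looks up a task by name and raises `ValueError` when none matches. It is written as a free function over a list of hypothetical `Task` stand-ins, whereas the documented `get_task` is a method on the DAG; that difference is only to keep the example self-contained.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Task:
    """Hypothetical stand-in for DAGTask; not the queenbee class."""
    name: str


def get_task(tasks: List[Task], name: str) -> Task:
    """Return the first task with the given name, or raise ValueError."""
    for task in tasks:
        if task.name == name:
            return task
    raise ValueError(f"Task '{name}' does not exist.")
```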
- inputs: List[DAGStringInput | DAGIntegerInput | DAGNumberInput | DAGBooleanInput | DAGFolderInput | DAGFileInput | DAGPathInput | DAGArrayInput | DAGJSONObjectInput | DAGGenericInput]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- name: str
- outputs: List[DAGGenericOutput | DAGStringOutput | DAGIntegerOutput | DAGNumberOutput | DAGBooleanOutput | DAGFolderOutput | DAGFileOutput | DAGPathOutput | DAGArrayOutput | DAGJSONObjectOutput]
- property templates: Set[str]
The set of unique templates referred to in the DAG.
- Returns:
Set[str] – The unique template names used by the tasks
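A minimal sketch of how such a set could be collected is shown below. `Task` is a hypothetical stand-in for `DAGTask`, and the `template` field name is an assumption; the point is only that duplicates collapse because the result is a set.

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass
class Task:
    """Hypothetical stand-in for DAGTask; not the queenbee class."""
    name: str
    template: str  # assumed field name for the template a task refers to


def templates(tasks: List[Task]) -> Set[str]:
    """Collect the unique template names referred to by the tasks."""
    return {task.template for task in tasks}


tasks = [
    Task("make-sky", "honeybee/gen-sky"),
    Task("make-sky-2", "honeybee/gen-sky"),
    Task("trace", "honeybee/ray-tracing"),
]
unique = templates(tasks)
```

Here `unique` holds two entries even though three tasks were given, since two of them share a template. The template names are made up for the example.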
- type: Literal['DAG']