queenbee.recipe.task module

DAGTask module.

class queenbee.recipe.task.DAGTask(*, type: ~typing.Literal['DAGTask'] = 'DAGTask', annotations: ~typing.Dict[str, ~typing.Any] = <factory>, name: str, template: str, needs: ~typing.List[str] | None = None, arguments: ~typing.List[~queenbee.io.inputs.task.TaskArgument | ~queenbee.io.inputs.task.TaskPathArgument] = <factory>, loop: ~queenbee.recipe.task.DAGTaskLoop | None = None, sub_folder: str | None = None, returns: ~typing.List[~queenbee.io.outputs.task.TaskReturn | ~queenbee.io.outputs.task.TaskPathReturn] = <factory>)[source]

Bases: BaseModel

A single task in a DAG flow.

argument_by_name(name: str) TaskArgument | TaskPathArgument[source]

Find a task argument by name.

argument_by_ref_source(source) List[TaskArgument | TaskPathArgument][source]

Retrieve a list of arguments by their source type.

Parameters:
  • source (str) – The source type. One of ‘dag’, ‘task’, ‘item’ or ‘value’. ‘dag’ refers to InputReference, InputFileReference, InputFolderReference and InputPathReference. ‘task’ refers to TaskReference, TaskFileReference, TaskFolderReference and TaskPathReference. ‘item’ refers to ItemReference. Finally, ‘value’ refers to ValueReference, ValueFileReference and ValueFolderReference.

Raises:

ValueError – The source type is not recognized

Returns:

List[TaskArguments] – A list of arguments
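
The filtering described above can be sketched without queenbee itself. This is a minimal illustration, not the library's implementation: the `Reference` and `Argument` classes, the `from_` field on an argument, the `SOURCE_TYPES` table and the `arguments_by_source` helper are all hypothetical stand-ins. Only the grouping of reference types per source is taken from the parameter description.

```python
from dataclasses import dataclass
from typing import Dict, List, Set


@dataclass
class Reference:
    """Hypothetical stand-in for a queenbee reference object."""
    type: str  # e.g. 'InputReference', 'TaskFileReference'


@dataclass
class Argument:
    """Hypothetical stand-in for a task argument."""
    name: str
    from_: Reference  # assumed field holding the argument's reference


# Source type -> reference type names, as listed in the parameter description.
SOURCE_TYPES: Dict[str, Set[str]] = {
    'dag': {'InputReference', 'InputFileReference',
            'InputFolderReference', 'InputPathReference'},
    'task': {'TaskReference', 'TaskFileReference',
             'TaskFolderReference', 'TaskPathReference'},
    'item': {'ItemReference'},
    'value': {'ValueReference', 'ValueFileReference',
              'ValueFolderReference'},
}


def arguments_by_source(arguments: List[Argument], source: str) -> List[Argument]:
    """Return the arguments whose reference belongs to the given source type."""
    if source not in SOURCE_TYPES:
        raise ValueError(f'Unrecognized source type: {source!r}')
    allowed = SOURCE_TYPES[source]
    return [arg for arg in arguments if arg.from_.type in allowed]


example_arguments = [
    Argument('grid', Reference('InputFileReference')),
    Argument('count', Reference('ValueReference')),
    Argument('index', Reference('ItemReference')),
]
dag_arguments = arguments_by_source(example_arguments, 'dag')
```

Keeping the mapping in a single table makes the ValueError case a plain membership test, which mirrors the behavior listed under Raises.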

arguments: List[TaskArgument | TaskPathArgument]
property artifact_arguments: List

Get artifact arguments. Artifacts are file, folder and path inputs.

Returns:

A list – A list of artifact arguments.

property artifact_returns: List

Get artifact returns. Artifacts are file, folder and path outputs.

Returns:

A list – A list of artifact returns.

static artifacts_by_ref_source(artifacts, source) List[TaskArgument | TaskPathArgument][source]

Retrieve a list of argument artifacts by their source type.

Parameters:
  • source (str) – The source type. One of ‘dag’, ‘task’ or ‘value’. ‘dag’ refers to InputFileReference, InputFolderReference and InputPathReference. ‘task’ refers to TaskFileReference, TaskFolderReference and TaskPathReference. Finally, ‘value’ refers to ValueFileReference and ValueFolderReference.

Raises:

ValueError – The source type is not recognized

Returns:

List[TaskPathArguments] – A list of Argument Artifacts

classmethod check_duplicate_argument_name(v: List[TaskArgument | TaskPathArgument], info: ValidationInfo) List[TaskArgument | TaskPathArgument][source]
classmethod check_duplicate_return_name(v: List[TaskReturn | TaskPathReturn], info: ValidationInfo) List[TaskReturn | TaskPathReturn][source]
classmethod check_item_references(v: DAGTaskLoop, info: ValidationInfo) DAGTaskLoop[source]
classmethod check_loop_referenced_task(v: DAGTaskLoop, info: ValidationInfo) DAGTaskLoop[source]
classmethod check_referenced_values(v: List[TaskArgument | TaskPathArgument], info: ValidationInfo) List[TaskArgument | TaskPathArgument][source]
classmethod check_references(v: str, info: ValidationInfo) str[source]
check_template(template)[source]

A function to check the inputs and outputs of a DAG task.

The DAG tasks should match a certain template.

Parameters:
  • template – The template to check this task against.
Raises:

AssertionError – The task input and output values do not match the task template.
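
The exact matching rules are not spelled out here. As a rough sketch only, assume that "matching" means every task argument names a template input and every task return names a template output. The `check_names` helper and its parameters are hypothetical and are not part of queenbee.

```python
from typing import Iterable


def check_names(argument_names: Iterable[str], return_names: Iterable[str],
                template_inputs: Iterable[str],
                template_outputs: Iterable[str]) -> None:
    """Raise AssertionError if the task uses names the template lacks.

    Assumption: a task matches its template when its argument names are a
    subset of the template inputs and its return names are a subset of the
    template outputs.
    """
    unknown_arguments = sorted(set(argument_names) - set(template_inputs))
    if unknown_arguments:
        raise AssertionError(
            f'Arguments not found in template inputs: {unknown_arguments}'
        )
    unknown_returns = sorted(set(return_names) - set(template_outputs))
    if unknown_returns:
        raise AssertionError(
            f'Returns not found in template outputs: {unknown_returns}'
        )


# A task that only uses names the template declares passes silently.
check_names(['grid', 'count'], ['result'],
            ['grid', 'count', 'seed'], ['result', 'log'])
```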

property is_root: bool

A root task does not have any dependencies.

Returns:

bool – True if the task has no dependencies
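
To illustrate the idea, the sketch below uses a hypothetical `Task` stand-in that carries only the `name` and `needs` fields. It assumes that both a missing `needs` value and an empty list count as "no dependencies", which may differ from queenbee's own check.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Task:
    """Hypothetical stand-in with only the fields needed for this sketch."""
    name: str
    needs: Optional[List[str]] = None

    @property
    def is_root(self) -> bool:
        # Assumption: None and an empty list both mean "no dependencies".
        return not self.needs


tasks = [
    Task('prepare'),
    Task('simulate', needs=['prepare']),
    Task('report', needs=['simulate']),
]
root_names = [task.name for task in tasks if task.is_root]
```

Root tasks are the natural starting points when a scheduler walks the DAG, since nothing has to finish before they can run.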

loop: DAGTaskLoop | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

name: str
needs: List[str] | None
property parameter_arguments: List

Get parameter arguments. Parameters are inputs that are not file, folder or path artifacts.

Returns:

A list – A list of parameter arguments.

property parameter_returns: List

Get parameter returns. Parameters are outputs that are not file, folder or path artifacts.

Returns:

A list – A list of parameter returns.

static parameters_by_ref_source(parameters, source) List[TaskArgument | TaskPathArgument][source]

Retrieve a list of argument parameters by their source type.

Parameters:
  • source (str) – The source type. One of ‘dag’, ‘task’, ‘item’ or ‘value’.

Raises:

ValueError – The source type is not recognized

Returns:

List[TaskArguments] – A list of Argument Parameters

return_by_name(name: str) TaskReturn | TaskPathReturn[source]

Find a task return by name.

returns: List[TaskReturn | TaskPathReturn]
sub_folder: str | None
template: str
type: Literal['DAGTask']
class queenbee.recipe.task.DAGTaskLoop(*, type: ~typing.Literal['DAGTaskLoop'] = 'DAGTaskLoop', annotations: ~typing.Dict[str, ~typing.Any] = <factory>, from_: ~queenbee.io.reference.InputReference | ~queenbee.io.reference.TaskReference | ~queenbee.io.reference.ValueListReference = None)[source]

Bases: BaseModel

Loop configuration for the task.

This runs the provided template multiple times, in parallel, once for each item of an input or task parameter, which should be a list.
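
The looping behavior can be pictured as a parallel map over the list. The sketch below is only an analogy built on the standard library: `run_loop` is a hypothetical helper, the "template" is modeled as a plain Python callable, and a thread pool stands in for whatever executor actually runs the recipe.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


def run_loop(template: Callable[[Any], Any], items: List[Any]) -> List[Any]:
    """Run `template` once per item, in parallel, and collect the results."""
    if not isinstance(items, list):
        raise TypeError('The looped value should be a list.')
    with ThreadPoolExecutor() as pool:
        # Executor.map returns results in the same order as the input items.
        return list(pool.map(template, items))


def double(value: int) -> int:
    """Toy template: one run per item of the looped list."""
    return value * 2


results = run_loop(double, [1, 2, 3])
```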

from_: InputReference | TaskReference | ValueListReference
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

type: Literal['DAGTaskLoop']