queenbee.io.inputs.step module

Input types for Queenbee job steps.

For more information on plugins see plugin module.

class queenbee.io.inputs.step.StepArrayInput(*, type: ~typing.Literal['StepArrayInput'] = 'StepArrayInput', annotations: ~typing.Dict[str, ~typing.Any] = <factory>, name: str, description: str | None = None, default: ~typing.List | None = None, alias: ~typing.List[~queenbee.io.inputs.alias.DAGStringInputAlias | ~queenbee.io.inputs.alias.DAGIntegerInputAlias | ~queenbee.io.inputs.alias.DAGNumberInputAlias | ~queenbee.io.inputs.alias.DAGBooleanInputAlias | ~queenbee.io.inputs.alias.DAGFolderInputAlias | ~queenbee.io.inputs.alias.DAGFileInputAlias | ~queenbee.io.inputs.alias.DAGPathInputAlias | ~queenbee.io.inputs.alias.DAGArrayInputAlias | ~queenbee.io.inputs.alias.DAGJSONObjectInputAlias | ~queenbee.io.inputs.alias.DAGLinkedInputAlias | ~queenbee.io.inputs.alias.DAGGenericInputAlias] = <factory>, required: bool = False, spec: ~typing.Dict | None = None, items_type: ~queenbee.io.common.ItemType = ItemType.String, value: ~typing.List)[source]

Bases: FunctionArrayInput

A JSON array input.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

type: Literal['StepArrayInput']
value: List
class queenbee.io.inputs.step.StepBooleanInput(*, type: ~typing.Literal['StepBooleanInput'] = 'StepBooleanInput', annotations: ~typing.Dict[str, ~typing.Any] = <factory>, name: str, description: str | None = None, default: bool | None = None, alias: ~typing.List[~queenbee.io.inputs.alias.DAGStringInputAlias | ~queenbee.io.inputs.alias.DAGIntegerInputAlias | ~queenbee.io.inputs.alias.DAGNumberInputAlias | ~queenbee.io.inputs.alias.DAGBooleanInputAlias | ~queenbee.io.inputs.alias.DAGFolderInputAlias | ~queenbee.io.inputs.alias.DAGFileInputAlias | ~queenbee.io.inputs.alias.DAGPathInputAlias | ~queenbee.io.inputs.alias.DAGArrayInputAlias | ~queenbee.io.inputs.alias.DAGJSONObjectInputAlias | ~queenbee.io.inputs.alias.DAGLinkedInputAlias | ~queenbee.io.inputs.alias.DAGGenericInputAlias] = <factory>, required: bool = False, spec: ~typing.Dict | None = None, value: bool)[source]

Bases: FunctionBooleanInput

The boolean type matches only two special values: True and False.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

type: Literal['StepBooleanInput']
value: bool
class queenbee.io.inputs.step.StepFileInput(*, type: ~typing.Literal['StepFileInput'] = 'StepFileInput', annotations: ~typing.Dict[str, ~typing.Any] = <factory>, name: str, description: str | None = None, default: ~queenbee.io.artifact_source.HTTP | ~queenbee.io.artifact_source.S3 | ~queenbee.io.artifact_source.ProjectFolder | None = None, alias: ~typing.List[~queenbee.io.inputs.alias.DAGStringInputAlias | ~queenbee.io.inputs.alias.DAGIntegerInputAlias | ~queenbee.io.inputs.alias.DAGNumberInputAlias | ~queenbee.io.inputs.alias.DAGBooleanInputAlias | ~queenbee.io.inputs.alias.DAGFolderInputAlias | ~queenbee.io.inputs.alias.DAGFileInputAlias | ~queenbee.io.inputs.alias.DAGPathInputAlias | ~queenbee.io.inputs.alias.DAGArrayInputAlias | ~queenbee.io.inputs.alias.DAGJSONObjectInputAlias | ~queenbee.io.inputs.alias.DAGLinkedInputAlias | ~queenbee.io.inputs.alias.DAGGenericInputAlias] = <factory>, required: bool = False, spec: ~typing.Dict | None = None, path: str = None, extensions: ~typing.List[str] | None = None, source: ~queenbee.io.artifact_source.HTTP | ~queenbee.io.artifact_source.S3 | ~queenbee.io.artifact_source.ProjectFolder)[source]

Bases: FunctionFileInput

A file input.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

path: str
source: HTTP | S3 | ProjectFolder
type: Literal['StepFileInput']
class queenbee.io.inputs.step.StepFolderInput(*, type: ~typing.Literal['StepFolderInput'] = 'StepFolderInput', annotations: ~typing.Dict[str, ~typing.Any] = <factory>, name: str, description: str | None = None, default: ~queenbee.io.artifact_source.HTTP | ~queenbee.io.artifact_source.S3 | ~queenbee.io.artifact_source.ProjectFolder | None = None, alias: ~typing.List[~queenbee.io.inputs.alias.DAGStringInputAlias | ~queenbee.io.inputs.alias.DAGIntegerInputAlias | ~queenbee.io.inputs.alias.DAGNumberInputAlias | ~queenbee.io.inputs.alias.DAGBooleanInputAlias | ~queenbee.io.inputs.alias.DAGFolderInputAlias | ~queenbee.io.inputs.alias.DAGFileInputAlias | ~queenbee.io.inputs.alias.DAGPathInputAlias | ~queenbee.io.inputs.alias.DAGArrayInputAlias | ~queenbee.io.inputs.alias.DAGJSONObjectInputAlias | ~queenbee.io.inputs.alias.DAGLinkedInputAlias | ~queenbee.io.inputs.alias.DAGGenericInputAlias] = <factory>, required: bool = False, spec: ~typing.Dict | None = None, path: str = None, source: ~queenbee.io.artifact_source.HTTP | ~queenbee.io.artifact_source.S3 | ~queenbee.io.artifact_source.ProjectFolder)[source]

Bases: FunctionFolderInput

A folder input.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

path: str
source: HTTP | S3 | ProjectFolder
type: Literal['StepFolderInput']
class queenbee.io.inputs.step.StepIntegerInput(*, type: ~typing.Literal['StepIntegerInput'] = 'StepIntegerInput', annotations: ~typing.Dict[str, ~typing.Any] = <factory>, name: str, description: str | None = None, default: int | None = None, alias: ~typing.List[~queenbee.io.inputs.alias.DAGStringInputAlias | ~queenbee.io.inputs.alias.DAGIntegerInputAlias | ~queenbee.io.inputs.alias.DAGNumberInputAlias | ~queenbee.io.inputs.alias.DAGBooleanInputAlias | ~queenbee.io.inputs.alias.DAGFolderInputAlias | ~queenbee.io.inputs.alias.DAGFileInputAlias | ~queenbee.io.inputs.alias.DAGPathInputAlias | ~queenbee.io.inputs.alias.DAGArrayInputAlias | ~queenbee.io.inputs.alias.DAGJSONObjectInputAlias | ~queenbee.io.inputs.alias.DAGLinkedInputAlias | ~queenbee.io.inputs.alias.DAGGenericInputAlias] = <factory>, required: bool = False, spec: ~typing.Dict | None = None, value: int)[source]

Bases: FunctionIntegerInput

An integer input.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

type: Literal['StepIntegerInput']
value: int
class queenbee.io.inputs.step.StepJSONObjectInput(*, type: ~typing.Literal['StepJSONObjectInput'] = 'StepJSONObjectInput', annotations: ~typing.Dict[str, ~typing.Any] = <factory>, name: str, description: str | None = None, default: ~typing.Dict | None = None, alias: ~typing.List[~queenbee.io.inputs.alias.DAGStringInputAlias | ~queenbee.io.inputs.alias.DAGIntegerInputAlias | ~queenbee.io.inputs.alias.DAGNumberInputAlias | ~queenbee.io.inputs.alias.DAGBooleanInputAlias | ~queenbee.io.inputs.alias.DAGFolderInputAlias | ~queenbee.io.inputs.alias.DAGFileInputAlias | ~queenbee.io.inputs.alias.DAGPathInputAlias | ~queenbee.io.inputs.alias.DAGArrayInputAlias | ~queenbee.io.inputs.alias.DAGJSONObjectInputAlias | ~queenbee.io.inputs.alias.DAGLinkedInputAlias | ~queenbee.io.inputs.alias.DAGGenericInputAlias] = <factory>, required: bool = False, spec: ~typing.Dict | None = None, value: ~typing.Dict)[source]

Bases: FunctionJSONObjectInput

A JSON object input.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

type: Literal['StepJSONObjectInput']
value: Dict
class queenbee.io.inputs.step.StepNumberInput(*, type: ~typing.Literal['StepNumberInput'] = 'StepNumberInput', annotations: ~typing.Dict[str, ~typing.Any] = <factory>, name: str, description: str | None = None, default: float | None = None, alias: ~typing.List[~queenbee.io.inputs.alias.DAGStringInputAlias | ~queenbee.io.inputs.alias.DAGIntegerInputAlias | ~queenbee.io.inputs.alias.DAGNumberInputAlias | ~queenbee.io.inputs.alias.DAGBooleanInputAlias | ~queenbee.io.inputs.alias.DAGFolderInputAlias | ~queenbee.io.inputs.alias.DAGFileInputAlias | ~queenbee.io.inputs.alias.DAGPathInputAlias | ~queenbee.io.inputs.alias.DAGArrayInputAlias | ~queenbee.io.inputs.alias.DAGJSONObjectInputAlias | ~queenbee.io.inputs.alias.DAGLinkedInputAlias | ~queenbee.io.inputs.alias.DAGGenericInputAlias] = <factory>, required: bool = False, spec: ~typing.Dict | None = None, value: float)[source]

Bases: FunctionNumberInput

A number input.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

type: Literal['StepNumberInput']
value: float
class queenbee.io.inputs.step.StepPathInput(*, type: ~typing.Literal['StepPathInput'] = 'StepPathInput', annotations: ~typing.Dict[str, ~typing.Any] = <factory>, name: str, description: str | None = None, default: ~queenbee.io.artifact_source.HTTP | ~queenbee.io.artifact_source.S3 | ~queenbee.io.artifact_source.ProjectFolder | None = None, alias: ~typing.List[~queenbee.io.inputs.alias.DAGStringInputAlias | ~queenbee.io.inputs.alias.DAGIntegerInputAlias | ~queenbee.io.inputs.alias.DAGNumberInputAlias | ~queenbee.io.inputs.alias.DAGBooleanInputAlias | ~queenbee.io.inputs.alias.DAGFolderInputAlias | ~queenbee.io.inputs.alias.DAGFileInputAlias | ~queenbee.io.inputs.alias.DAGPathInputAlias | ~queenbee.io.inputs.alias.DAGArrayInputAlias | ~queenbee.io.inputs.alias.DAGJSONObjectInputAlias | ~queenbee.io.inputs.alias.DAGLinkedInputAlias | ~queenbee.io.inputs.alias.DAGGenericInputAlias] = <factory>, required: bool = False, spec: ~typing.Dict | None = None, path: str = None, extensions: ~typing.List[str] | None = None, source: ~queenbee.io.artifact_source.HTTP | ~queenbee.io.artifact_source.S3 | ~queenbee.io.artifact_source.ProjectFolder)[source]

Bases: FunctionPathInput

A file or a folder input.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

path: str
source: HTTP | S3 | ProjectFolder
type: Literal['StepPathInput']
class queenbee.io.inputs.step.StepStringInput(*, type: ~typing.Literal['StepStringInput'] = 'StepStringInput', annotations: ~typing.Dict[str, ~typing.Any] = <factory>, name: str, description: str | None = None, default: str | None = None, alias: ~typing.List[~queenbee.io.inputs.alias.DAGStringInputAlias | ~queenbee.io.inputs.alias.DAGIntegerInputAlias | ~queenbee.io.inputs.alias.DAGNumberInputAlias | ~queenbee.io.inputs.alias.DAGBooleanInputAlias | ~queenbee.io.inputs.alias.DAGFolderInputAlias | ~queenbee.io.inputs.alias.DAGFileInputAlias | ~queenbee.io.inputs.alias.DAGPathInputAlias | ~queenbee.io.inputs.alias.DAGArrayInputAlias | ~queenbee.io.inputs.alias.DAGJSONObjectInputAlias | ~queenbee.io.inputs.alias.DAGLinkedInputAlias | ~queenbee.io.inputs.alias.DAGGenericInputAlias] = <factory>, required: bool = False, spec: ~typing.Dict | None = None, value: str)[source]

Bases: FunctionStringInput

A String input.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

type: Literal['StepStringInput']
value: str
queenbee.io.inputs.step.from_template(template: DAGStringInput | DAGIntegerInput | DAGNumberInput | DAGBooleanInput | DAGFolderInput | DAGFileInput | DAGPathInput | DAGArrayInput | DAGJSONObjectInput | DAGGenericInput | FunctionStringInput | FunctionIntegerInput | FunctionNumberInput | FunctionBooleanInput | FunctionFolderInput | FunctionFileInput | FunctionPathInput | FunctionArrayInput | FunctionJSONObjectInput, value: Any) StepStringInput | StepIntegerInput | StepNumberInput | StepBooleanInput | StepFolderInput | StepFileInput | StepPathInput | StepArrayInput | StepJSONObjectInput[source]

Generate a step input from a template input type and a value

Parameters:
  • {Union[DAGInputs (template) – template (DAG or Function)

  • a (FunctionInputs]} -- An input from) – template (DAG or Function)

  • in (value {Any} -- The input value calculated for this template) – the workflow step

Returns:

StepInputs – A Step Input object