queenbee.job.run module

Queenbee run class.

A Run contains the status of an individual recipe being executed.

class queenbee.job.run.RunStatus(*, type: ~typing.Literal['RunStatus'] = 'RunStatus', annotations: ~typing.Dict[str, ~typing.Any] | None = <factory>, inputs: ~typing.List[~queenbee.io.inputs.step.StepStringInput | ~queenbee.io.inputs.step.StepIntegerInput | ~queenbee.io.inputs.step.StepNumberInput | ~queenbee.io.inputs.step.StepBooleanInput | ~queenbee.io.inputs.step.StepFolderInput | ~queenbee.io.inputs.step.StepFileInput | ~queenbee.io.inputs.step.StepPathInput | ~queenbee.io.inputs.step.StepArrayInput | ~queenbee.io.inputs.step.StepJSONObjectInput], outputs: ~typing.List[~queenbee.io.outputs.step.StepStringOutput | ~queenbee.io.outputs.step.StepIntegerOutput | ~queenbee.io.outputs.step.StepNumberOutput | ~queenbee.io.outputs.step.StepBooleanOutput | ~queenbee.io.outputs.step.StepFolderOutput | ~queenbee.io.outputs.step.StepFileOutput | ~queenbee.io.outputs.step.StepPathOutput | ~queenbee.io.outputs.step.StepArrayOutput | ~queenbee.io.outputs.step.StepJSONObjectOutput], message: str | None = None, started_at: ~datetime.datetime, finished_at: ~datetime.datetime | None = None, source: str | None = None, api_version: ~typing.Literal['v1beta1'] = 'v1beta1', id: str, job_id: str, entrypoint: str | None = None, status: ~queenbee.job.run.RunStatusEnum = RunStatusEnum.unknown, steps: ~typing.Dict[str, ~queenbee.job.run.StepStatus] = {})[source]

Bases: BaseStatus

Job Status.

api_version: Literal['v1beta1']
entrypoint: str | None
id: str
inputs: List[StepStringInput | StepIntegerInput | StepNumberInput | StepBooleanInput | StepFolderInput | StepFileInput | StepPathInput | StepArrayInput | StepJSONObjectInput]
job_id: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

outputs: List[StepStringOutput | StepIntegerOutput | StepNumberOutput | StepBooleanOutput | StepFolderOutput | StepFileOutput | StepPathOutput | StepArrayOutput | StepJSONObjectOutput]
status: RunStatusEnum
steps: Dict[str, StepStatus]
type: Literal['RunStatus']
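
The RunStatus signature above lists `inputs`, `outputs`, `started_at`, `id` and `job_id` as keyword arguments without defaults, so a serialized run status presumably needs at least those keys. The following is a minimal sketch using only the standard library to check a plain dict against that list. `REQUIRED_RUN_FIELDS` and `missing_run_fields` are hypothetical names introduced for illustration and are not part of queenbee, and every value in the payload is made up. With queenbee installed, the model itself would be the place where this validation happens.

```python
from datetime import datetime, timezone

# Fields that have no default in the RunStatus signature (assumed required).
REQUIRED_RUN_FIELDS = ("inputs", "outputs", "started_at", "id", "job_id")


def missing_run_fields(payload: dict) -> list:
    """Return the required RunStatus keys that are absent from payload."""
    return [name for name in REQUIRED_RUN_FIELDS if name not in payload]


# A minimal payload; all identifiers and values here are illustrative only.
payload = {
    "type": "RunStatus",
    "api_version": "v1beta1",
    "id": "run-001",
    "job_id": "job-001",
    "started_at": datetime(2024, 1, 1, tzinfo=timezone.utc).isoformat(),
    "inputs": [],
    "outputs": [],
    "status": "Unknown",
    "steps": {},
}

print(missing_run_fields(payload))            # complete payload
print(missing_run_fields({"id": "run-001"}))  # incomplete payload
```

The check only looks at key presence; it does not verify value types, which is left to the real model.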
class queenbee.job.run.RunStatusEnum(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Enumeration of allowable status strings.

cancelled = 'Cancelled'
created = 'Created'
failed = 'Failed'
post_processing = 'Post-Processing'
running = 'Running'
scheduled = 'Scheduled'
succeeded = 'Succeeded'
unknown = 'Unknown'
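
Because RunStatusEnum derives from both str and Enum, its members compare equal to their string values and can be looked up by value. The sketch below illustrates this with a local stand-in class that mirrors the members listed above, so it runs without queenbee installed. `FINISHED` and `is_finished` are hypothetical helpers, and treating cancelled, failed and succeeded as the final states is an assumption, not something this module states.

```python
from enum import Enum


class RunStatusEnum(str, Enum):
    """Local stand-in mirroring the members documented above."""
    cancelled = 'Cancelled'
    created = 'Created'
    failed = 'Failed'
    post_processing = 'Post-Processing'
    running = 'Running'
    scheduled = 'Scheduled'
    succeeded = 'Succeeded'
    unknown = 'Unknown'


# Assumption: a run in one of these states will not change any further.
FINISHED = {RunStatusEnum.cancelled, RunStatusEnum.failed, RunStatusEnum.succeeded}


def is_finished(status: str) -> bool:
    """Accept either an enum member or its raw string value."""
    return RunStatusEnum(status) in FINISHED


# Lookup by value returns the member; an unlisted string raises ValueError.
member = RunStatusEnum('Post-Processing')
print(member.name, is_finished(member), is_finished('Succeeded'))
```

Lookup is by value, not by attribute name, so the string is 'Post-Processing' rather than 'post_processing'.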
class queenbee.job.run.StatusType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Type enum for status type.

Container = 'Container'
DAG = 'DAG'
Function = 'Function'
Loop = 'Loop'
Unknown = 'Unknown'
class queenbee.job.run.StepStatus(*, type: ~typing.Literal['StepStatus'] = 'StepStatus', annotations: ~typing.Dict[str, ~typing.Any] | None = <factory>, inputs: ~typing.List[~queenbee.io.inputs.step.StepStringInput | ~queenbee.io.inputs.step.StepIntegerInput | ~queenbee.io.inputs.step.StepNumberInput | ~queenbee.io.inputs.step.StepBooleanInput | ~queenbee.io.inputs.step.StepFolderInput | ~queenbee.io.inputs.step.StepFileInput | ~queenbee.io.inputs.step.StepPathInput | ~queenbee.io.inputs.step.StepArrayInput | ~queenbee.io.inputs.step.StepJSONObjectInput], outputs: ~typing.List[~queenbee.io.outputs.step.StepStringOutput | ~queenbee.io.outputs.step.StepIntegerOutput | ~queenbee.io.outputs.step.StepNumberOutput | ~queenbee.io.outputs.step.StepBooleanOutput | ~queenbee.io.outputs.step.StepFolderOutput | ~queenbee.io.outputs.step.StepFileOutput | ~queenbee.io.outputs.step.StepPathOutput | ~queenbee.io.outputs.step.StepArrayOutput | ~queenbee.io.outputs.step.StepJSONObjectOutput], message: str | None = None, started_at: ~datetime.datetime, finished_at: ~datetime.datetime | None = None, source: str | None = None, id: str, name: str, status: ~queenbee.job.run.StepStatusEnum = StepStatusEnum.unknown, status_type: ~queenbee.job.run.StatusType, template_ref: str, command: str | None = None, boundary_id: str | None = None, children_ids: ~typing.List[str], outbound_steps: ~typing.List[str])[source]

Bases: BaseStatus

The status of a job step.

boundary_id: str | None
children_ids: List[str]
command: str | None
id: str
inputs: List[StepStringInput | StepIntegerInput | StepNumberInput | StepBooleanInput | StepFolderInput | StepFileInput | StepPathInput | StepArrayInput | StepJSONObjectInput]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

name: str
outbound_steps: List[str]
outputs: List[StepStringOutput | StepIntegerOutput | StepNumberOutput | StepBooleanOutput | StepFolderOutput | StepFileOutput | StepPathOutput | StepArrayOutput | StepJSONObjectOutput]
status: StepStatusEnum
status_type: StatusType
template_ref: str
type: Literal['StepStatus']
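
In the StepStatus signature, `started_at` is required while `finished_at` defaults to None, which suggests that an unfinished step has no end timestamp. The sketch below shows one way to compute elapsed time under that assumption. `StepTiming` and `elapsed` are hypothetical stand-ins that carry only the two timestamp fields; they are not queenbee classes.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class StepTiming:
    """Hypothetical stand-in holding only the two timestamp fields."""
    started_at: datetime
    finished_at: Optional[datetime] = None


def elapsed(step: StepTiming, now: Optional[datetime] = None) -> timedelta:
    """Time spent on a step; uses `now` as the end while finished_at is None."""
    end = step.finished_at or now or datetime.now(timezone.utc)
    return end - step.started_at


start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
done = StepTiming(start, start + timedelta(minutes=5))
pending = StepTiming(start)

print(elapsed(done))
print(elapsed(pending, now=start + timedelta(seconds=30)))
```

The example uses timezone-aware datetimes throughout, since subtracting a naive datetime from an aware one raises TypeError.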
class queenbee.job.run.StepStatusEnum(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Enumeration of allowable status strings.

failed = 'Failed'
running = 'Running'
scheduled = 'Scheduled'
skipped = 'Skipped'
succeeded = 'Succeeded'
unknown = 'Unknown'
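
A common use of a step status enumeration is summarizing how many steps of a run are in each state. The sketch below tallies raw status strings with a local stand-in that mirrors the StepStatusEnum members listed above. `tally` is a hypothetical helper, and mapping unrecognized strings to `unknown` is a choice made for this example rather than documented queenbee behavior.

```python
from collections import Counter
from enum import Enum


class StepStatusEnum(str, Enum):
    """Local stand-in mirroring the members documented above."""
    failed = 'Failed'
    running = 'Running'
    scheduled = 'Scheduled'
    skipped = 'Skipped'
    succeeded = 'Succeeded'
    unknown = 'Unknown'


def tally(raw_statuses) -> Counter:
    """Count steps per status; unrecognized strings are counted as unknown."""
    counts = Counter()
    for raw in raw_statuses:
        try:
            counts[StepStatusEnum(raw)] += 1
        except ValueError:
            # Design choice of this sketch: do not fail on unexpected values.
            counts[StepStatusEnum.unknown] += 1
    return counts


counts = tally(['Succeeded', 'Succeeded', 'Skipped', 'not-a-status'])
for status, n in counts.items():
    print(status.value, n)
```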