queenbee.job.job module

class queenbee.job.job.Job(*, type: ~typing.Literal['Job'] = 'Job', annotations: ~typing.Dict[str, ~typing.Any] = <factory>, api_version: ~typing.Literal['v1beta1'] = 'v1beta1', source: str, arguments: ~typing.List[~typing.List[~queenbee.io.inputs.job.JobArgument | ~queenbee.io.inputs.job.JobPathArgument]] | None = None, name: str | None = None, description: str | None = None, labels: ~typing.Dict[str, str] | None = None)[source]

Bases: BaseModel

Queenbee Job.

A Job is an object to submit a list of arguments to execute a Queenbee recipe.

api_version: Literal['v1beta1']
arguments: List[List[JobArgument | JobPathArgument]] | None
classmethod check_duplicate_names(v: List[List[JobArgument | JobPathArgument]])[source]
description: str | None
labels: Dict[str, str] | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

name: str | None
populate_default_arguments(inputs: List[DAGStringInput | DAGIntegerInput | DAGNumberInput | DAGBooleanInput | DAGFolderInput | DAGFileInput | DAGPathInput | DAGArrayInput | DAGJSONObjectInput | DAGGenericInput])[source]
source: str
type: Literal['Job']
validate_arguments(inputs: List[DAGStringInput | DAGIntegerInput | DAGNumberInput | DAGBooleanInput | DAGFolderInput | DAGFileInput | DAGPathInput | DAGArrayInput | DAGJSONObjectInput | DAGGenericInput])[source]
class queenbee.job.job.JobStatus(*, type: ~typing.Literal['JobStatus'] = 'JobStatus', annotations: ~typing.Dict[str, ~typing.Any] = <factory>, api_version: ~typing.Literal['v1beta1'] = 'v1beta1', id: str, status: ~queenbee.job.job.JobStatusEnum = JobStatusEnum.unknown, message: str | None = None, started_at: ~datetime.datetime, finished_at: ~datetime.datetime | None = None, source: str | None = None, runs_pending: int = 0, runs_running: int = 0, runs_completed: int = 0, runs_failed: int = 0, runs_cancelled: int = 0)[source]

Bases: BaseModel

Parametric Job Status.

api_version: Literal['v1beta1']
finished_at: datetime | None
id: str
message: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

runs_cancelled: int
runs_completed: int
runs_failed: int
runs_pending: int
runs_running: int
source: str | None
started_at: datetime
status: JobStatusEnum
type: Literal['JobStatus']
class queenbee.job.job.JobStatusEnum(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Enumaration of allowable status strings

cancelled = 'Cancelled'
completed = 'Completed'
created = 'Created'
failed = 'Failed'
pre_processing = 'Pre-Processing'
running = 'Running'
unknown = 'Unknown'