Source code for queenbee.job.run

"""Queenbee run class.

A Run contains the status of an individual recipe being executed.
"""
from enum import Enum
from pydantic import Field, constr
from typing import List, Dict

from ..io.inputs.step import StepInputs
from ..io.outputs.step import StepOutputs
from .status import BaseStatus


class StepStatusEnum(str, Enum):
    """Enumeration of allowable status strings for a step.

    Members subclass ``str``, so each one compares equal to its string value.
    """

    # The step has been scheduled for execution
    scheduled = 'Scheduled'

    # The step is running
    running = 'Running'

    # The step has failed
    failed = 'Failed'

    # The step has finished successfully
    succeeded = 'Succeeded'

    # The step has been skipped
    skipped = 'Skipped'

    # Could not determine the status of the step
    unknown = 'Unknown'


class StatusType(str, Enum):
    """Type enum for status type.

    Members subclass ``str``, so each one compares equal to its string value.
    """

    Function = 'Function'

    DAG = 'DAG'

    Loop = 'Loop'

    Container = 'Container'

    Unknown = 'Unknown'


class StepStatus(BaseStatus):
    """The Status of a Job Step.

    Extends ``BaseStatus`` with the step's identity, state, the template it
    came from, its inputs/outputs and its links to other steps (boundary,
    children and outbound steps).
    """

    # Discriminator: the regex only accepts the literal string 'StepStatus'.
    type: constr(regex='^StepStatus$') = 'StepStatus'

    id: str = Field(
        ...,
        description='The step unique ID'
    )

    name: str = Field(
        ...,
        description='A human readable name for the step. Usually defined by the '
        'DAG task name but can be extended if the step is part of a loop for example. '
        'This name is unique within the boundary of the DAG/Job that generated it.'
    )

    # Falls back to ``unknown`` when no status is supplied.
    status: StepStatusEnum = Field(
        StepStatusEnum.unknown,
        description='The status of this step.'
    )

    # NOTE(review): StatusType also defines 'Container' and 'Unknown', which
    # the description below does not list.
    status_type: StatusType = Field(
        ...,
        description='The type of step this status is for. Can be "Function", "DAG" or '
        '"Loop"'
    )

    template_ref: str = Field(
        ...,
        description='The name of the template that spawned this step'
    )

    # Optional: defaults to None.
    command: str = Field(
        None,
        description='The command used to run this step. Only applies to Function steps.'
    )

    inputs: List[StepInputs] = Field(
        ...,
        description='The inputs used by this step.'
    )

    outputs: List[StepOutputs] = Field(
        ...,
        description='The outputs produced by this step.'
    )

    # Optional: defaults to None. The description is built from adjacent
    # string literals (not backslash continuations inside one literal) so no
    # stray backslashes or indentation whitespace end up in the text.
    boundary_id: str = Field(
        None,
        description='This indicates the step ID of the associated template root '
        'step in which this step belongs to. A DAG step will have the id of the '
        'parent DAG for example.'
    )

    children_ids: List[str] = Field(
        ...,
        description='A list of child step IDs'
    )

    outbound_steps: List[str] = Field(
        ...,
        description='A list of the last step to ran in the context of this '
        'step. In the case of a DAG or a job this will be the last step that has '
        'been executed. It will remain empty for functions.'
    )


class RunStatusEnum(str, Enum):
    """Enumeration of allowable status strings for a run.

    Members subclass ``str``, so each one compares equal to its string value.
    """

    # The run has been created
    created = 'Created'

    # The run has been scheduled for execution
    scheduled = 'Scheduled'

    # The run is being executed
    running = 'Running'

    # The run is being post-processed (status and folder cleanup)
    post_processing = 'Post-Processing'

    # The run has failed
    failed = 'Failed'

    # The run has been cancelled by a user
    cancelled = 'Cancelled'

    # The run has completed successfully
    succeeded = 'Succeeded'

    # Could not determine the status of the run
    unknown = 'Unknown'


class RunStatus(BaseStatus):
    """Status of an individual run generated by a job.

    Extends ``BaseStatus`` with the run and job IDs, the run state, the
    per-step statuses and the run-level inputs and outputs.
    """

    # The regex only accepts the literal string 'v1beta1'.
    api_version: constr(regex='^v1beta1$') = Field('v1beta1', readOnly=True)

    # Discriminator: the regex only accepts the literal string 'RunStatus'.
    type: constr(regex='^RunStatus$') = 'RunStatus'

    id: str = Field(
        ...,
        description='The ID of the individual run.'
    )

    job_id: str = Field(
        ...,
        description='The ID of the job that generated this run.'
    )

    # Optional: defaults to None.
    entrypoint: str = Field(
        None,
        description='The ID of the first step in the run.'
    )

    # Falls back to ``unknown`` when no status is supplied.
    status: RunStatusEnum = Field(
        RunStatusEnum.unknown,
        description='The status of this run.'
    )

    # Step statuses in a string-keyed mapping; empty by default.
    # NOTE(review): keys are presumably step IDs - confirm against the producer.
    steps: Dict[str, StepStatus] = {}

    inputs: List[StepInputs] = Field(
        ...,
        description='The inputs used for this run.'
    )

    outputs: List[StepOutputs] = Field(
        ...,
        description='The outputs produced by this run.'
    )