Source code for queenbee.io.outputs.step

"""Output types for a Queenbee Job steps.

For more information on plugins, see the plugin module.
"""

import json
from typing import Union, List, Dict, Any
from pydantic import constr, Field

from .function import FunctionStringOutput, FunctionIntegerOutput, \
    FunctionNumberOutput, FunctionBooleanOutput, FunctionFolderOutput, \
    FunctionFileOutput, FunctionPathOutput, FunctionArrayOutput, \
    FunctionJSONObjectOutput, FunctionOutputs

from .dag import DAGStringOutput, DAGIntegerOutput, DAGNumberOutput, \
    DAGBooleanOutput, DAGFolderOutput, DAGFileOutput, DAGPathOutput, \
    DAGArrayOutput, DAGJSONObjectOutput, DAGOutputs

from ..artifact_source import HTTP, S3, ProjectFolder


class StepStringOutput(FunctionStringOutput):
    """A String output."""

    # Type tag: the anchored regex only accepts the literal 'StepStringOutput',
    # which is also the default value.
    type: constr(regex='^StepStringOutput$') = 'StepStringOutput'

    # The string carried by this step output; declared without a default.
    value: str
class StepIntegerOutput(FunctionIntegerOutput):
    """An integer output."""

    # Type tag: the anchored regex only accepts the literal 'StepIntegerOutput',
    # which is also the default value.
    type: constr(regex='^StepIntegerOutput$') = 'StepIntegerOutput'

    # The integer carried by this step output; declared without a default.
    value: int
class StepNumberOutput(FunctionNumberOutput):
    """A number output."""

    # Type tag: the anchored regex only accepts the literal 'StepNumberOutput',
    # which is also the default value.
    type: constr(regex='^StepNumberOutput$') = 'StepNumberOutput'

    # The floating point number carried by this step output; declared without
    # a default.
    value: float
class StepBooleanOutput(FunctionBooleanOutput):
    """The boolean type matches only two special values: True and False."""

    # Type tag: the anchored regex only accepts the literal 'StepBooleanOutput',
    # which is also the default value.
    type: constr(regex='^StepBooleanOutput$') = 'StepBooleanOutput'

    # The boolean carried by this step output; declared without a default.
    value: bool
class StepFolderOutput(FunctionFolderOutput):
    """A folder output."""

    # Type tag: the anchored regex only accepts the literal 'StepFolderOutput',
    # which is also the default value.
    type: constr(regex='^StepFolderOutput$') = 'StepFolderOutput'

    # Where the artifact lives: one of the HTTP, S3 or ProjectFolder sources.
    # The Ellipsis default marks the field as required.
    # NOTE(review): the description says "file" although this is a folder
    # output -- confirm whether the wording is intentional before changing it.
    source: Union[HTTP, S3, ProjectFolder] = Field(
        ..., description='The path to source the file from.'
    )
class StepFileOutput(FunctionFileOutput):
    """A file output."""

    # Type tag: the anchored regex only accepts the literal 'StepFileOutput',
    # which is also the default value.
    type: constr(regex='^StepFileOutput$') = 'StepFileOutput'

    # Where the artifact lives: one of the HTTP, S3 or ProjectFolder sources.
    # The Ellipsis default marks the field as required.
    source: Union[HTTP, S3, ProjectFolder] = Field(
        ..., description='The path to source the file from.'
    )
class StepPathOutput(FunctionPathOutput):
    """A file or a folder output."""

    # Type tag: the anchored regex only accepts the literal 'StepPathOutput',
    # which is also the default value.
    type: constr(regex='^StepPathOutput$') = 'StepPathOutput'

    # Where the artifact lives: one of the HTTP, S3 or ProjectFolder sources.
    # The Ellipsis default marks the field as required.
    # NOTE(review): the description only mentions "file" although this output
    # may also be a folder -- confirm whether the wording is intentional.
    source: Union[HTTP, S3, ProjectFolder] = Field(
        ..., description='The path to source the file from.'
    )
class StepArrayOutput(FunctionArrayOutput):
    """A JSON array output."""

    # Type tag: the anchored regex only accepts the literal 'StepArrayOutput',
    # which is also the default value.
    type: constr(regex='^StepArrayOutput$') = 'StepArrayOutput'

    # The list carried by this step output. The annotation is the bare
    # ``List``, so no item type is declared here.
    value: List
class StepJSONObjectOutput(FunctionJSONObjectOutput):
    """A JSON object output."""

    # Type tag: the anchored regex only accepts the literal
    # 'StepJSONObjectOutput', which is also the default value.
    type: constr(regex='^StepJSONObjectOutput$') = 'StepJSONObjectOutput'

    # The mapping carried by this step output. The annotation is the bare
    # ``Dict``, so no key or value types are declared here.
    value: Dict
# Union of every concrete step output model defined in this module. The member
# order is kept exactly as originally declared.
StepOutputs = Union[
    StepStringOutput,
    StepIntegerOutput,
    StepNumberOutput,
    StepBooleanOutput,
    StepFolderOutput,
    StepFileOutput,
    StepPathOutput,
    StepArrayOutput,
    StepJSONObjectOutput,
]
def _coerce_to_int(value: Any) -> int:
    """Convert a raw output value to ``int`` without a lossy float round-trip.

    ``int(float(value))`` silently corrupts integers above 2**53 (for example
    ``'9007199254740993'`` becomes ``9007199254740992``), so an exact ``int()``
    conversion is attempted first. The float route is kept as a fallback for
    strings such as ``'3.0'`` or ``'1e3'`` that ``int()`` rejects.

    Args:
        value {Any} -- The raw value to convert

    Returns:
        int -- The converted integer
    """
    try:
        return int(value)
    except ValueError:
        return int(float(value))


def from_template(
    template: Union[DAGOutputs, FunctionOutputs], value: Any
) -> StepOutputs:
    """Generate a step output from a template output type and a value

    The template is serialized with ``to_dict()``, its ``type`` entry is
    dropped (each step class supplies its own type tag) and ``value`` is stored
    under ``source`` for artifacts or under ``value`` for parameters. The
    resulting dictionary is then parsed by the step class that corresponds to
    the exact class of the template.

    Args:
        template {Union[DAGOutputs, FunctionOutputs]} -- An output from a
            template (DAG or Function)
        value {Any} -- The output value calculated for this template in the
            workflow step

    Returns:
        StepOutputs -- A Step Output object. ``None`` is returned when the
            class of the template is not one of the supported DAG or Function
            output classes.
    """
    # Imported locally because only the JSON object fallback below needs it.
    from pydantic import ValidationError

    template_dict = template.to_dict()
    # The template's own type tag would not match the step class regex.
    template_dict.pop('type', None)

    if template.is_artifact:
        template_dict['source'] = value
        if 'path' not in template_dict:
            # path is required for a step but is missing from a DAG output
            template_dict['path'] = ''
    elif template.is_parameter:
        template_dict['value'] = value

    # Exact class comparison on purpose: the Step* classes subclass the
    # Function* classes, so an isinstance() check would match more than the
    # original template classes.
    template_class = template.__class__

    if template_class in (DAGStringOutput, FunctionStringOutput):
        return StepStringOutput.parse_obj(template_dict)

    if template_class in (DAGIntegerOutput, FunctionIntegerOutput):
        template_dict['value'] = _coerce_to_int(template_dict['value'])
        return StepIntegerOutput.parse_obj(template_dict)

    if template_class in (DAGNumberOutput, FunctionNumberOutput):
        return StepNumberOutput.parse_obj(template_dict)

    if template_class in (DAGBooleanOutput, FunctionBooleanOutput):
        return StepBooleanOutput.parse_obj(template_dict)

    if template_class in (DAGFolderOutput, FunctionFolderOutput):
        return StepFolderOutput.parse_obj(template_dict)

    if template_class in (DAGFileOutput, FunctionFileOutput):
        return StepFileOutput.parse_obj(template_dict)

    if template_class in (DAGPathOutput, FunctionPathOutput):
        return StepPathOutput.parse_obj(template_dict)

    if template_class in (DAGArrayOutput, FunctionArrayOutput):
        if isinstance(template_dict['value'], str):
            # The value arrived as JSON text; decode it before validation.
            template_dict['value'] = json.loads(template_dict['value'])
        return StepArrayOutput.parse_obj(template_dict)

    if template_class in (DAGJSONObjectOutput, FunctionJSONObjectOutput):
        if isinstance(template_dict['value'], str):
            # The value arrived as JSON text; decode it before validation.
            template_dict['value'] = json.loads(template_dict['value'])
        try:
            # Try to parse JSON as a dict
            return StepJSONObjectOutput.parse_obj(template_dict)
        except ValidationError:
            # Try to parse JSON as an array. Only validation failures trigger
            # the fallback; anything else (e.g. KeyboardInterrupt) propagates.
            return StepArrayOutput.parse_obj(template_dict)

    # No supported template class matched: keep the historical behavior of
    # returning None instead of raising.
    return None