queenbee.io.artifact_source module

Queenbee ArtifactSource class.

An ArtifactSource is the configuration of a source system from which artifacts are acquired.

class queenbee.io.artifact_source.HTTP(*, type: ~typing.Literal['HTTP'] = 'HTTP', annotations: ~typing.Dict[str, ~typing.Any] | None = <factory>, url: str)[source]

Bases: _ArtifactSource

HTTP Source

A web HTTP address, for example one pointing to an FTP server or an API.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

property referenced_values: Dict[str, List[str]]
type: Literal['HTTP']
url: str
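
As a rough illustration of the fields listed above, the following sketch builds a plain dictionary with the same keys as the HTTP source and round-trips it through JSON. It uses only the standard library and does not call queenbee itself; the URL is a placeholder, and the assumption that a serialized source looks like this is inferred from the field names only.

```python
import json

# Hypothetical stand-in for an HTTP artifact source: the keys mirror the
# documented fields (type, annotations, url); the values are placeholders.
http_source = {
    "type": "HTTP",
    "annotations": {},
    "url": "https://example.com/artifacts",
}

# Round-trip through JSON to show the configuration is plain data.
serialized = json.dumps(http_source, sort_keys=True)
restored = json.loads(serialized)
print(serialized)
```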
class queenbee.io.artifact_source.ProjectFolder(*, type: ~typing.Literal['ProjectFolder'] = 'ProjectFolder', annotations: ~typing.Dict[str, ~typing.Any] | None = <factory>, path: str = None)[source]

Bases: _ArtifactSource

Project Folder Source

This is the path to a folder from which files and folders can be sourced. When a workflow runs on a desktop, this folder corresponds to a local folder. When a workflow runs on Pollination, it corresponds to a project-scoped folder.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

path: str
property referenced_values: Dict[str, List[str]]

Get referenced variables if any.

Returns:

Dict[str, List[str]] – A dictionary where keys are attribute names and values are lists containing the referenced value strings.
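
The return shape described above can be sketched with a small standalone function. This is not the queenbee implementation: the `{{ ... }}` template syntax, the `TEMPLATE` pattern, and the `referenced_values` helper below are assumptions made for illustration only.

```python
import re
from typing import Dict, List

# Assumed reference syntax: {{ some.reference }}. This page does not
# confirm the syntax, so treat the pattern as hypothetical.
TEMPLATE = re.compile(r"\{\{\s*([^{}]+?)\s*\}\}")


def referenced_values(fields: Dict[str, object]) -> Dict[str, List[str]]:
    """Map each attribute name to the references found in its string value."""
    found: Dict[str, List[str]] = {}
    for name, value in fields.items():
        if not isinstance(value, str):
            continue  # only string attributes can carry references
        matches = TEMPLATE.findall(value)
        if matches:
            found[name] = matches
    return found


# A ProjectFolder-like set of fields whose path contains one reference.
refs = referenced_values(
    {"type": "ProjectFolder", "path": "{{inputs.model-folder}}/results"}
)
print(refs)
```

Attributes without any reference are left out of the result in this sketch; whether the library does the same is not stated in this page.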

type: Literal['ProjectFolder']
class queenbee.io.artifact_source.S3(*, type: ~typing.Literal['S3'] = 'S3', annotations: ~typing.Dict[str, ~typing.Any] | None = <factory>, key: str, endpoint: str, bucket: str, credentials_path: str = None)[source]

Bases: _ArtifactSource

S3 Source

An S3 bucket artifact Source.

bucket: str
credentials_path: str
endpoint: str
key: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

property referenced_values: Dict[str, List[str]]
type: Literal['S3']
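
To make the required and optional fields of the S3 source easier to see, here is a minimal standard-library sketch that mirrors the documented signature with a dataclass. `S3Sketch` is a hypothetical stand-in, not the queenbee class, and the key, endpoint, and bucket values are placeholders.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Optional


@dataclass
class S3Sketch:
    """Hypothetical stand-in mirroring the documented S3 fields."""

    key: str                                 # required
    endpoint: str                            # required
    bucket: str                              # required
    credentials_path: Optional[str] = None   # optional, defaults to None
    type: str = "S3"                         # fixed discriminator value
    annotations: Dict[str, Any] = field(default_factory=dict)


# Only the three required fields are supplied; the rest use defaults.
source = S3Sketch(
    key="path/to/artifacts",
    endpoint="s3.example.com",
    bucket="example-bucket",
)
config = asdict(source)
print(config)
```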