dagster.repository → RepositoryDefinition
Create a repository from the decorated function.
The decorated function should take no arguments, and its return value should be one of:
1. List[Union[PipelineDefinition, PartitionSetDefinition, ScheduleDefinition, SensorDefinition]]. Use this form when you have no need to lazy load pipelines or other definitions. This is the typical use case.
2. A dict of the form:
{
    'pipelines': Dict[str, Callable[[], PipelineDefinition]],
    'partition_sets': Dict[str, Callable[[], PartitionSetDefinition]],
    'schedules': Dict[str, Callable[[], ScheduleDefinition]],
    'sensors': Dict[str, Callable[[], SensorDefinition]]
}
This form allows definitions to be created lazily when accessed by name, which can help performance when a repository contains many definitions or when constructing the definitions is costly.
3. RepositoryData. Return this object if you need fine-grained control over the construction and indexing of definitions within the repository, e.g., to create definitions dynamically from .yaml files in a directory.
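The benefit of the dict form can be pictured with a plain-Python sketch that does not use Dagster at all. The names `build_count`, `make_definition`, and `lazy_defs` are hypothetical stand-ins. The sketch only illustrates that a dict of zero-argument callables defers construction until a name is looked up and called; it says nothing about how Dagster itself resolves the dict.

```python
# Plain-Python illustration of deferring construction behind callables.
# Nothing here is Dagster API; every name is a hypothetical stand-in.
from functools import partial

build_count = {"n": 0}


def make_definition(name):
    """Stand-in for an expensive definition constructor."""
    build_count["n"] += 1
    return {"name": name}


# Name -> zero-argument callable, mirroring the shape of the dict form.
lazy_defs = {
    "pipeline_a": partial(make_definition, "pipeline_a"),
    "pipeline_b": partial(make_definition, "pipeline_b"),
}

# Building the mapping itself constructs nothing.
built_before_lookup = build_count["n"]

# Looking up one name and calling it constructs only that definition.
pipeline_a = lazy_defs["pipeline_a"]()
built_after_lookup = build_count["n"]
```

In this sketch `pipeline_b` is never built, which is the saving the lazy form is meant to offer when definitions are numerous or costly.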
Example:
######################################################################
# A simple repository using the first form of the decorated function
######################################################################
from dagster import Field, Int, PartitionSetDefinition, pipeline, repository, solid

# The config key must be a string, hence the quoted 'n'.
@solid(config_schema={'n': Field(Int)})
def return_n(context):
    return context.solid_config['n']

@pipeline(name='simple_pipeline')
def simple_pipeline():
    return_n()

simple_partition_set = PartitionSetDefinition(
    name='simple_partition_set',
    pipeline_name='simple_pipeline',
    partition_fn=lambda: range(10),
    run_config_fn_for_partition=(
        lambda partition: {
            'solids': {'return_n': {'config': {'n': partition}}}
        }
    ),
)

simple_schedule = simple_partition_set.create_schedule_definition(
    schedule_name='simple_daily_10_pm_schedule',
    cron_schedule='0 22 * * *',
)

@repository
def simple_repository():
    return [simple_pipeline, simple_partition_set, simple_schedule]
######################################################################
# A lazy-loaded repository
######################################################################
def make_expensive_pipeline():
    @pipeline(name='expensive_pipeline')
    def expensive_pipeline():
        for i in range(10000):
            return_n.alias('return_n_{i}'.format(i=i))()

    return expensive_pipeline

expensive_partition_set = PartitionSetDefinition(
    name='expensive_partition_set',
    pipeline_name='expensive_pipeline',
    partition_fn=lambda: range(10),
    run_config_fn_for_partition=(
        lambda partition: {
            'solids': {
                'return_n_{i}'.format(i=i): {'config': {'n': partition}}
                for i in range(10000)
            }
        }
    ),
)

def make_expensive_schedule():
    # Return the schedule so the caller receives a ScheduleDefinition.
    return expensive_partition_set.create_schedule_definition(
        schedule_name='expensive_schedule',
        cron_schedule='0 22 * * *',
    )

@repository
def lazy_loaded_repository():
    return {
        'pipelines': {'expensive_pipeline': make_expensive_pipeline},
        'partition_sets': {
            'expensive_partition_set': expensive_partition_set
        },
        'schedules': {'expensive_schedule': make_expensive_schedule}
    }
######################################################################
# A complex repository that lazily constructs pipelines from a directory
# of files in a bespoke YAML format
######################################################################
class ComplexRepositoryData(RepositoryData):
    def __init__(self, yaml_directory):
        self._yaml_directory = yaml_directory

    def get_pipeline(self, pipeline_name):
        return self._construct_pipeline_def_from_yaml_file(
            self._yaml_file_for_pipeline_name(pipeline_name)
        )

    ...

@repository
def complex_repository():
    return ComplexRepositoryData('some_directory')
class dagster.RepositoryDefinition(name, repository_data, description=None)
Define a repository that contains a collection of definitions.
Users should typically not create objects of this class directly. Instead, use the @repository() decorator.
get_all_pipelines()
Return all pipelines in the repository as a list.
Note that this will construct any pipeline in the lazily evaluated pipeline_dict that has not yet been constructed.
Returns: All pipelines in the repository.
Return type: List[PipelineDefinition]
get_all_solid_defs()
Get all the solid definitions in a repository.
Returns: All solid definitions in the repository.
Return type: List[SolidDefinition]
get_pipeline(name)
Get a pipeline by name.
If this pipeline is present in the lazily evaluated pipeline_dict passed to the constructor but has not yet been constructed, only this pipeline is constructed, and it is cached for future calls.
Parameters: name (str) – Name of the pipeline to retrieve.
Returns: The pipeline definition corresponding to the given name.
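The difference between `get_pipeline` and `get_all_pipelines` described above can be sketched with a small stand-in class. This is an illustration under stated assumptions, not Dagster's implementation: `LazyPipelineCache` and its `constructions` attribute are hypothetical, and plain `object()` instances stand in for pipeline definitions.

```python
# Hypothetical stand-in for a lazily evaluated, cached pipeline lookup.
# This is not Dagster's implementation; all names are illustrative only.
class LazyPipelineCache:
    def __init__(self, pipeline_fns):
        self._pipeline_fns = dict(pipeline_fns)  # name -> zero-arg callable
        self._cache = {}                         # name -> constructed object
        self.constructions = []                  # construction order, for inspection

    def get_pipeline(self, name):
        # Construct only the requested pipeline, and only on first access.
        if name not in self._cache:
            self._cache[name] = self._pipeline_fns[name]()
            self.constructions.append(name)
        return self._cache[name]

    def get_all_pipelines(self):
        # Forces construction of every pipeline that has not been built yet.
        return [self.get_pipeline(name) for name in self._pipeline_fns]


cache = LazyPipelineCache({
    "a": lambda: object(),
    "b": lambda: object(),
})
first = cache.get_pipeline("a")
second = cache.get_pipeline("a")        # served from the cache
after_single = list(cache.constructions)
everything = cache.get_all_pipelines()  # builds "b"; "a" is reused
```

Here the second lookup of `"a"` returns the same object as the first, and `"b"` is built only once the full list is requested.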
has_pipeline(name)
Check if a pipeline with a given name is present in the repository.
Parameters: name (str) – The name of the pipeline.
Return type: bool