A pipeline is a set of solids with explicit data dependencies on each other, creating a directed acyclic graph, or DAG.
| Name | Description |
| --- | --- |
| @pipeline | The decorator used to define a pipeline. |
| PipelineDefinition | Base class for pipelines. You almost never want to instantiate this class directly. Instead, use the @pipeline decorator, which returns a PipelineDefinition. |
| ModeDefinition | Modes allow you to vary pipeline behavior between different deployment environments. For more info, see the Modes section. |
Solids are linked together into pipelines by defining the dependencies between their inputs and outputs. An important difference between Dagster and other workflow systems is that in Dagster, dependencies between solids are expressed as data dependencies, rather than simply the order in which solids should execute.
This difference enables Dagster to support richer modeling of dependencies. Instead of merely ensuring that the order of execution is correct, dependencies in Dagster provide a variety of compile-time and run-time checks.
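To make this concrete, here is a minimal sketch (the solid names are illustrative, not from the docs) of one such run-time check: because add_one declares an int input, a run that feeds it a str fails the type check at the input boundary instead of silently propagating bad data.

from dagster import InputDefinition, execute_pipeline, pipeline, solid


@solid
def return_string(context):
    return "not a number"


@solid(input_defs=[InputDefinition("number", int)])
def add_one(context, number):
    return number + 1


@pipeline
def type_checked_pipeline():
    add_one(return_string())


# The run fails at add_one's input boundary: the run-time type check
# rejects the str value for the int-typed input.
result = execute_pipeline(type_checked_pipeline, raise_on_error=False)
assert not result.success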
Dependencies are expressed in pipelines using Dagster's function invocation DSL. To define a pipeline, use the @pipeline decorator.
Within the decorated function body, we use function calls to indicate the dependency structure between the solids making up the pipeline.
In this example, the add_one solid depends on the return_one solid's output. Because this data dependency exists, the add_one solid executes only after return_one runs successfully and emits the required output.
from dagster import InputDefinition, pipeline, solid


@solid
def return_one(context):
    return 1


@solid(input_defs=[InputDefinition("number", int)])
def add_one(context, number):
    return number + 1


@pipeline
def one_plus_one_pipeline():
    add_one(return_one())
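As a quick sanity check, you can execute this pipeline in-process; a minimal sketch using execute_pipeline:

from dagster import execute_pipeline

result = execute_pipeline(one_plus_one_pipeline)
assert result.success
# return_one emits 1 and add_one increments it, so the output is 2.
assert result.result_for_solid("add_one").output_value() == 2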
You can use the same solid definition multiple times in the same pipeline.
@pipeline
def multiple_usage_pipeline():
    add_one(add_one(return_one()))
To differentiate between the two invocations of add_one, Dagster automatically aliases the solid names to add_one and add_one_2.
You can also manually define the alias by using the .alias method on the solid invocation.
@pipeline
def alias_pipeline():
    add_one.alias("second_addition")(add_one(return_one()))
Similar to aliases, you can also define tags on a solid invocation.
@pipeline
def tag_pipeline():
    add_one.tag({"my_tag": "my_value"})(add_one(return_one()))
Pipeline definitions do not expose a configuration schema. Instead, they specify a set of ModeDefinitions that can be used with the pipeline. For more information on Modes, see the Modes section.
from dagster import ModeDefinition, pipeline

dev_mode = ModeDefinition("dev")
staging_mode = ModeDefinition("staging")
prod_mode = ModeDefinition("prod")


@pipeline(mode_defs=[dev_mode, staging_mode, prod_mode])
def my_modes_pipeline():
    # my_solid is a solid defined elsewhere
    my_solid()
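You then pick one of the declared modes when launching a run; a minimal sketch with execute_pipeline:

from dagster import execute_pipeline

# Execute the pipeline with the behavior configured for the "dev" mode.
result = execute_pipeline(my_modes_pipeline, mode="dev")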
Pipelines can specify a set of tags that are also automatically set on the resulting pipeline runs.
@pipeline(tags={"my_tag": "my_value"})
def my_tags_pipeline():
    my_solid()
This pipeline defines a my_tag tag. Any runs of this pipeline will also have the same tag.
There are many DAG structures you can represent using pipelines. This section covers a few basic patterns you can use to build more complex pipelines.
The simplest pipeline structure is the linear pipeline. We return one output from the root solid, and pass along data through single inputs and outputs.
from dagster import InputDefinition, pipeline, solid


@solid
def return_one(context):
    return 1


@solid(input_defs=[InputDefinition("number", int)])
def add_one(context, number):
    return number + 1


@pipeline
def linear_pipeline():
    add_one(add_one(add_one(return_one())))
A single output can be passed to multiple inputs on downstream solids. In this example, the output from the first solid is passed to two different solids. The outputs of those solids are combined and passed to the final solid.
from dagster import InputDefinition, Output, OutputDefinition, pipeline, solid


@solid
def return_one(context):
    return 1


@solid(input_defs=[InputDefinition("number", int)])
def add_one(context, number):
    return number + 1


@solid(
    input_defs=[
        InputDefinition(name="a", dagster_type=int),
        InputDefinition(name="b", dagster_type=int),
    ],
    output_defs=[
        OutputDefinition(name="sum", dagster_type=int),
    ],
)
def adder(context, a, b):
    yield Output(a + b, output_name="sum")


@pipeline
def inputs_and_outputs_pipeline():
    value = return_one()
    a = add_one(value)
    b = add_one(value)
    adder(a, b)
A solid only starts to execute once all of its inputs have been resolved. We can use this behavior to model conditional execution of solids.
In this example, the branching_solid solid outputs either the branch_1 result or the branch_2 result. Since execution is skipped for solids that have unresolved inputs, only one of the downstream solids will execute.
import random

from dagster import InputDefinition, Output, OutputDefinition, pipeline, solid


@solid(
    output_defs=[
        OutputDefinition(int, "branch_1", is_required=False),
        OutputDefinition(int, "branch_2", is_required=False),
    ]
)
def branching_solid(_):
    num = random.randint(0, 1)
    if num == 0:
        yield Output(1, "branch_1")
    else:
        yield Output(2, "branch_2")


@solid(input_defs=[InputDefinition("_input", int)])
def branch_1_solid(_, _input):
    pass


@solid(input_defs=[InputDefinition("_input", int)])
def branch_2_solid(_, _input):
    pass


@pipeline
def branching_pipeline():
    branch_1, branch_2 = branching_solid()
    branch_1_solid(branch_1)
    branch_2_solid(branch_2)
If you have a fixed set of solids that all return the same output type, you can collect all the outputs into a list and pass them into a single downstream solid.
The downstream solid executes only if all of the outputs were successfully created by the upstream solids.
from dagster import InputDefinition, List, OutputDefinition, pipeline, solid


@solid(output_defs=[OutputDefinition(int)])
def return_one(_):
    return 1


@solid(input_defs=[InputDefinition("nums", List[int])], output_defs=[OutputDefinition(int)])
def sum_fan_in(_, nums):
    return sum(nums)


@pipeline
def fan_in_pipeline():
    fan_outs = []
    for i in range(0, 10):
        fan_outs.append(return_one.alias("return_one_{}".format(i))())
    sum_fan_in(fan_outs)
In this example, we have 10 solids that all output the number 1. The sum_fan_in solid takes all of these outputs as a list and sums them.
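A short sketch of verifying that behavior with an in-process execution:

from dagster import execute_pipeline

result = execute_pipeline(fan_in_pipeline)
assert result.success
# Ten aliased copies of return_one each emit 1, so the fan-in sum is 10.
assert result.result_for_solid("sum_fan_in").output_value() == 10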
In most cases, the structure of a pipeline is pre-determined before execution. Dagster now has experimental support for creating pipelines where the final structure is not determined until run-time. This is useful for map/reduce pipeline structures, where you want to execute a set of solids for each result of another solid.
In this example, we have a solid files_in_directory that defines a DynamicOutputDefinition. This dynamic output will cause the downstream dependencies to be cloned for each DynamicOutput that is yielded. The downstream copies can be identified by the mapping_key supplied to DynamicOutput.
import os

from dagster import Field, pipeline, solid
from dagster.experimental import DynamicOutput, DynamicOutputDefinition
from dagster.utils import file_relative_path


@solid(
    config_schema={"path": Field(str, default_value=file_relative_path(__file__, "sample"))},
    output_defs=[DynamicOutputDefinition(str)],
)
def files_in_directory(context):
    path = context.solid_config["path"]
    dirname, _, filenames = next(os.walk(path))
    for file in filenames:
        yield DynamicOutput(
            value=os.path.join(dirname, file),
            # create a mapping key from the file name
            mapping_key=file.replace(".", "_").replace("-", "_"),
        )


@solid
def process_file(_, path: str) -> int:
    # simple example of calculating size
    return os.path.getsize(path)


@pipeline
def process_directory():
    files_in_directory().map(process_file)
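After a run, the outputs of the mapped copies are grouped by mapping key; a sketch of inspecting them (this assumes the configured directory exists and is readable):

from dagster import execute_pipeline

result = execute_pipeline(process_directory)
assert result.success
# For a dynamically mapped solid, output_value() returns a dict keyed by
# mapping_key, e.g. {"file_txt": 1024, "data_csv": 2048}.
print(result.result_for_solid("process_file").output_value())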
Dependencies in Dagster are primarily data dependencies. Using data dependencies means each input of a solid depends on the output of an upstream solid.
If you have a solid, say Solid A, that does not depend on any outputs of another solid, say Solid B, there theoretically shouldn't be a reason for Solid A to run after Solid B. In most cases, these two solids should be parallelizable. However, there are some cases where an explicit ordering is required, but it doesn't make sense to pass data through inputs and outputs to model the dependency.
If you need to model an explicit ordering dependency, you can use the Nothing Dagster type on the input definition of the downstream solid. This type specifies that you are passing "nothing" between the solids, while still using inputs and outputs to model the dependency between them.
from dagster import InputDefinition, Nothing, pipeline, solid


@solid
def create_table_1(_context) -> Nothing:
    # get_database_connection() stands in for your own database client
    get_database_connection().execute("create table_1 as select * from some_source_table")


@solid(input_defs=[InputDefinition("start", Nothing)])
def create_table_2(_context):
    get_database_connection().execute("create table_2 as select * from table_1")


@pipeline
def nothing_dependency_pipeline():
    create_table_2(create_table_1())
In this example, create_table_1 returns an output of type Nothing, and create_table_2 has an input of type Nothing. This lets us connect them in the pipeline definition so that create_table_2 executes only after create_table_1 successfully executes.
Note that in most cases it is possible to pass some data dependency. In the example above, even though we probably don't want to pass the table data itself between the solids, we could pass table pointers. For example, create_table_1 could return a table_pointer output of type str with a value of table_1, and this table name could be used in create_table_2 to more accurately model the data dependency.
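A minimal sketch of that variant, reusing the placeholder get_database_connection helper from above:

from dagster import InputDefinition, pipeline, solid


@solid
def create_table_1(_context) -> str:
    get_database_connection().execute("create table_1 as select * from some_source_table")
    # Pass a pointer to the table, not the table data itself.
    return "table_1"


@solid(input_defs=[InputDefinition("table_name", str)])
def create_table_2(_context, table_name):
    get_database_connection().execute(
        "create table_2 as select * from {}".format(table_name)
    )


@pipeline
def table_pointer_pipeline():
    create_table_2(create_table_1())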
Dagster also provides more advanced abstractions to handle dependencies and IO. If you find it difficult to model data dependencies when using external storage, check out IOManagers.
You may run into a situation where you need to programmatically construct the dependency graph for a pipeline. In that case, you can directly define the PipelineDefinition object.
To construct a PipelineDefinition, you need to pass the constructor a pipeline name, a list of solid definitions, and a dictionary defining the dependency structure. The dependency structure declares the dependencies of each solid's inputs on the outputs of other solids in the pipeline. The top-level keys of the dependency dictionary are the string names of solids. If you are using solid aliases, be sure to use the aliased name. The values of the top-level keys are also dictionaries, which map input names to a DependencyDefinition.
from dagster import DependencyDefinition, PipelineDefinition

one_plus_one_pipeline_def = PipelineDefinition(
    name="one_plus_one_pipeline",
    solid_defs=[return_one, add_one],
    dependencies={"add_one": {"number": DependencyDefinition("return_one")}},
)
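If the pipeline invokes the same solid more than once, key the entry with a SolidInvocation so each aliased invocation gets its own dependencies; a sketch reusing add_one twice:

from dagster import DependencyDefinition, PipelineDefinition, SolidInvocation

two_additions_pipeline_def = PipelineDefinition(
    name="two_additions_pipeline",
    solid_defs=[return_one, add_one],
    dependencies={
        SolidInvocation("add_one", alias="first_addition"): {
            "number": DependencyDefinition("return_one")
        },
        # DependencyDefinition refers to the upstream invocation by its alias.
        SolidInvocation("add_one", alias="second_addition"): {
            "number": DependencyDefinition("first_addition")
        },
    },
)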
Sometimes you may want to construct the dependencies of a pipeline definition from a YAML file or similar. This is useful when migrating to Dagster from other workflow systems.
For example, you can have a YAML like this:
pipeline:
  name: some_example
  description: blah blah blah
  solids:
    - def: add_one
      alias: A
    - def: add_one
      alias: B
      deps:
        num:
          solid: A
    - def: add_two
      alias: C
      deps:
        num:
          solid: A
    - def: subtract
      deps:
        left:
          solid: B
        right:
          solid: C
You can programmatically generate a PipelineDefinition from this YAML:
from dagster import (
    DependencyDefinition,
    PipelineDefinition,
    SolidInvocation,
    check,
    repository,
    solid,
)
from dagster.utils import file_relative_path, load_yaml_from_path


@solid
def add_one(_, num: int) -> int:
    return num + 1


@solid
def add_two(_, num: int) -> int:
    return num + 2


@solid
def subtract(_, left: int, right: int) -> int:
    return left - right


def construct_pipeline_with_yaml(yaml_file, solid_defs):
    yaml_data = load_yaml_from_path(yaml_file)
    solid_def_dict = {s.name: s for s in solid_defs}

    deps = {}
    for solid_yaml_data in yaml_data["pipeline"]["solids"]:
        check.invariant(solid_yaml_data["def"] in solid_def_dict)
        def_name = solid_yaml_data["def"]
        alias = solid_yaml_data.get("alias", def_name)
        solid_deps_entry = {}
        for input_name, input_data in solid_yaml_data.get("deps", {}).items():
            solid_deps_entry[input_name] = DependencyDefinition(
                solid=input_data["solid"], output=input_data.get("output", "result")
            )
        deps[SolidInvocation(name=def_name, alias=alias)] = solid_deps_entry

    return PipelineDefinition(
        name=yaml_data["pipeline"]["name"],
        description=yaml_data["pipeline"].get("description"),
        solid_defs=solid_defs,
        dependencies=deps,
    )


def define_dep_dsl_pipeline():
    return construct_pipeline_with_yaml(
        file_relative_path(__file__, "example.yaml"), [add_one, add_two, subtract]
    )


@repository
def define_repository():
    return {"pipelines": {"some_example": define_dep_dsl_pipeline}}