DagsterDocs
Quick search

Pipeline#

A pipeline is a set of solids with explicit data dependencies on each other, creating a directed acyclic graph, or DAG.

Relevant APIs#

NameDescription
@pipelineThe decorator used to define a pipeline.
PipelineDefinitionBase class for solids. You almost never want to use initialize this class directly. Instead, you should use the @pipeline which returns a PipelineDefinition
ModeDefinitionModes allow you to vary pipeline behavior between different deployment environments. For more info, see the Modes section

Overview#

Solids are linked together into pipelines by defining the dependencies between their inputs and outputs. An important difference between Dagster and other workflow systems is that in Dagster, solids dependencies are expressed as data dependencies instead of just the order solids should execute in.

This difference enables Dagster to support richer modeling of dependencies. Instead of merely ensuring that the order of execution is correct, dependencies in Dagster provide a variety of compile and run-time checks.

Dependencies are expressed in Pipelines using Dagster's function invocation DSL.


Defining a pipeline#

To define a pipeline, use the @pipeline decorator.

Within the decorated function body, we use function calls to indicate the dependency structure between the solids making up the pipeline.

In this example, the add_one solid depends on the return_one solid's output. Because this data dependency exists, the return_one solid executes after add_one runs successfully and emits the required output.

@solid
def return_one(context):
    return 1


@solid(input_defs=[InputDefinition("number", int)])
def add_one(context, number):
    return number + 1


@pipeline
def one_plus_one_pipeline():
    add_one(return_one())

Aliases and Tags#

Solid aliases#

You can use the same solid definition multiple times in the same pipeline.

@pipeline
def multiple_usage_pipeline():
    add_one(add_one(return_one()))

To differentiate between the two invocations of add_one Dagster automatically aliases the solid names to be add_one and add_one_2.

You can also manually define the alias by using the .alias method on the solid invocation.

@pipeline
def alias_pipeline():
    add_one.alias("second_addition")(add_one(return_one()))

Solid Tags#

Similar to aliases, you can also define solid tags on a solid invocation.

@pipeline
def tag_pipeline():
    add_one.tag({"my_tag": "my_value"})(add_one(return_one()))

Pipeline configuration#

Pipeline Modes#

Pipeline definitions do not expose a configuration schema. Instead, they specify a set of ModeDefinitions that can be used with the pipeline. For more information on Modes, see the Modes section.

dev_mode = ModeDefinition("dev")
staging_mode = ModeDefinition("staging")
prod_mode = ModeDefinition("prod")


@pipeline(mode_defs=[dev_mode, staging_mode, prod_mode])
def my_modes_pipeline():
    my_solid()

Pipeline Tags#

Pipelines can specify a set of tags that are also automatically set on the resulting pipeline runs.

@pipeline(tags={"my_tag": "my_value"})
def my_tags_pipeline():
    my_solid()

This pipeline defines a my_tag tag. Any pipeline runs created using this pipeline will also have the same tag.

Examples#

There are many DAG structures can represent using pipelines. This section covers a few basic patterns you can use to build more complex pipelines.

Linear Dependencies#

The simplest pipeline structure is the linear pipeline. We return one output from the root solid, and pass along data through single inputs and outputs.

Linear Pipeline

from dagster import InputDefinition, pipeline, solid


@solid
def return_one(context):
    return 1


@solid(input_defs=[InputDefinition("number", int)])
def add_one(context, number):
    return number + 1


@pipeline
def linear_pipeline():
    add_one(add_one(add_one(return_one())))

Multiple Inputs and Outputs#

Branching Pipeline

A single output can be passed to multiple inputs on downstream solids. In this example, the output from the first solid is passed to two different solids. The outputs of those solids are combined and passed to the final solid.

from dagster import InputDefinition, Output, OutputDefinition, pipeline, solid


@solid
def return_one(context):
    return 1


@solid(input_defs=[InputDefinition("number", int)])
def add_one(context, number):
    return number + 1


@solid(
    input_defs=[
        InputDefinition(name="a", dagster_type=int),
        InputDefinition(name="b", dagster_type=int),
    ],
    output_defs=[
        OutputDefinition(name="sum", dagster_type=int),
    ],
)
def adder(context, a, b):
    yield Output(a + b, output_name="sum")


@pipeline
def inputs_and_outputs_pipeline():
    value = return_one()
    a = add_one(value)
    b = add_one(value)
    adder(a, b)

Conditional Branching#

A solid only starts to execute once all of its inputs have been resolved. We can use this behavior to model conditional execution of solids.

In this example, the branching_solid outputs either the branch_1 result or branch_2 result. Since solid execution is skipped for solids that have unresolved inputs, only one of the downstream solids will execute.

import random

from dagster import InputDefinition, Output, OutputDefinition, pipeline, solid


@solid(
    output_defs=[
        OutputDefinition(int, "branch_1", is_required=False),
        OutputDefinition(int, "branch_2", is_required=False),
    ]
)
def branching_solid(_):
    num = random.randint(0, 1)
    if num == 0:
        yield Output(1, "branch_1")
    else:
        yield Output(2, "branch_2")


@solid(input_defs=[InputDefinition("_input", int)])
def branch_1_solid(_, _input):
    pass


@solid(input_defs=[InputDefinition("_input", int)])
def branch_2_solid(_, _input):
    pass


@pipeline
def branching_pipeline():
    branch_1, branch_2 = branching_solid()
    branch_1_solid(branch_1)
    branch_2_solid(branch_2)

Fixed Fan-in Pipeline#

If you have a fixed set of solids that all return the same output type, you can collect all the outputs into a list and pass them into a single downstream solid.

The downstream solid executes only if all of the outputs were successfully created by the upstream solids.

from dagster import InputDefinition, List, OutputDefinition, pipeline, solid


@solid(output_defs=[OutputDefinition(int)])
def return_one(_):
    return 1


@solid(input_defs=[InputDefinition("nums", List[int])], output_defs=[OutputDefinition(int)])
def sum_fan_in(_, nums):
    return sum(nums)


@pipeline
def fan_in_pipeline():
    fan_outs = []
    for i in range(0, 10):
        fan_outs.append(return_one.alias("return_one_{}".format(i))())
    sum_fan_in(fan_outs)

In this example, we have 10 solids that all output the number 1. The sum_fan_in solid takes all of these outputs as a list and sums them.

Dynamic fan-out Experimental#

In most cases, the structure of a pipeline is pre-determined before execution. Dagster now has experimental support for creating pipelines where the final structure is not determined until run-time. This is useful for map/reduce pipeline structures, where you want to execute a set of solids for each result of another solid.

In this example we have a solid files_in_directory that defines a DynamicOutputDefinition. This dynamic output will cause the downstream dependencies to be cloned for each DynamicOutput that is yielded. The downstream copies can be identified by the mapping_key supplied to DynamicOutput.

import os

from dagster import Field, pipeline, solid
from dagster.experimental import DynamicOutput, DynamicOutputDefinition
from dagster.utils import file_relative_path


@solid(
    config_schema={"path": Field(str, default_value=file_relative_path(__file__, "sample"))},
    output_defs=[DynamicOutputDefinition(str)],
)
def files_in_directory(context):
    path = context.solid_config["path"]
    dirname, _, filenames = next(os.walk(path))
    for file in filenames:
        yield DynamicOutput(
            value=os.path.join(dirname, file),
            # create a mapping key from the file name
            mapping_key=file.replace(".", "_").replace("-", "_"),
        )


@solid
def process_file(_, path: str) -> int:
    # simple example of calculating size
    return os.path.getsize(path)


@pipeline
def process_directory():
    files_in_directory().map(process_file)

Order-based Dependencies (Nothing dependencies)#

Dependencies in Dagster are primarily data dependencies. Using data dependencies means each input of a solid depends on the output of an upstream solid.

If you have a solid, say Solid A, that does not depend on any outputs of another solid, say Solid B, there theoretically shouldn't be a reason for Solid A to run after Solid B. In most cases, these two solids should be parallelizable. However, there are some cases where an explicit ordering is required, but it doesn't make sense to pass data through inputs and outputs to model the dependency.

If you need to model an explicit ordering dependency, you can use the Nothing Dagster type on the input definition of the downstream solid. This type specifies that you are passing "nothing" via Dagster between the solids, while still uses inputs and outputs to model the dependency between the two solids.

from dagster import InputDefinition, Nothing, pipeline, solid


@solid
def create_table_1(_context) -> Nothing:
    get_database_connection().execute("create table_1 as select * from some_source_table")


@solid(input_defs=[InputDefinition("start", Nothing)])
def create_table_2(_context):
    get_database_connection().execute("create table_2 as select * from table_1")


@pipeline
def nothing_dependency_pipeline():
    create_table_2(create_table_1())

In this example, create_table_1 returns an output of type Nothing, and create_table_2 has an input of type Nothing. This lets us connect them in the pipeline definition so that create_table_2 executes only after create_table_1 successfully executes.

Note that in most cases, it is usually possible to pass some data dependency. In the example above, even though we probably don't want to pass the table data itself between the solids, we could pass table pointers. For example, create_table_1 could return a table_pointer output of type str with a value of table_1, and this table name can be used in create_table_2 to more accurately model the data dependency.

Dagster also provides more advanced abstractions to handle dependencies and IO. If you find that you are finding it difficult to model data dependencies when using external storages, check out IOManagers.

Patterns#

Constructing PipelineDefinitions#

You may run into a situation where you need to programmatically construct the dependency graph for a pipeline. In that case, you can directly define the PipelineDefinition object.

To construct a PipelineDefinition, you need to pass the constructor a pipeline name, a list of solid definitions, and a dictionary defining the dependency structure. The dependency structure declares the dependencies of each solid’s inputs on the outputs of other solids in the pipeline. The top-level keys of the dependency dictionary are the string names of solids. If you are using solid aliases be sure to use the aliased name. Values of the top-level keys are also dictionary, which maps input names to a DependencyDefinition.

one_plus_one_pipeline_def = PipelineDefinition(
    name="one_plus_one_pipeline",
    solid_defs=[return_one, add_one],
    dependencies={"add_one": {"number": DependencyDefinition("return_one")}},
)

Pipeline DSL#

Sometimes you may want to construct the dependencies of a pipeline definition from a YAML file or similar. This is useful when migrating to Dagster from other workflow systems.

For example, you can have a YAML like this:

pipeline:
  name: some_example
  description: blah blah blah
  solids:
    - def: add_one
      alias: A
    - def: add_one
      alias: B
      deps:
        num:
          solid: A
    - def: add_two
      alias: C
      deps:
        num:
          solid: A
    - def: subtract
      deps:
        left:
          solid: B
        right:
          solid: C

You can programatically generate a PipelineDefinition from this YAML:

@solid
def add_one(_, num: int) -> int:
    return num + 1


@solid
def add_two(_, num: int) -> int:
    return num + 2


@solid
def subtract(_, left: int, right: int) -> int:
    return left + right


def construct_pipeline_with_yaml(yaml_file, solid_defs):
    yaml_data = load_yaml_from_path(yaml_file)
    solid_def_dict = {s.name: s for s in solid_defs}

    deps = {}

    for solid_yaml_data in yaml_data["pipeline"]["solids"]:
        check.invariant(solid_yaml_data["def"] in solid_def_dict)
        def_name = solid_yaml_data["def"]
        alias = solid_yaml_data.get("alias", def_name)
        solid_deps_entry = {}
        for input_name, input_data in solid_yaml_data.get("deps", {}).items():
            solid_deps_entry[input_name] = DependencyDefinition(
                solid=input_data["solid"], output=input_data.get("output", "result")
            )
        deps[SolidInvocation(name=def_name, alias=alias)] = solid_deps_entry

    return PipelineDefinition(
        name=yaml_data["pipeline"]["name"],
        description=yaml_data["pipeline"].get("description"),
        solid_defs=solid_defs,
        dependencies=deps,
    )


def define_dep_dsl_pipeline():
    return construct_pipeline_with_yaml(
        file_relative_path(__file__, "example.yaml"), [add_one, add_two, subtract]
    )


@repository
def define_repository():
    return {"pipelines": {"some_example": define_dep_dsl_pipeline}}