Versioning and Memoization (Experimental)

You can find the code for this example on GitHub.

This example describes how to use Dagster's versioning and memoization features. As of release 0.10.0, a solid or resource can be tagged with a version through the version argument of its decorator (i.e. @solid(version="a"), @resource(version="b")). The version argument is meant to represent the state of the code that the decorator wraps.

Dagster can use versions to determine whether a particular step needs to be re-executed. Given the version of the code for each solid in a pipeline, the system tags each solid output with a version and uses it to infer whether an upcoming execution of a step will differ from previous executions. When it will not, the output of a previous run can be re-used. We call this process memoized execution.
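
As a rough illustration of the idea, the following sketch caches step outputs by step name and version. It is not Dagster's implementation: the stored_outputs dictionary, the run_step helper, and the version strings are assumptions made up for this example, standing in for persistent storage and real steps.

```python
# Illustrative sketch only: a dict stands in for persistent output storage.
stored_outputs = {}
calls = []  # records which steps actually executed


def run_step(name, version, compute):
    """Run `compute` only if no output is stored under (name, version)."""
    key = (name, version)
    if key not in stored_outputs:
        calls.append(name)
        stored_outputs[key] = compute()
    return stored_outputs[key]


first = run_step("emit_dog", "a", lambda: "poodle")   # computed
second = run_step("emit_dog", "a", lambda: "poodle")  # same version, re-used
third = run_step("emit_dog", "b", lambda: "beagle")   # new version, computed
```

In this sketch the second call never invokes its compute function, because an output with the same name and version already exists.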

The following example will walk through adding versions to a pipeline, and then performing memoized execution. Please note that versioning and memoization are experimental features, and are not yet intended for production use.

Overview

Consider adding version information to the following pipeline.

from dagster import ModeDefinition, pipeline, repository
from dagster.core.storage.memoizable_io_manager import versioned_filesystem_io_manager
from dagster.core.storage.tags import MEMOIZED_RUN_TAG
from memoized_development.solids.dog import emit_dog
from memoized_development.solids.sentence import emit_sentence
from memoized_development.solids.tree import emit_tree


@pipeline(
    mode_defs=[
        ModeDefinition("memoized", resource_defs={"io_manager": versioned_filesystem_io_manager}),
    ],
    tags={MEMOIZED_RUN_TAG: "true"},
)
def my_pipeline():
    return emit_sentence(emit_dog(), emit_tree())

To enable memoization for this pipeline, we have tagged it with MEMOIZED_RUN_TAG: "true", which tells the system to compute version information for outputs and to run only the steps that have not been memoized. We also use versioned_filesystem_io_manager as our io_manager, which is a MemoizableIOManager. Memoization requires a MemoizableIOManager as the io_manager, so that versioning information can be retrieved from previous runs.

For memoization to take effect, each solid (emit_sentence, emit_dog, and emit_tree) also needs to be tagged with a version.

Computing Versions

In order to catch code changes individually from each solid, we will have each solid live in its own file, and have the version of each solid be a hash of the corresponding file.

Here is the code to hash a file in Python. Note that this code is not provided by Dagster, and is instead part of the example code:

import hashlib


def get_hash_for_file(path):
    h = hashlib.new("md5")
    with open(path, "rb") as f:
        data = f.read()
    h.update(data)
    digest = h.hexdigest()
    return digest
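
To see why a file hash can serve as a version, the sketch below hashes a temporary file, rewrites it, and hashes it again. The helper is repeated so the snippet runs on its own; the file name example_solid.py and its contents are hypothetical and not part of the example project.

```python
import hashlib
import os
import tempfile


def get_hash_for_file(path):
    # Same helper as above, repeated so this sketch is self-contained.
    h = hashlib.new("md5")
    with open(path, "rb") as f:
        h.update(f.read())
    return h.hexdigest()


with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "example_solid.py")  # hypothetical file name

    with open(path, "w") as f:
        f.write("def emit():\n    return 1\n")
    version_before = get_hash_for_file(path)
    version_unchanged = get_hash_for_file(path)  # file untouched, same hash

    with open(path, "w") as f:
        f.write("def emit():\n    return 2\n")
    version_after = get_hash_for_file(path)  # contents changed, new hash
```

One consequence of this design is that any edit to the file, including a change to a comment or whitespace, produces a new version.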

And here is the definition of each solid, using file-hashing to compute the version:

# memoized_development/solids/dog.py
from dagster import solid
from memoized_development.solids.solid_utils import get_hash_for_file


@solid(version=get_hash_for_file(__file__), config_schema={"dog_breed": str})
def emit_dog(context):
    breed = context.solid_config["dog_breed"]
    return breed

# memoized_development/solids/tree.py
from dagster import solid
from memoized_development.solids.solid_utils import get_hash_for_file


@solid(version=get_hash_for_file(__file__), config_schema={"tree_species": str})
def emit_tree(context):
    species = context.solid_config["tree_species"]
    return species

# memoized_development/solids/sentence.py
from dagster import InputDefinition, solid
from memoized_development.solids.solid_utils import get_hash_for_file


@solid(
    version=get_hash_for_file(__file__),
    input_defs=[InputDefinition("dog_breed"), InputDefinition("tree_species")],
)
def emit_sentence(_context, dog_breed, tree_species):
    return f"{dog_breed} is a breed of dog, while {tree_species} is a species of tree."

Notice that the emit_dog and emit_tree solids require config. In order to execute my_pipeline, we will provide the required config for these solids:

solids:
  emit_dog:
    config:
      dog_breed: poodle
  emit_tree:
    config:
      tree_species: douglas_fir
resources:
  io_manager:
    config:
      base_dir: "./results"

Before executing, we can use the list_versions CLI to check which outputs have been memoized in their current state, and which will need to be re-executed:

dagster pipeline list_versions \
  -m memoized_development.repo \
  -c ./config/california_config.yaml \
  --mode memoized

This results in the following output:

| Step Output          | Version                                  | Status of Output   |
|----------------------|------------------------------------------|--------------------|
| emit_dog.result      | 93815493adaeae4d55fdfa820891e0d40fb95f46 | to-be-recomputed   |
| emit_tree.result     | 94d4620ffec745ef895719204424c16406646b99 | to-be-recomputed   |
| emit_sentence.result | d367447b011cc2dcee083d5d9509477da746958b | to-be-recomputed   |

This matches expectations, since the pipeline has not been run yet. Use the execute CLI to execute the pipeline. Note that memoized execution is not yet supported from Dagit.

dagster pipeline execute \
  -m memoized_development.repo \
  -c ./config/california_config.yaml \
  --mode memoized

Let's see what happens if we change the config which we use to execute:

solids:
  emit_dog:
    config:
      dog_breed: poodle
  emit_tree:
    config:
      tree_species: red_maple
resources:
  io_manager:
    config:
      base_dir: "./results"

Notice that only the config to the emit_tree solid has changed. Therefore, the state of the output of emit_tree has changed, and we would expect the version to change. Note however that since the emit_sentence solid uses the result of emit_tree as input, the version of the output of emit_sentence should also change. We can verify this by again using the list_versions CLI.

dagster pipeline list_versions \
  -m memoized_development.repo \
  -c ./config/maryland_config.yaml \
  --mode memoized

This results in the following output:

| Step Output          | Version                                  | Status of Output   |
|----------------------|------------------------------------------|--------------------|
| emit_dog.result      | 93815493adaeae4d55fdfa820891e0d40fb95f46 | stored             |
| emit_tree.result     | 94d4620ffec745ef895719204424c16406646b99 | to-be-recomputed   |
| emit_sentence.result | d367447b011cc2dcee083d5d9509477da746958b | to-be-recomputed   |

As we expect, the versions of outputs for emit_tree and emit_sentence have changed, indicating that if we run the pipeline again with the given config, these two outputs will be recomputed. Note additionally that the result of the emit_dog solid is marked as stored, and its version has not changed, because its code and config have remained the same. If we execute the pipeline again with the new config, we expect that the output of emit_dog will be retrieved from the previous run, and that only the emit_tree and emit_sentence solids will run.
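
The way a config change propagates downstream can be sketched in plain Python. This is an assumption-laden illustration, not Dagster's actual algorithm: the output_version helper, the placeholder code versions such as "dog-code", and the choice of SHA-1 over a JSON payload are all invented for this example.

```python
import hashlib
import json


def output_version(code_version, config, input_versions):
    """Hypothetical helper: hash code version, config, and upstream versions."""
    payload = json.dumps(
        {"code": code_version, "config": config, "inputs": input_versions},
        sort_keys=True,
    )
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()


def pipeline_versions(tree_species):
    # Placeholder code versions stand in for the per-file hashes.
    dog = output_version("dog-code", {"dog_breed": "poodle"}, [])
    tree = output_version("tree-code", {"tree_species": tree_species}, [])
    # emit_sentence has no config, but depends on both upstream outputs.
    sentence = output_version("sentence-code", {}, [dog, tree])
    return {"emit_dog": dog, "emit_tree": tree, "emit_sentence": sentence}


before = pipeline_versions("douglas_fir")
after = pipeline_versions("red_maple")
changed = sorted(name for name in before if before[name] != after[name])
```

Under these assumptions, changed contains emit_sentence and emit_tree, while the version of emit_dog is identical in both runs, mirroring the stored and to-be-recomputed statuses described above.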