DagsterDocs
Quick search

Asset Catalog#

"Asset" is Dagster's word for an entity, external to solids, that is mutated or created by solids. An asset might be a table in a database that solids append to, an ML model in a model store that solids overwrite, or even a slack channel that solids write messages to.

Relevant APIs#

NameDescription
AssetMaterializationDagster event indicating that a solid has materialized an asset.

Overview#

The Asset Catalog is an interface inside Dagit that centers on assets. Each entry in the catalog is an asset and includes:

  • Runs that mutated or created the asset.
  • Metadata logged by developers about the asset.

Developers place entries in the Asset Catalog by yielding AssetMaterialization events from inside their solids or output managers. The act of mutating or creating an asset is called a "materialization". AssetMaterialization events may include arbitrary metadata that describes the asset at the time of the materialization.

There are two general patterns for dealing with assets when using Dagster:

  • Materialize assets from inside the body of a solid.
  • Focus solid on pure business logic, and delegate the materialization of assets to IOManagers.

Yielding an AssetMaterialization#

To make the Asset Catalog aware that we materialized an asset in our solid, we can yield an AssetMaterialization event. This would involve changing the following solid:

@solid
def my_simple_solid(_):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    return remote_storage_path

into something like this:

@solid
def my_materialization_solid(context):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    yield AssetMaterialization(asset_key="my_dataset", description="Persisted result to storage")
    yield Output(remote_storage_path)

Note: Our materialization solid must now explicitly yield an Output event instead of relying on the implicit conversion of the return value into an Output event.

We should now see a materialization event in the event log when we execute this solid.

Asset Catalog in Dagit#

After emitting an AssetMaterialization event, we should now also see an entry in the Asset Catalog.

Note: The Asset Catalog in Dagit is only enabled for instances configured with an "asset-aware" event log storage, such as PostgresEventLogStorage and ConsolidatedSqliteEventLogStorage. Here is an example of an asset-aware instance configuration:

run_launcher:
  module: dagster.core.launcher
  class: DefaultRunLauncher

event_log_storage:
  module: dagster.core.storage.event_log
  class: ConsolidatedSqliteEventLogStorage
  config:
    base_dir: "/var/shared/dagster_home"

Please refer to the Dagster Instance section for more information about configuring your Dagster instance.

Once the Dagster instance is configured with an asset-aware event log storage, after you run a pipeline which generates an asset, you'll find the asset listed on the Assets tab in Dagit.

You can also find the details about it by clicking the asset key of it, my_dataset in this case.

Clicking the [View Asset Dashboard] on the materialization event from the log generated above can also bring you to the corresponding Asset details page.

Examples#

Recording Asset Materializations in Output Managers#

To record that an OutputManager or IOManager has mutated or created an asset, we can yield an AssetMaterialization event from its handle_output method.

class PandasCsvIOManager(IOManager):
    def load_input(self, context):
        file_path = os.path.join(["my_base_dir", context.step_key, context.output_name])
        return pd.read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join(["my_base_dir", context.step_key, context.output_name])

        obj.to_csv(file_path)

        yield AssetMaterialization(
            asset_key=AssetKey(file_path), description="Persisted result to storage."
        )

Attaching Metadata to the Asset Materialization#

There are a variety of types of metadata that can be associated with a materialization event, all through the EventMetadataEntry class. Each materialization event optionally takes a list of metadata entries that are then displayed in the event log and the Asset Catalog.

Example with a solid:

@solid
def my_metadata_materialization_solid(context):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    yield AssetMaterialization(
        asset_key="my_dataset",
        description="Persisted result to storage",
        metadata_entries=[
            EventMetadataEntry.text("Text-based metadata for this event", label="text_metadata"),
            EventMetadataEntry.fspath(remote_storage_path),
            EventMetadataEntry.url("http://mycoolsite.com/url_for_my_data", label="dashboard_url"),
            EventMetadataEntry.float(calculate_bytes(df), "size (bytes)"),
        ],
    )
    yield Output(remote_storage_path)

Example with an IOManager:

class PandasCsvIOManagerWithMetadata(IOManager):
    def load_input(self, context):
        file_path = os.path.join(["my_base_dir", context.step_key, context.output_name])
        return pd.read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join(["my_base_dir", context.step_key, context.output_name])

        obj.to_csv(file_path)

        yield AssetMaterialization(
            asset_key=AssetKey(file_path),
            description="Persisted result to storage.",
            metadata_entries=[
                EventMetadataEntry.int(obj.shape[0], label="number of rows"),
                EventMetadataEntry.float(obj["some_column"].mean(), "some_column mean"),
            ],
        )

Check our API docs for EventMetadataEntry for more details on they types of event metadata available.