"Asset" is Dagster's word for an entity, external to solids, that is mutated or created by solids. An asset might be a table in a database that solids append to, an ML model in a model store that solids overwrite, or even a slack channel that solids write messages to.
Name | Description |
---|---|
AssetMaterialization | Dagster event indicating that a solid has materialized an asset. |
The Asset Catalog is an interface inside Dagit that centers on assets. Each entry in the catalog is an asset and includes:
Developers place entries in the Asset Catalog by yielding AssetMaterialization
events from inside their solids or output managers. The act
of mutating or creating an asset is called a "materialization". AssetMaterialization
events may include arbitrary metadata that describes the asset at the time of the materialization.
There are two general patterns for dealing with assets when using Dagster:
To make the Asset Catalog aware that we materialized an asset in our solid, we can yield an AssetMaterialization
event.
This would involve changing the following solid:
@solid
def my_simple_solid(_):
df = read_df()
remote_storage_path = persist_to_storage(df)
return remote_storage_path
into something like this:
@solid
def my_materialization_solid(context):
df = read_df()
remote_storage_path = persist_to_storage(df)
yield AssetMaterialization(asset_key="my_dataset", description="Persisted result to storage")
yield Output(remote_storage_path)
Note: Our materialization solid must now explicitly yield an Output
event instead of relying on the implicit conversion of the return value into an Output
event.
We should now see a materialization event in the event log when we execute this solid.
After emitting an AssetMaterialization event, we should now also see an entry in the Asset Catalog.
Note: The Asset Catalog in Dagit is only enabled for instances configured with an "asset-aware"
event log storage, such as PostgresEventLogStorage
and ConsolidatedSqliteEventLogStorage
. Here
is an example of an asset-aware instance configuration:
run_launcher:
module: dagster.core.launcher
class: DefaultRunLauncher
event_log_storage:
module: dagster.core.storage.event_log
class: ConsolidatedSqliteEventLogStorage
config:
base_dir: "/var/shared/dagster_home"
Please refer to the Dagster Instance section for more information about configuring your Dagster instance.
Once the Dagster instance is configured with an asset-aware event log storage, after you run a
pipeline which generates an asset, you'll find the asset listed on the Assets
tab in Dagit.
You can also find the details about it by clicking the asset key of it, my_dataset
in this case.
Clicking the [View Asset Dashboard] on the materialization event from the log generated above can also bring you to the corresponding Asset details page.
To record that an OutputManager or IOManager has mutated or created an asset, we can yield an AssetMaterialization
event from its handle_output
method.
class PandasCsvIOManager(IOManager):
def load_input(self, context):
file_path = os.path.join(["my_base_dir", context.step_key, context.output_name])
return pd.read_csv(file_path)
def handle_output(self, context, obj):
file_path = os.path.join(["my_base_dir", context.step_key, context.output_name])
obj.to_csv(file_path)
yield AssetMaterialization(
asset_key=AssetKey(file_path), description="Persisted result to storage."
)
There are a variety of types of metadata that can be associated with a materialization event, all
through the EventMetadataEntry
class. Each materialization
event optionally takes a list of metadata entries that are then displayed in the event log and the
Asset Catalog.
Example with a solid:
@solid
def my_metadata_materialization_solid(context):
df = read_df()
remote_storage_path = persist_to_storage(df)
yield AssetMaterialization(
asset_key="my_dataset",
description="Persisted result to storage",
metadata_entries=[
EventMetadataEntry.text("Text-based metadata for this event", label="text_metadata"),
EventMetadataEntry.fspath(remote_storage_path),
EventMetadataEntry.url("http://mycoolsite.com/url_for_my_data", label="dashboard_url"),
EventMetadataEntry.float(calculate_bytes(df), "size (bytes)"),
],
)
yield Output(remote_storage_path)
Example with an IOManager:
class PandasCsvIOManagerWithMetadata(IOManager):
def load_input(self, context):
file_path = os.path.join(["my_base_dir", context.step_key, context.output_name])
return pd.read_csv(file_path)
def handle_output(self, context, obj):
file_path = os.path.join(["my_base_dir", context.step_key, context.output_name])
obj.to_csv(file_path)
yield AssetMaterialization(
asset_key=AssetKey(file_path),
description="Persisted result to storage.",
metadata_entries=[
EventMetadataEntry.int(obj.shape[0], label="number of rows"),
EventMetadataEntry.float(obj["some_column"].mean(), "some_column mean"),
],
)
Check our API docs for EventMetadataEntry
for more details
on they types of event metadata available.