from typing import Any, Dict, Set
from dagster import PipelineDefinition, PipelineRun, SolidDefinition, check
from dagster.core.definitions.dependency import Solid
from dagster.core.execution.context.compute import AbstractComputeExecutionContext
from dagster.core.execution.context.system import SystemPipelineExecutionContext
from dagster.core.log_manager import DagsterLogManager
from dagster.core.system_config.objects import EnvironmentConfig
class DagstermillExecutionContext(AbstractComputeExecutionContext):
    """Dagstermill-specific execution context.

    Thin wrapper around a :class:`SystemPipelineExecutionContext`: most
    properties simply delegate to the wrapped pipeline context.

    Do not initialize directly: use :func:`dagstermill.get_context`.
    """

    def __init__(
        self,
        pipeline_context: SystemPipelineExecutionContext,
        resource_keys_to_init: Set[str],
        solid_name: str,
        solid_config: Any = None,
    ):
        """Wrap ``pipeline_context`` for use inside a notebook.

        Args:
            pipeline_context (SystemPipelineExecutionContext): The pipeline context that
                tag, run, config, logging and resource lookups are delegated to.
            resource_keys_to_init (Set[str]): The resource keys passed as
                ``required_resource_keys`` when :attr:`resources` is built.
            solid_name (str): Name of the solid this context is for; used to look up the
                solid, its definition and its config.
            solid_config (Any): Optional explicit solid config. When not ``None`` it takes
                precedence over the solid's entry in the environment config.
        """
        self._pipeline_context = check.inst_param(
            pipeline_context, "pipeline_context", SystemPipelineExecutionContext
        )
        self._resource_keys_to_init = check.set_param(
            resource_keys_to_init, "resource_keys_to_init", of_type=str
        )
        self.solid_name = check.str_param(solid_name, "solid_name")
        self._solid_config = solid_config

    def has_tag(self, key: str) -> bool:
        """Check if a logging tag is defined on the context.

        Args:
            key (str): The key to check.

        Returns:
            bool
        """
        check.str_param(key, "key")
        return self._pipeline_context.has_tag(key)

    def get_tag(self, key: str) -> str:
        """Get a logging tag defined on the context.

        Args:
            key (str): The key to get.

        Returns:
            str
        """
        check.str_param(key, "key")
        return self._pipeline_context.get_tag(key)

    @property
    def run_id(self) -> str:
        """str: The run_id for the context."""
        return self._pipeline_context.run_id

    @property
    def run_config(self) -> Dict[str, Any]:
        """dict: The run_config for the context."""
        return self._pipeline_context.run_config

    @property
    def environment_config(self) -> EnvironmentConfig:
        """:class:`dagster.EnvironmentConfig`: The environment_config for the context"""
        return self._pipeline_context.environment_config

    @property
    def logging_tags(self) -> Dict[str, str]:
        """dict: The logging tags for the context."""
        return self._pipeline_context.logging_tags

    @property
    def pipeline_name(self) -> str:
        """str: The name of the pipeline for the context."""
        return self._pipeline_context.pipeline_name

    @property
    def pipeline_def(self) -> PipelineDefinition:
        """:class:`dagster.PipelineDefinition`: The pipeline definition for the context.

        This will be a dagstermill-specific shim.
        """
        return self._pipeline_context.pipeline.get_definition()

    @property
    def resources(self) -> Any:
        """collections.namedtuple: A dynamically-created type whose properties allow access to
        resources.

        Built through the pipeline context's scoped resources builder on each access,
        restricted to the resource keys this context was created with.
        """
        return self._pipeline_context.scoped_resources_builder.build(
            required_resource_keys=self._resource_keys_to_init,
        )

    @property
    def pipeline_run(self) -> PipelineRun:
        """:class:`dagster.PipelineRun`: The pipeline run for the context."""
        return self._pipeline_context.pipeline_run

    @property
    def log(self) -> DagsterLogManager:
        """:class:`dagster.DagsterLogManager`: The log manager for the context.

        Call, e.g., ``log.info()`` to log messages through the Dagster machinery.
        """
        return self._pipeline_context.log

    @property
    def solid_def(self) -> SolidDefinition:
        """:class:`dagster.SolidDefinition`: The solid definition for the context.

        In interactive contexts, this may be a dagstermill-specific shim, depending whether a
        solid definition was passed to ``dagstermill.get_context``.
        """
        return self.pipeline_def.solid_def_named(self.solid_name)

    @property
    def solid(self) -> Solid:
        """:class:`dagster.Solid`: The solid for the context.

        In interactive contexts, this may be a dagstermill-specific shim, depending whether a
        solid definition was passed to ``dagstermill.get_context``.
        """
        return self.pipeline_def.solid_named(self.solid_name)

    @property
    def solid_config(self) -> Any:
        """Any: The solid-specific config for the context.

        Returns the config supplied to the constructor when one was given; otherwise the
        ``config`` of this solid's entry in ``environment_config.solids``, or ``None`` when
        the solid has no entry there.
        """
        # Compare against None rather than relying on truthiness: an explicitly supplied
        # falsy config (0, False, "", {}) is still a real value and must not be replaced
        # by the environment-config lookup.
        if self._solid_config is not None:
            return self._solid_config

        solid_config = self.environment_config.solids.get(self.solid_name)
        return solid_config.config if solid_config else None
class DagstermillRuntimeExecutionContext(DagstermillExecutionContext):
    """Marker subclass of :class:`DagstermillExecutionContext`.

    Adds no attributes or behavior of its own; everything is inherited unchanged.
    NOTE(review): presumably exists so callers can tell a context created during real
    pipeline execution apart from an interactive one -- confirm against the call sites.
    """