Dagster includes a built-in scheduler that can be used to launch runs on a fixed interval.
Name | Description |
---|---|
@daily_schedule | Decorator that defines a schedule that executes every day at a fixed time |
@hourly_schedule | Decorator that defines a schedule that executes every hour at a fixed time |
@weekly_schedule | Decorator that defines a schedule that executes every week on a fixed day and time |
@monthly_schedule | Decorator that defines a schedule that executes every month on a fixed day and time |
@schedule | Decorator that defines a schedule that executes according to a given cron schedule. Note that schedules created with this decorator are non-partition-based schedules |
ScheduleExecutionContext | The context passed to the schedule definition execution function |
ScheduleDefinition | Base class for schedules. You almost never want to instantiate this class directly; instead, use one of the decorators above |
A schedule is a definition in Dagster that targets a pipeline and is used to execute the pipeline on a fixed schedule. The definition itself is responsible for generating run configuration for the pipeline on each schedule tick. In order to run schedules, you will need to run the dagster-daemon process.
Schedules have several important properties: they target a pipeline, generate run configuration for it on each tick, and can optionally define a should_execute function, which can be used to skip a schedule execution based on an arbitrary condition.
Dagster also includes a built-in scheduler, the DagsterDaemonScheduler, which runs as part of the dagster-daemon process. Once you have defined a schedule, see the dagster-daemon page for instructions on how to run the daemon in order to execute your schedules.
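As an illustration of should_execute, here is a minimal sketch of a non-partition-based schedule that skips a tick whenever a condition function returns False. The pipeline name, cron schedule, and the condition itself are assumptions made for the example.

from dagster import schedule


def _should_run_today(_context):
    # Hypothetical condition: replace with any check, e.g. skipping holidays.
    return True


@schedule(
    cron_schedule="0 6 * * *",
    pipeline_name="my_data_pipeline",
    should_execute=_should_run_today,
    execution_timezone="US/Central",
)
def my_conditional_schedule(context):
    # Run config is only generated when _should_run_today returns True.
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"solids": {"process_data_for_date": {"config": {"date": date}}}}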
There are two types of schedules in Dagster: partition-based schedules and non-partition-based (cron-based) schedules. Use a partition-based schedule when each run should correspond to a partition of your data and you want to track, and fill in, missing partitions; use a cron-based schedule when you simply want to run a pipeline on a fixed interval and don't need partition-based configuration.
Partition-based schedules are schedules that generate an underlying PartitionSetDefinition to execute against. The scheduler will make sure that a run is executed for every partition in the partition set.
For example, if we wanted to create a schedule that runs at 11:00 AM every day, we could use @daily_schedule, a decorator that produces a partition-based daily schedule:
from datetime import datetime, time

from dagster import daily_schedule


@daily_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime(2021, 1, 1),
    execution_time=time(11, 0),
    execution_timezone="US/Central",
)
def my_daily_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}
Partition-based schedules are useful when it's important for a successful run to exist for every scheduled run time. For example, if each run fills in a daily partition of a data warehouse table, basing your schedule on a partition set allows you to track which days have corresponding runs and fill in gaps when runs fail.
For use cases where you want to run the schedule on a fixed cron interval and don't need partition-based configuration, you can use the @schedule decorator to define your schedule.
In this example, the schedule runs at 1:00 AM US/Central every day:
from dagster import schedule


@schedule(
    cron_schedule="0 1 * * *", pipeline_name="my_data_pipeline", execution_timezone="US/Central"
)  # Executes at 1:00 AM in US/Central time every day
def my_schedule(context):
    return {"solids": {"dataset_name": "my_dataset"}}
You can also access the scheduled execution time from the context:
from dagster import schedule


@schedule(
    cron_schedule="0 1 * * *", pipeline_name="my_data_pipeline", execution_timezone="US/Central"
)  # Executes at 1:00 AM in US/Central time every day
def my_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"solids": {"process_data_for_date": {"config": {"date": date}}}}
If you have a unique scheduling requirement, you can define your own partition set and schedule definition using the base classes, as in the sketch below.
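A minimal sketch, assuming the PartitionSetDefinition API: it builds one partition per day, generates run config from each partition, and then creates a schedule definition from the partition set. The partition range, pipeline name, and selector function are illustrative assumptions rather than a prescribed pattern.

from datetime import datetime, timedelta

from dagster import Partition, PartitionSetDefinition


def get_date_partitions():
    # Assumption: one partition per day for January 2021.
    start = datetime(2021, 1, 1)
    return [Partition((start + timedelta(days=i)).strftime("%Y-%m-%d")) for i in range(31)]


def run_config_for_date_partition(partition):
    return {"solids": {"process_data_for_date": {"config": {"date": partition.value}}}}


date_partition_set = PartitionSetDefinition(
    name="date_partition_set",
    pipeline_name="my_data_pipeline",
    partition_fn=get_date_partitions,
    run_config_fn_for_partition=run_config_for_date_partition,
)


def select_latest_partition(context, partition_set_def):
    # Assumption: always execute against the most recent partition.
    return partition_set_def.get_partitions()[-1]


my_custom_schedule = date_partition_set.create_schedule_definition(
    "my_custom_schedule",
    "0 11 * * *",
    partition_selector=select_latest_partition,
    execution_timezone="US/Central",
)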
If you already have a preset defined for a pipeline you want to schedule, you can extract the necessary attributes from the preset and pass them to the schedule decorator.
In this example, we directly use the solid_selection, mode, tags, and run_config from the preset:
@daily_schedule(
    start_date=...,
    pipeline_name="my_pipeline",
    solid_selection=preset.solid_selection,
    mode=preset.mode,
    tags_fn_for_date=lambda _: preset.tags,
)
def my_daily_schedule(_date):
    return preset.run_config
You might need to modify the preset's run config to include information about the date partition the schedule is running for. You can use the merge_dicts util to inject the date:
from dagster.utils import merge_dicts


@daily_schedule(
    start_date=...,
    pipeline_name="my_pipeline",
    solid_selection=preset.solid_selection,
    mode=preset.mode,
    tags_fn_for_date=lambda _: preset.tags,
)
def my_daily_schedule(date):
    return merge_dicts(preset.run_config, {"date": date.strftime("%Y-%m-%d")})
If you find yourself using presets to generate schedule definitions frequently, you can use a helper function similar to this one to take a preset and return a schedule.
import datetime

from dagster import check, DagsterInvalidDefinitionError, PipelineDefinition, daily_schedule


def daily_schedule_definition_from_pipeline_preset(pipeline, preset_name, start_date):
    check.inst_param(pipeline, "pipeline", PipelineDefinition)
    check.str_param(preset_name, "preset_name")
    check.inst_param(start_date, "start_date", datetime.datetime)

    preset = pipeline.get_preset(preset_name)
    if not preset:
        raise DagsterInvalidDefinitionError(
            "Preset {preset_name} was not found "
            "on pipeline {pipeline_name}".format(
                preset_name=preset_name, pipeline_name=pipeline.name
            )
        )

    @daily_schedule(
        start_date=start_date,
        pipeline_name=pipeline.name,
        solid_selection=preset.solid_selection,
        mode=preset.mode,
        tags_fn_for_date=lambda _: preset.tags,
    )
    def my_daily_schedule(_date):
        return preset.run_config

    return my_daily_schedule
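For example, assuming a pipeline my_pipeline with a preset named "my_preset" (both hypothetical names), the helper can be used like this:

my_preset_schedule = daily_schedule_definition_from_pipeline_preset(
    my_pipeline, "my_preset", datetime.datetime(2021, 1, 1)
)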
Try these steps if you're running into problems getting a schedule to run. These steps assume that you are using the default scheduler on your instance, which executes your schedules in the dagster-daemon service. See the migration guide for instructions on how to migrate to this scheduler.
The left-hand navigation bar in Dagit shows all of the schedules for the currently-selected repository, with a green dot next to each schedule that is running. Make sure that your schedule appears in this list with a green dot. To ensure that Dagit has loaded the latest version of your schedule code, you can press the reload button next to your repository name to reload all the code in your repository.
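If a schedule doesn't appear in this list at all, a common cause is that it isn't included in the repository that Dagit is loading. A minimal sketch, assuming the pipeline and schedule names from the examples above:

from dagster import repository


@repository
def my_repository():
    # Both the pipeline and the schedule must be returned here for the
    # schedule to appear (and be runnable) in Dagit.
    return [my_data_pipeline, my_daily_schedule]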
When you click on your schedule name in the left-hand nav in Dagit, you'll be taken to a page where you can view more information about the schedule. If the schedule is running, there should be a "Next tick" row near the top of the page that tells you when the schedule is expected to run next. Make sure that time is what you expect (including the timezone).
It's possible that the dagster-daemon process that submits runs for your schedule is not working correctly. If you haven't set up dagster-daemon yet, check the Deploying Dagster section to find the steps to do so.
First, check that the daemon is running. Click on "Status" in the left nav in Dagit, and examine the "Scheduler" row under the "Daemon statuses" section. The daemon process periodically sends out a heartbeat from the scheduler, so if the scheduler daemon is listed as "Not running", that indicates that there's a problem with your daemon deployment. If the daemon ran into an error that caused it to throw an exception, that error will often appear in this UI as well.
If there isn't a clear error on this page, or if the daemon should be sending heartbeats but isn't, you may need to check the logs from the daemon process. The steps to do this will depend on your deployment - for example, if you're using Kubernetes, you'll need to get the logs from the pod that's running the daemon. You should be able to search those logs for the name of your schedule (or SchedulerDaemon to see all logs associated with the scheduler) to gain an understanding of what's going wrong.
Finally, it's possible that the daemon is running correctly, but there's a problem with your schedule code. Check the "Latest tick" row on the page for your schedule. If there was an error while trying to submit runs for your schedule, there should be a red "Failure" box next to the time. Clicking on the box should display an error with a stack trace showing you why the schedule couldn't execute. If the schedule is working as expected, it should display a blue box instead with information about any runs that were created by that schedule tick.
If these steps didn't help and your schedule still isn't running, reach out in Slack or file an issue and we'll be happy to help investigate.