Schedules#

Dagster includes a built-in scheduler that can be used to launch runs on a fixed interval.

Relevant APIs#

Name | Description
@daily_schedule | Decorator that defines a schedule that executes every day at a fixed time.
@hourly_schedule | Decorator that defines a schedule that executes every hour at a fixed time.
@weekly_schedule | Decorator that defines a schedule that executes every week on a fixed day and time.
@monthly_schedule | Decorator that defines a schedule that executes every month on a fixed day and time.
@schedule | Decorator that defines a schedule that executes according to a given cron schedule. Schedules created with this decorator are not partition-based.
ScheduleExecutionContext | The context passed to the schedule definition execution function.
ScheduleDefinition | Base class for schedules. You almost never want to instantiate this class directly. Instead, use one of the decorators above.

Overview#

A schedule is a definition in Dagster that targets a pipeline and is used to execute the pipeline on a fixed schedule. The definition itself is responsible for generating run configuration for the pipeline on each schedule tick. In order to run schedules, you will need to run the dagster-daemon process.

Schedules have several important properties:

  • Each schedule targets a specific pipeline.
  • A schedule defines a function that returns run configuration for the targeted pipeline.
  • A schedule optionally defines tags, a mode, and a solid selection for the targeted pipeline.
  • A schedule optionally defines a should_execute function, which can be used to skip a schedule execution based on an arbitrary condition.
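
As a rough illustration of the last property, a should_execute function is a predicate over the schedule context. The sketch below assumes the function only reads scheduled_execution_time from the context, and it uses a stand-in object rather than a real ScheduleExecutionContext so that it runs without Dagster. The weekday rule and the name is_weekday_tick are hypothetical.

```python
from datetime import datetime
from types import SimpleNamespace


def is_weekday_tick(context):
    """Return True when the scheduled tick falls on Monday through Friday."""
    # datetime.weekday() returns 0 for Monday and 6 for Sunday.
    return context.scheduled_execution_time.weekday() < 5


# Stand-ins for the context Dagster would pass in (assumption: only this attribute is read).
friday = SimpleNamespace(scheduled_execution_time=datetime(2021, 1, 1, 11, 0))
saturday = SimpleNamespace(scheduled_execution_time=datetime(2021, 1, 2, 11, 0))

print(is_weekday_tick(friday))
print(is_weekday_tick(saturday))
```

A predicate like this could be supplied as the schedule's should_execute function, so that ticks for which it returns False are skipped.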

Dagster also includes a built-in scheduler, the DagsterDaemonScheduler, which runs as part of the dagster-daemon process. Once you have defined a schedule, see the dagster-daemon page for instructions on how to run the daemon in order to execute your schedules.


Defining a schedule#

There are two types of schedules in Dagster:

  • Partition based
  • Non-partition based

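As a rule of thumb, use a partition-based schedule when each run processes a distinct slice of data (such as one day) and you need to track or backfill missing runs. Use a non-partition-based schedule when you only need to launch a run on a cron interval.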

Partition-based schedules#

Partition-based schedules are schedules that generate an underlying PartitionSetDefinition to execute against. The scheduler makes sure that a run is executed for every partition in the partition set.

For example, if we wanted to create a schedule that runs at 11:00 AM every day, we could use @daily_schedule, a decorator that produces a partition-based daily schedule.

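from datetime import datetime, time

from dagster import daily_schedule


@daily_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime(2021, 1, 1),
    execution_time=time(11, 0),
    execution_timezone="US/Central",
)
def my_daily_schedule(date):
    # The date of the partition is passed in and formatted into the run config.
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}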

Partition-based schedules are useful when it's important for a successful run to exist for every scheduled run time. For example, if each run fills in a daily partition of a data warehouse table, basing your schedule on a partition set allows you to track which days have corresponding runs and fill in gaps when runs fail.
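
The bookkeeping idea behind "fill in gaps" can be sketched in plain Python. This is not how Dagster stores or computes partitions. It only illustrates comparing the expected daily partitions against the days that already have a successful run. The function names and the sample dates are hypothetical.

```python
from datetime import date, timedelta


def daily_partitions(start, end):
    """List every date from start (inclusive) to end (exclusive)."""
    days = (end - start).days
    return [start + timedelta(days=offset) for offset in range(days)]


def missing_partitions(start, end, successful_dates):
    """Return the partitions that have no successful run yet, in order."""
    done = set(successful_dates)
    return [day for day in daily_partitions(start, end) if day not in done]


# Hypothetical record of which days already have a successful run.
succeeded = [date(2021, 1, 1), date(2021, 1, 2), date(2021, 1, 4)]
gaps = missing_partitions(date(2021, 1, 1), date(2021, 1, 6), succeeded)
print([day.isoformat() for day in gaps])
```

With the sample data, the two days without a successful run (January 3 and January 5) are the ones that would need to be re-run.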

Non-partition-based schedules#

Running the scheduler#

Examples#

Cron schedule#

For use cases where you want to run the schedule on a fixed cron interval and don't need partition-based configuration, you can use the @schedule decorator to define your schedule.

In this example, the schedule runs at 1:00 AM in US/Central every day:

from dagster import schedule


@schedule(
    cron_schedule="0 1 * * *", pipeline_name="my_data_pipeline", execution_timezone="US/Central"
)  # Executes at 1:00 AM in US/Central time every day
def my_schedule(context):
    return {"solids": {"dataset_name": "my_dataset"}}
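
To make the cron string easier to read, the sketch below splits a standard five-field cron expression into named fields. It is a small illustration of the cron format only, not part of the Dagster API. The helper name split_cron is hypothetical, and it does no validation beyond counting fields.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(cron_schedule):
    """Map each of the five standard cron fields to its value."""
    parts = cron_schedule.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 cron fields, got {len(parts)}: {cron_schedule!r}")
    return dict(zip(CRON_FIELDS, parts))


fields = split_cron("0 1 * * *")
print(fields)
```

For "0 1 * * *", the minute is 0 and the hour is 1, with every day of month, month, and day of week matching, which corresponds to 1:00 AM every day.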

You can also access the execution time from the context:

@schedule(
    cron_schedule="0 1 * * *", pipeline_name="my_data_pipeline", execution_timezone="US/Central"
)  # Executes at 1:00 AM in US/Central time every day
def my_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"solids": {"process_data_for_date": {"config": {"date": date}}}}
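
Because the body of such a function only reads scheduled_execution_time, its logic can be checked without running the scheduler. The sketch below repeats the config-building logic as a plain function and calls it with a stand-in context. The names run_config_for_tick and fake_context are hypothetical, and the stand-in is an assumption rather than a real ScheduleExecutionContext.

```python
from datetime import datetime
from types import SimpleNamespace


def run_config_for_tick(context):
    """Build the same run config shape as the schedule above from the tick time."""
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"solids": {"process_data_for_date": {"config": {"date": date}}}}


# Stand-in context; a real schedule would receive this object from Dagster.
fake_context = SimpleNamespace(scheduled_execution_time=datetime(2021, 3, 14, 1, 0))
config = run_config_for_tick(fake_context)
print(config["solids"]["process_data_for_date"]["config"]["date"])
```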

Hourly Schedule#

Daily Schedule#

Weekly Schedule#

Monthly Schedule#

Schedule from Custom Partition Set#

If you have a unique scheduling requirement, you can define your own partition set and schedule definition using the base classes.

TODO

Patterns#

Using a preset in a schedule definition#

If you already have a preset defined for a pipeline you want to schedule, you can extract the necessary attributes from the preset and pass them to the schedule decorator.

In this example, we directly use the solid_selection, mode, tags, and run_config from the preset:

@daily_schedule(
    start_date=...,
    pipeline_name="my_pipeline",
    solid_selection=preset.solid_selection,
    mode=preset.mode,
    tags_fn_for_date=lambda _: preset.tags,
)
def my_daily_schedule(_date):
    return preset.run_config

You might need to modify the preset's run config to include information about the date partition the schedule is running for. You can use the merge_dicts util to inject the date:

from dagster.utils import merge_dicts


@daily_schedule(
    start_date=...,
    pipeline_name="my_pipeline",
    solid_selection=preset.solid_selection,
    mode=preset.mode,
    tags_fn_for_date=lambda _: preset.tags,
)
def my_daily_schedule(date):
    # Add the partition date on top of the preset's run config.
    return merge_dicts(preset.run_config, {"date": date.strftime("%Y-%m-%d")})
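
The following plain-Python sketch shows the kind of merge this pattern relies on. It assumes that merge_dicts performs a shallow merge in which keys from later dictionaries override earlier ones. That is an assumption worth checking against your Dagster version. The shallow_merge helper and the sample preset run config are hypothetical.

```python
def shallow_merge(left, right):
    """Return a new dict where top-level keys from right override left."""
    return {**left, **right}


# Hypothetical preset run config, for illustration only.
preset_run_config = {
    "solids": {"process_data_for_date": {"config": {"table": "events"}}},
    "resources": {"warehouse": {"config": {"schema": "analytics"}}},
}

merged = shallow_merge(preset_run_config, {"date": "2021-01-01"})
print(sorted(merged))

# A top-level key that exists on both sides is replaced wholesale, not merged deeply.
replaced = shallow_merge(preset_run_config, {"solids": {}})
print(replaced["solids"])
```

If the date has to be injected under a nested key such as "solids", a shallow merge would overwrite the preset's entire "solids" section, so the nested dictionary would need to be built explicitly.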

If you find yourself using presets to generate schedule definitions frequently, you can use a helper function similar to this one to take a preset and return a schedule.

from dagster import check, PipelineDefinition, DagsterInvalidDefinitionError, daily_schedule
import datetime


def daily_schedule_definition_from_pipeline_preset(pipeline, preset_name, start_date):
    check.inst_param(pipeline, "pipeline", PipelineDefinition)
    check.str_param(preset_name, "preset_name")
    check.inst_param(start_date, "start_date", datetime.datetime)

    preset = pipeline.get_preset(preset_name)
    if not preset:
        raise DagsterInvalidDefinitionError(
            "Preset {preset_name} was not found "
            "on pipeline {pipeline_name}".format(
                preset_name=preset_name, pipeline_name=pipeline.name
            )
        )

    @daily_schedule(
        start_date=start_date,
        pipeline_name=pipeline.name,
        solid_selection=preset.solid_selection,
        mode=preset.mode,
        tags_fn_for_date=lambda _: preset.tags,
    )
    def my_daily_schedule(_date):
        return preset.run_config

    return my_daily_schedule

Troubleshooting#

Try these steps if you're trying to run a schedule and are running into problems. These steps assume that you are using the default scheduler on your instance, which executes your schedules in the dagster-daemon service. See the migration guide for instructions on how to migrate to this scheduler.

Step 1: Is your schedule included in your repository and turned on?#

The left-hand navigation bar in Dagit shows all of the schedules for the currently-selected repository, with a green dot next to each schedule that is running. Make sure that your schedule appears in this list with a green dot. To ensure that Dagit has loaded the latest version of your schedule code, you can press the reload button next to your repository name to reload all the code in your repository.

  • If Dagit is unable to load the repository containing your schedule (for example, due to a syntax error or a problem with one of your definitions), there should be an error message in the left nav with a link that will give you more information about the error.
  • If the repository is loading, but the schedule doesn't appear in the list of schedules, make sure that your schedule function is included in the list of schedules returned by your repository function.
  • If the schedule appears but doesn't have a green dot next to it, click on the name of the schedule, then toggle the switch at the top of the screen to mark it as running.

Step 2: Is your schedule interval set up correctly?#

When you click on your schedule name in the left-hand nav in Dagit, you'll be taken to a page where you can view more information about the schedule. If the schedule is running, there should be a "Next tick" row near the top of the page that tells you when the schedule is expected to run next. Make sure that time is what you expect (including the timezone).
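
When checking the "Next tick" time by hand, it can help to compute the expected value yourself. The sketch below is a minimal, hypothetical helper for a daily schedule. It does not reproduce Dagster's scheduler logic, and the example uses a fixed UTC-6 offset as a stand-in for US/Central, which ignores daylight saving time.

```python
from datetime import datetime, timedelta, timezone


def next_daily_tick(now, hour, minute, tz):
    """Return the next time the wall clock in tz reads hour:minute, strictly after now."""
    local_now = now.astimezone(tz)
    candidate = local_now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= local_now:
        candidate += timedelta(days=1)
    return candidate


# Assumption: a fixed UTC-6 offset stands in for US/Central and ignores daylight saving time.
central_standard = timezone(timedelta(hours=-6), "CST")
now = datetime(2021, 1, 15, 12, 0, tzinfo=timezone.utc)

tick = next_daily_tick(now, hour=1, minute=0, tz=central_standard)
print(tick.isoformat())
print(tick.astimezone(timezone.utc).isoformat())
```

On Python 3.9 or later, a zoneinfo.ZoneInfo("US/Central") object can be passed as tz instead of the fixed offset so that daylight saving time is taken into account.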

Step 3: Is the schedule interval configured correctly, but it still isn't creating any runs?#

It's possible that the dagster-daemon process that submits runs for your schedule is not working correctly. If you haven't set up dagster-daemon yet, check the Deploying Dagster section to find the steps to do so.

First, check that the daemon is running. Click on "Status" in the left nav in Dagit, and examine the "Scheduler" row under the "Daemon statuses" section. The daemon process periodically sends out a heartbeat from the scheduler, so if the scheduler daemon is listed as "Not running", that indicates that there's a problem with your daemon deployment. If the daemon ran into an error that caused it to throw an exception, that error will often appear in this UI as well.

If there isn't a clear error on this page, or if the daemon should be sending heartbeats but isn't, you may need to check the logs from the daemon process. The steps to do this will depend on your deployment - for example, if you're using Kubernetes, you'll need to get the logs from the pod that's running the daemon. You should be able to search those logs for the name of your schedule (or SchedulerDaemon to see all logs associated with the scheduler) to gain an understanding of what's going wrong.

Finally, it's possible that the daemon is running correctly, but there's a problem with your schedule code. Check the "Latest tick" row on the page for your schedule. If there was an error while trying to submit runs for your schedule, there should be a red "Failure" box next to the time. Clicking on the box should display an error with a stack trace showing you why the schedule couldn't execute. If the schedule is working as expected, it should display a blue box instead with information about any runs that were created by that schedule tick.

Still stuck?#

If these steps didn't help and your schedule still isn't running, reach out in Slack or file an issue and we'll be happy to help investigate.