import io
import os
import shutil
import stat
import sys
from crontab import CronTab
from dagster import DagsterInstance, check, utils
from dagster.core.host_representation import ExternalSchedule
from dagster.core.scheduler import DagsterSchedulerError, Scheduler
from dagster.serdes import ConfigurableClass
class SystemCronScheduler(Scheduler, ConfigurableClass):
"""Scheduler implementation that uses the local systems cron. Only works on unix systems that
have cron.
Enable this scheduler by adding it to your ``dagster.yaml`` in ``$DAGSTER_HOME``.
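
    For example (a minimal ``dagster.yaml`` sketch, assuming the scheduler is installed via the
    ``dagster-cron`` package):

    .. code-block:: yaml

        scheduler:
          module: dagster_cron.cron_scheduler
          class: SystemCronScheduler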
"""
def __init__(
self,
inst_data=None,
):
self._inst_data = inst_data
@property
def inst_data(self):
return self._inst_data
@classmethod
def config_type(cls):
return {}
@staticmethod
def from_config_value(inst_data, config_value):
return SystemCronScheduler(inst_data=inst_data)
def get_cron_tab(self):
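        # Return a CronTab handle for the current user's crontab; all cron reads and writes go through this.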
return CronTab(user=True)
def debug_info(self):
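        # List every installed cron entry that is tagged as a dagster schedule.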
return "Running Cron Jobs:\n{jobs}\n".format(
jobs="\n".join(
[str(job) for job in self.get_cron_tab() if "dagster-schedule:" in job.comment]
)
)
def start_schedule(self, instance, external_schedule):
check.inst_param(instance, "instance", DagsterInstance)
check.inst_param(external_schedule, "external_schedule", ExternalSchedule)
schedule_origin_id = external_schedule.get_external_origin_id()
# If the cron job already exists, remove it. This prevents duplicate entries.
# Then, add a new cron job to the cron tab.
        if self.running_schedule_count(instance, schedule_origin_id) > 0:
self._end_cron_job(instance, schedule_origin_id)
self._start_cron_job(instance, external_schedule)
# Verify that the cron job is running
running_schedule_count = self.running_schedule_count(instance, schedule_origin_id)
if running_schedule_count == 0:
raise DagsterSchedulerError(
"Attempted to write cron job for schedule "
"{schedule_name}, but failed. "
"The scheduler is not running {schedule_name}.".format(
schedule_name=external_schedule.name
)
)
elif running_schedule_count > 1:
raise DagsterSchedulerError(
"Attempted to write cron job for schedule "
"{schedule_name}, but duplicate cron jobs were found. "
"There are {running_schedule_count} jobs running for the schedule."
"To resolve, run `dagster schedule up`, or edit the cron tab to "
"remove duplicate schedules".format(
schedule_name=external_schedule.name,
running_schedule_count=running_schedule_count,
)
)
def stop_schedule(self, instance, schedule_origin_id):
check.inst_param(instance, "instance", DagsterInstance)
check.str_param(schedule_origin_id, "schedule_origin_id")
schedule = self._get_schedule_state(instance, schedule_origin_id)
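        # Remove the cron entry and the generated bash script for this schedule.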
self._end_cron_job(instance, schedule_origin_id)
# Verify that the cron job has been removed
running_schedule_count = self.running_schedule_count(instance, schedule_origin_id)
if running_schedule_count > 0:
raise DagsterSchedulerError(
"Attempted to remove existing cron job for schedule "
"{schedule_name}, but failed. "
"There are still {running_schedule_count} jobs running for the schedule.".format(
schedule_name=schedule.name, running_schedule_count=running_schedule_count
)
)
def wipe(self, instance):
# Note: This method deletes schedules from ALL repositories
check.inst_param(instance, "instance", DagsterInstance)
# Delete all script files
script_directory = os.path.join(instance.schedules_directory(), "scripts")
if os.path.isdir(script_directory):
shutil.rmtree(script_directory)
# Delete all logs
logs_directory = os.path.join(instance.schedules_directory(), "logs")
if os.path.isdir(logs_directory):
shutil.rmtree(logs_directory)
# Remove all cron jobs
with self.get_cron_tab() as cron_tab:
for job in cron_tab:
if "dagster-schedule:" in job.comment:
cron_tab.remove_all(comment=job.comment)
def _get_bash_script_file_path(self, instance, schedule_origin_id):
check.inst_param(instance, "instance", DagsterInstance)
check.str_param(schedule_origin_id, "schedule_origin_id")
script_directory = os.path.join(instance.schedules_directory(), "scripts")
utils.mkdir_p(script_directory)
script_file_name = "{}.sh".format(schedule_origin_id)
return os.path.join(script_directory, script_file_name)
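    # The tag below is written as the cron job's comment so dagster can later find the entries it manages.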
def _cron_tag_for_schedule(self, schedule_origin_id):
return "dagster-schedule: {schedule_origin_id}".format(
schedule_origin_id=schedule_origin_id
)
def _get_command(self, script_file, instance, schedule_origin_id):
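        # Build the crontab command: invoke the generated bash script and redirect stdout/stderr to the schedule's log file.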
schedule_log_file_path = self.get_logs_path(instance, schedule_origin_id)
command = "{script_file} > {schedule_log_file_path} 2>&1".format(
script_file=script_file, schedule_log_file_path=schedule_log_file_path
)
return command
def _start_cron_job(self, instance, external_schedule):
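        # Write the bash script that launches the scheduled execution, then register a cron entry that runs it.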
schedule_origin_id = external_schedule.get_external_origin_id()
script_file = self._write_bash_script_to_file(instance, external_schedule)
command = self._get_command(script_file, instance, schedule_origin_id)
with self.get_cron_tab() as cron_tab:
job = cron_tab.new(
command=command,
comment="dagster-schedule: {schedule_origin_id}".format(
schedule_origin_id=schedule_origin_id
),
)
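            # Apply the schedule's cron string (e.g. "0 * * * *") to the new entry.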
job.setall(external_schedule.cron_schedule)
def _end_cron_job(self, instance, schedule_origin_id):
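        # Delete every cron entry tagged with this schedule's origin id, then remove the generated script file.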
with self.get_cron_tab() as cron_tab:
cron_tab.remove_all(comment=self._cron_tag_for_schedule(schedule_origin_id))
script_file = self._get_bash_script_file_path(instance, schedule_origin_id)
if os.path.isfile(script_file):
os.remove(script_file)
def running_schedule_count(self, instance, schedule_origin_id):
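        # Count the installed cron entries whose comment matches this schedule's tag.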
matching_jobs = self.get_cron_tab().find_comment(
self._cron_tag_for_schedule(schedule_origin_id)
)
return len(list(matching_jobs))
def _get_or_create_logs_directory(self, instance, schedule_origin_id):
check.inst_param(instance, "instance", DagsterInstance)
check.str_param(schedule_origin_id, "schedule_origin_id")
logs_directory = os.path.join(instance.schedules_directory(), "logs", schedule_origin_id)
if not os.path.isdir(logs_directory):
utils.mkdir_p(logs_directory)
return logs_directory
def get_logs_path(self, instance, schedule_origin_id):
check.inst_param(instance, "instance", DagsterInstance)
check.str_param(schedule_origin_id, "schedule_origin_id")
logs_directory = self._get_or_create_logs_directory(instance, schedule_origin_id)
return os.path.join(logs_directory, "scheduler.log")
def _write_bash_script_to_file(self, instance, external_schedule):
# Get path to store bash script
schedule_origin_id = external_schedule.get_external_origin_id()
script_file = self._get_bash_script_file_path(instance, schedule_origin_id)
# Get path to store schedule attempt logs
logs_directory = self._get_or_create_logs_directory(instance, schedule_origin_id)
schedule_log_file_name = "{}_{}.result".format("${RUN_DATE}", schedule_origin_id)
schedule_log_file_path = os.path.join(logs_directory, schedule_log_file_name)
local_target = external_schedule.get_external_origin()
# Environment information needed for execution
dagster_home = os.getenv("DAGSTER_HOME")
script_contents = """
#!/bin/bash
export DAGSTER_HOME={dagster_home}
export LANG=en_US.UTF-8
{env_vars}
export RUN_DATE=$(date "+%Y%m%dT%H%M%S")
{python_exe} -m dagster api launch_scheduled_execution --schedule_name {schedule_name} {repo_cli_args} "{result_file}"
""".format(
python_exe=sys.executable,
schedule_name=external_schedule.name,
repo_cli_args=local_target.get_repo_cli_args(),
result_file=schedule_log_file_path,
dagster_home=dagster_home,
env_vars="\n".join(
[
"export {key}={value}".format(key=key, value=value)
for key, value in external_schedule.environment_vars.items()
]
),
)
with io.open(script_file, "w", encoding="utf-8") as f:
f.write(script_contents)
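        # Mark the script as executable so cron can invoke it directly.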
st = os.stat(script_file)
os.chmod(script_file, st.st_mode | stat.S_IEXEC)
return script_file
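# A minimal usage sketch (not part of the library): `instance` is assumed to be a DagsterInstance configured
# with this scheduler, and `external_schedule` an ExternalSchedule loaded from a repository.
#
#     scheduler = SystemCronScheduler()
#     scheduler.start_schedule(instance, external_schedule)
#     print(scheduler.debug_info())
#     scheduler.stop_schedule(instance, external_schedule.get_external_origin_id())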