DagsterDocs
Quick search

Source code for dagster_k8s.scheduler

import time

import kubernetes
from dagster import DagsterInstance, Field, Noneable, StringSource, check
from dagster.core.host_representation import ExternalSchedule
from dagster.core.scheduler import DagsterSchedulerError, Scheduler
from dagster.serdes import ConfigurableClass, ConfigurableClassData
from dagster.utils import merge_dicts
from dagster_k8s.job import DagsterK8sJobConfig, construct_dagster_k8s_job, get_k8s_job_name


class K8sScheduler(Scheduler, ConfigurableClass):
    """Scheduler implementation on top of Kubernetes CronJob.

    Enable this scheduler by adding it to your dagster.yaml, or by configuring the scheduler
    section of the Helm chart
    https://github.com/dagster-io/dagster/tree/master/helm/dagster

    Every started schedule is stored as one CronJob in ``scheduler_namespace``; the CronJob is
    named after the schedule's external origin id.
    """

    def __init__(
        self,
        dagster_home,
        service_account_name,
        instance_config_map,
        postgres_password_secret,
        job_image,
        load_incluster_config=True,
        scheduler_namespace="default",
        image_pull_policy="Always",
        image_pull_secrets=None,
        kubeconfig_file=None,
        inst_data=None,
        env_config_maps=None,
        env_secrets=None,
        grace_period_seconds=5,
    ):
        """Load the Kubernetes client configuration and build the shared job config.

        Args:
            dagster_home (str): Forwarded to ``DagsterK8sJobConfig``.
            service_account_name (str): Forwarded to ``DagsterK8sJobConfig``.
            instance_config_map (str): Forwarded to ``DagsterK8sJobConfig``.
            postgres_password_secret (str): Forwarded to ``DagsterK8sJobConfig``.
            job_image (str): Forwarded to ``DagsterK8sJobConfig``.
            load_incluster_config (bool): When true, call
                ``kubernetes.config.load_incluster_config()``; ``kubeconfig_file`` must then
                be None. Otherwise call ``kubernetes.config.load_kube_config(kubeconfig_file)``.
            scheduler_namespace (str): Namespace in which CronJobs are created, listed and
                deleted.
            image_pull_policy (str): Forwarded to ``DagsterK8sJobConfig``.
            image_pull_secrets (Optional[List[dict]]): Forwarded to ``DagsterK8sJobConfig``.
            kubeconfig_file (Optional[str]): Path handed to ``load_kube_config``.
            inst_data (Optional[ConfigurableClassData]): Exposed through ``inst_data``.
            env_config_maps (Optional[List[str]]): Forwarded to ``DagsterK8sJobConfig``.
            env_secrets (Optional[List[str]]): Forwarded to ``DagsterK8sJobConfig``.
            grace_period_seconds (int): Seconds to sleep after each create/replace/delete
                call before the result is verified. Defaults to 5, the value that used to
                be hard-coded.
        """
        self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)

        if load_incluster_config:
            check.invariant(
                kubeconfig_file is None,
                "`kubeconfig_file` is set but `load_incluster_config` is True.",
            )
            kubernetes.config.load_incluster_config()
        else:
            check.opt_str_param(kubeconfig_file, "kubeconfig_file")
            kubernetes.config.load_kube_config(kubeconfig_file)

        self._api = kubernetes.client.BatchV1beta1Api()
        self._namespace = check.str_param(scheduler_namespace, "scheduler_namespace")

        # time.sleep() rejects negative durations, so fail early with a clear message.
        check.int_param(grace_period_seconds, "grace_period_seconds")
        check.invariant(
            grace_period_seconds >= 0, "`grace_period_seconds` must be non-negative."
        )
        self.grace_period_seconds = grace_period_seconds

        self.job_config = DagsterK8sJobConfig(
            job_image=check.str_param(job_image, "job_image"),
            dagster_home=check.str_param(dagster_home, "dagster_home"),
            image_pull_policy=check.str_param(image_pull_policy, "image_pull_policy"),
            image_pull_secrets=check.opt_list_param(
                image_pull_secrets, "image_pull_secrets", of_type=dict
            ),
            service_account_name=check.str_param(service_account_name, "service_account_name"),
            instance_config_map=check.str_param(instance_config_map, "instance_config_map"),
            postgres_password_secret=check.str_param(
                postgres_password_secret, "postgres_password_secret"
            ),
            env_config_maps=check.opt_list_param(env_config_maps, "env_config_maps", of_type=str),
            env_secrets=check.opt_list_param(env_secrets, "env_secrets", of_type=str),
        )

    @property
    def inst_data(self):
        """The ``ConfigurableClassData`` this scheduler was constructed with, if any."""
        return self._inst_data

    @classmethod
    def config_type(cls):
        """Return the config schema: the K8s job config plus scheduler-specific fields."""
        job_cfg = DagsterK8sJobConfig.config_type()

        scheduler_extra_cfg = {
            "scheduler_namespace": Field(StringSource, is_required=True),
            "load_incluster_config": Field(bool, is_required=False, default_value=True),
            "kubeconfig_file": Field(Noneable(StringSource), is_required=False, default_value=None),
            # Optional, so configs written before this field existed keep validating.
            "grace_period_seconds": Field(int, is_required=False, default_value=5),
        }
        return merge_dicts(job_cfg, scheduler_extra_cfg)

    @classmethod
    def from_config_value(cls, inst_data, config_value):
        """Build a scheduler by passing every config key as a keyword argument."""
        return cls(inst_data=inst_data, **config_value)

    def debug_info(self):
        """Return a text listing of every CronJob currently in the scheduler namespace."""
        return "Running K8s CronJob(s):\n{jobs}\n".format(
            jobs="\n".join(str(job) for job in self.get_all_cron_jobs())
        )

    def wipe(self, instance):
        """Delete every CronJob in the scheduler namespace and verify none remain.

        Raises:
            DagsterSchedulerError: If any CronJob is still listed after the grace period.
        """
        # Note: This method deletes schedules from ALL repositories
        check.inst_param(instance, "instance", DagsterInstance)

        self._api.delete_collection_namespaced_cron_job(namespace=self._namespace)
        time.sleep(self.grace_period_seconds)

        # Verify that no cron jobs are running
        running_cron_job_count = len(self.get_all_cron_jobs())
        if running_cron_job_count != 0:
            raise DagsterSchedulerError(
                "Attempted to delete all K8s CronJobs but failed. There are "
                "still {} running schedules".format(running_cron_job_count)
            )

    def _job_template(self, external_schedule):
        """Build the K8s job template that launches one scheduled execution."""
        check.inst_param(external_schedule, "external_schedule", ExternalSchedule)

        local_target = external_schedule.get_external_origin()

        job_config = self.job_config

        external_schedule_name = external_schedule.name
        job_name = get_k8s_job_name(external_schedule_name)
        # The pod reuses the job's name.
        pod_name = job_name

        job_template = construct_dagster_k8s_job(
            job_config=job_config,
            args=[
                "dagster",
                "api",
                "launch_scheduled_execution",
                # https://bugs.python.org/issue20074 prevents using /dev/stdout
                "/tmp/launch_scheduled_execution_output",
                "--schedule_name",
                external_schedule_name,
            ]
            + local_target.get_repo_cli_args().split(" "),
            job_name=job_name,
            pod_name=pod_name,
            component="scheduled_job",
        )
        return job_template

    def _start_cron_job(self, external_schedule):
        """Create the schedule's CronJob, or replace it when one already exists."""
        check.inst_param(external_schedule, "external_schedule", ExternalSchedule)

        job_template = self._job_template(external_schedule)

        cron_job_spec = kubernetes.client.V1beta1CronJobSpec(
            schedule=external_schedule.cron_schedule, job_template=job_template
        )

        schedule_origin_id = external_schedule.get_external_origin_id()
        cron_job = kubernetes.client.V1beta1CronJob(
            spec=cron_job_spec, metadata={"name": schedule_origin_id}
        )

        existing_cron_job = self.get_cron_job(schedule_origin_id=schedule_origin_id)
        if existing_cron_job:
            # patch_namespaced_cron_job will cause the containers array to be additive
            # https://blog.atomist.com/kubernetes-apply-replace-patch/
            self._api.replace_namespaced_cron_job(
                name=schedule_origin_id, body=cron_job, namespace=self._namespace
            )
        else:
            self._api.create_namespaced_cron_job(body=cron_job, namespace=self._namespace)

        time.sleep(self.grace_period_seconds)

    def start_schedule(self, instance, external_schedule):
        """Update the existing K8s CronJob if it exists; else, create it.

        Raises:
            DagsterSchedulerError: If no CronJob for the schedule is listed afterwards.
        """
        check.inst_param(instance, "instance", DagsterInstance)
        check.inst_param(external_schedule, "external_schedule", ExternalSchedule)

        self._start_cron_job(external_schedule)

        # Verify that the cron job is running
        cron_job = self.get_cron_job(schedule_origin_id=external_schedule.get_external_origin_id())
        if not cron_job:
            raise DagsterSchedulerError(
                "Attempted to add K8s CronJob for schedule {schedule_name}, but failed. "
                "The schedule {schedule_name} is not running.".format(
                    schedule_name=external_schedule.name
                )
            )

    def refresh_schedule(self, instance, external_schedule):
        """Re-apply the schedule's CronJob; delegates to ``start_schedule``."""
        check.inst_param(instance, "instance", DagsterInstance)
        check.inst_param(external_schedule, "external_schedule", ExternalSchedule)

        self.start_schedule(instance, external_schedule)

    def running_schedule_count(self, instance, schedule_origin_id):
        """Return 1 when a CronJob named ``schedule_origin_id`` exists, otherwise 0."""
        check.str_param(schedule_origin_id, "schedule_origin_id")

        return 1 if self.get_cron_job(schedule_origin_id) else 0

    def get_cron_job(self, schedule_origin_id):
        """Return the CronJob named ``schedule_origin_id`` in the namespace, or None."""
        check.str_param(schedule_origin_id, "schedule_origin_id")

        cron_jobs = self._api.list_namespaced_cron_job(namespace=self._namespace)
        for item in cron_jobs.items:
            if schedule_origin_id == item.metadata.name:
                return item
        return None

    def get_all_cron_jobs(self):
        """Return every CronJob listed in the scheduler namespace."""
        return self._api.list_namespaced_cron_job(namespace=self._namespace).items

    def _end_cron_job(self, schedule_origin_id):
        """Delete the named CronJob, then sleep for the grace period."""
        check.str_param(schedule_origin_id, "schedule_origin_id")

        self._api.delete_namespaced_cron_job(name=schedule_origin_id, namespace=self._namespace)
        time.sleep(self.grace_period_seconds)

    def stop_schedule(self, instance, schedule_origin_id):
        """Delete the schedule's CronJob if present and verify that it is gone.

        Raises:
            DagsterSchedulerError: If the CronJob is still listed after deletion.
        """
        check.inst_param(instance, "instance", DagsterInstance)
        check.str_param(schedule_origin_id, "schedule_origin_id")

        if self.get_cron_job(schedule_origin_id):
            self._end_cron_job(schedule_origin_id=schedule_origin_id)

        cron_job = self.get_cron_job(schedule_origin_id)
        if cron_job:
            # _get_schedule_state is not defined in this class; presumably inherited from
            # Scheduler. It is only used to put the schedule's name into the error message.
            schedule = self._get_schedule_state(instance, schedule_origin_id)

            raise DagsterSchedulerError(
                "Attempted to remove existing K8s CronJob for schedule "
                "{schedule_name}, but failed. Schedule is still running.".format(
                    schedule_name=schedule.name
                )
            )

    def get_logs_path(self, instance, schedule_origin_id):
        """Not supported by this scheduler; always raises ``NotImplementedError``."""
        raise NotImplementedError("To get logs, inspect the corresponding K8s CronJob")