Source code for dagster_cron.cron_scheduler

import io
import os
import shutil
import stat
import sys

from crontab import CronTab
from dagster import DagsterInstance, check, utils
from dagster.core.host_representation import ExternalSchedule
from dagster.core.scheduler import DagsterSchedulerError, Scheduler
from dagster.serdes import ConfigurableClass


[docs]class SystemCronScheduler(Scheduler, ConfigurableClass): """Scheduler implementation that uses the local systems cron. Only works on unix systems that have cron. Enable this scheduler by adding it to your ``dagster.yaml`` in ``$DAGSTER_HOME``. """ def __init__( self, inst_data=None, ): self._inst_data = inst_data @property def inst_data(self): return self._inst_data @classmethod def config_type(cls): return {} @staticmethod def from_config_value(inst_data, config_value): return SystemCronScheduler(inst_data=inst_data) def get_cron_tab(self): return CronTab(user=True) def debug_info(self): return "Running Cron Jobs:\n{jobs}\n".format( jobs="\n".join( [str(job) for job in self.get_cron_tab() if "dagster-schedule:" in job.comment] ) ) def start_schedule(self, instance, external_schedule): check.inst_param(instance, "instance", DagsterInstance) check.inst_param(external_schedule, "external_schedule", ExternalSchedule) schedule_origin_id = external_schedule.get_external_origin_id() # If the cron job already exists, remove it. This prevents duplicate entries. # Then, add a new cron job to the cron tab. if self.running_schedule_count(instance, external_schedule.get_external_origin_id()) > 0: self._end_cron_job(instance, schedule_origin_id) self._start_cron_job(instance, external_schedule) # Verify that the cron job is running running_schedule_count = self.running_schedule_count(instance, schedule_origin_id) if running_schedule_count == 0: raise DagsterSchedulerError( "Attempted to write cron job for schedule " "{schedule_name}, but failed. " "The scheduler is not running {schedule_name}.".format( schedule_name=external_schedule.name ) ) elif running_schedule_count > 1: raise DagsterSchedulerError( "Attempted to write cron job for schedule " "{schedule_name}, but duplicate cron jobs were found. " "There are {running_schedule_count} jobs running for the schedule." "To resolve, run `dagster schedule up`, or edit the cron tab to " "remove duplicate schedules".format( schedule_name=external_schedule.name, running_schedule_count=running_schedule_count, ) ) def stop_schedule(self, instance, schedule_origin_id): check.inst_param(instance, "instance", DagsterInstance) check.str_param(schedule_origin_id, "schedule_origin_id") schedule = self._get_schedule_state(instance, schedule_origin_id) self._end_cron_job(instance, schedule_origin_id) # Verify that the cron job has been removed running_schedule_count = self.running_schedule_count(instance, schedule_origin_id) if running_schedule_count > 0: raise DagsterSchedulerError( "Attempted to remove existing cron job for schedule " "{schedule_name}, but failed. " "There are still {running_schedule_count} jobs running for the schedule.".format( schedule_name=schedule.name, running_schedule_count=running_schedule_count ) ) def wipe(self, instance): # Note: This method deletes schedules from ALL repositories check.inst_param(instance, "instance", DagsterInstance) # Delete all script files script_directory = os.path.join(instance.schedules_directory(), "scripts") if os.path.isdir(script_directory): shutil.rmtree(script_directory) # Delete all logs logs_directory = os.path.join(instance.schedules_directory(), "logs") if os.path.isdir(logs_directory): shutil.rmtree(logs_directory) # Remove all cron jobs with self.get_cron_tab() as cron_tab: for job in cron_tab: if "dagster-schedule:" in job.comment: cron_tab.remove_all(comment=job.comment) def _get_bash_script_file_path(self, instance, schedule_origin_id): check.inst_param(instance, "instance", DagsterInstance) check.str_param(schedule_origin_id, "schedule_origin_id") script_directory = os.path.join(instance.schedules_directory(), "scripts") utils.mkdir_p(script_directory) script_file_name = "{}.sh".format(schedule_origin_id) return os.path.join(script_directory, script_file_name) def _cron_tag_for_schedule(self, schedule_origin_id): return "dagster-schedule: {schedule_origin_id}".format( schedule_origin_id=schedule_origin_id ) def _get_command(self, script_file, instance, schedule_origin_id): schedule_log_file_path = self.get_logs_path(instance, schedule_origin_id) command = "{script_file} > {schedule_log_file_path} 2>&1".format( script_file=script_file, schedule_log_file_path=schedule_log_file_path ) return command def _start_cron_job(self, instance, external_schedule): schedule_origin_id = external_schedule.get_external_origin_id() script_file = self._write_bash_script_to_file(instance, external_schedule) command = self._get_command(script_file, instance, schedule_origin_id) with self.get_cron_tab() as cron_tab: job = cron_tab.new( command=command, comment="dagster-schedule: {schedule_origin_id}".format( schedule_origin_id=schedule_origin_id ), ) job.setall(external_schedule.cron_schedule) def _end_cron_job(self, instance, schedule_origin_id): with self.get_cron_tab() as cron_tab: cron_tab.remove_all(comment=self._cron_tag_for_schedule(schedule_origin_id)) script_file = self._get_bash_script_file_path(instance, schedule_origin_id) if os.path.isfile(script_file): os.remove(script_file) def running_schedule_count(self, instance, schedule_origin_id): matching_jobs = self.get_cron_tab().find_comment( self._cron_tag_for_schedule(schedule_origin_id) ) return len(list(matching_jobs)) def _get_or_create_logs_directory(self, instance, schedule_origin_id): check.inst_param(instance, "instance", DagsterInstance) check.str_param(schedule_origin_id, "schedule_origin_id") logs_directory = os.path.join(instance.schedules_directory(), "logs", schedule_origin_id) if not os.path.isdir(logs_directory): utils.mkdir_p(logs_directory) return logs_directory def get_logs_path(self, instance, schedule_origin_id): check.inst_param(instance, "instance", DagsterInstance) check.str_param(schedule_origin_id, "schedule_origin_id") logs_directory = self._get_or_create_logs_directory(instance, schedule_origin_id) return os.path.join(logs_directory, "scheduler.log") def _write_bash_script_to_file(self, instance, external_schedule): # Get path to store bash script schedule_origin_id = external_schedule.get_external_origin_id() script_file = self._get_bash_script_file_path(instance, schedule_origin_id) # Get path to store schedule attempt logs logs_directory = self._get_or_create_logs_directory(instance, schedule_origin_id) schedule_log_file_name = "{}_{}.result".format("${RUN_DATE}", schedule_origin_id) schedule_log_file_path = os.path.join(logs_directory, schedule_log_file_name) local_target = external_schedule.get_external_origin() # Environment information needed for execution dagster_home = os.getenv("DAGSTER_HOME") script_contents = """ #!/bin/bash export DAGSTER_HOME={dagster_home} export LANG=en_US.UTF-8 {env_vars} export RUN_DATE=$(date "+%Y%m%dT%H%M%S") {python_exe} -m dagster api launch_scheduled_execution --schedule_name {schedule_name} {repo_cli_args} "{result_file}" """.format( python_exe=sys.executable, schedule_name=external_schedule.name, repo_cli_args=local_target.get_repo_cli_args(), result_file=schedule_log_file_path, dagster_home=dagster_home, env_vars="\n".join( [ "export {key}={value}".format(key=key, value=value) for key, value in external_schedule.environment_vars.items() ] ), ) with io.open(script_file, "w", encoding="utf-8") as f: f.write(script_contents) st = os.stat(script_file) os.chmod(script_file, st.st_mode | stat.S_IEXEC) return script_file