Source code for superdjango.contrib.scheduler.library

# Imports

from configparser import ConfigParser
from datetime import datetime
from importlib import import_module
import logging
import os
from superdjango.interfaces.cli.constants import OUTPUT_FORMAT
from superdjango.interfaces.cli.library import Table
from .constants import DEFAULT_INTERVAL, FREQUENCIES, FREQUENCY_CHOICES, STATUS

log = logging.getLogger(__name__)

# Exports

__all__ = (
    "factory",
    "Job",
    "Result",
    "Schedule",
)

# Constants

BARE_FREQUENCIES = [
    FREQUENCIES.DAY,
    FREQUENCIES.MONDAY,
    FREQUENCIES.TUESDAY,
    FREQUENCIES.WEDNESDAY,
    FREQUENCIES.THURSDAY,
    FREQUENCIES.FRIDAY,
    FREQUENCIES.SATURDAY,
    FREQUENCIES.SUNDAY,
]

# Functions


[docs]def factory(path, label=None): """Initialize a schedule from the given path. :param path: The path to the ``scheduler.ini`` file. :type path: str :param label: The label for the schedule. Defaults to the path with separators changed to dashes. :type label: str """ _label = label or path.replace(os.sep, "-") if not os.path.exists(path): log.error("Configuration file does not exist: %s" % path) return None log.debug("Loading configuration file: %s" % path) ini = ConfigParser() ini.read(path) jobs = list() for section in ini.sections(): error = False kwargs = dict() label = section if ":" in section: app_name, label = section.split(":") kwargs['app_name'] = app_name for key, value in ini.items(section): if key == "active": if value.lower() in ("false", "no", "off"): value = False else: value = True kwargs[key] = value elif key in ("call", "callback", "do", "run"): kwargs['callback'] = value elif key == "every": tokens = value.split(" ") try: frequency = tokens[1] except IndexError: frequency = value if frequency not in FREQUENCY_CHOICES: log.warning("Unrecognized frequency for %s: %s" % (section, frequency)) error = True continue kwargs['frequency'] = frequency if frequency in BARE_FREQUENCIES: interval = None else: try: interval = int(tokens[0]) # Default interval never triggered because of frequency checking above. # except IndexError: # interval = DEFAULT_INTERVAL except ValueError: error = True log.warning("Interval must be given as an integer for: %s" % section) continue kwargs['interval'] = interval elif key == "pk": kwargs['pk'] = int(value) else: kwargs[key] = value if not error: job = Job(label, **kwargs) jobs.append(job) schedule = Schedule(label=_label) schedule.jobs = jobs return schedule
# Classes
[docs]class Job(object): """A scheduled job."""
[docs] def __init__(self, label, active=True, app_name=None, at=None, callback=None, description=None, frequency=FREQUENCIES.MINUTE, interval=DEFAULT_INTERVAL, pk=None, **kwargs): """Initialize a job. :param label: The name or label for the job. :type label: str :param active: Indicates the job is to be executed. :type active: bool :param app_name: The app from which the job originates. :type app_name: str :param at: The specific time at which job should run. :type at: str :param callback: The dotted path to the callback for job execution. May also be provided as a callable. :param description: Optional, additional description of the job. :type description: str :param frequency: The frequency upon which the job runs. :type frequency: str :param interval: The interval upon which the job runs. :type interval: int :param pk: The primary key of the job record associated with this job. Only used when auto-discovery is enabled. :type pk: int ``kwargs`` are passed as parameters to the callback. """ self.active = active self.app_name = app_name self.at = at self.callback = callback self.description = description self.frequency = frequency self.interval = interval self.label = label self.parameters = kwargs self.pk = pk
def __repr__(self): return "<%s %s>" % (self.__class__.__name__, self.label) @property def every(self): """Recombines interval and frequency. :rtype: str """ return "%s %s" % (self.interval, self.frequency)
[docs] def get_callback(self): """Alias for ``_import_callback()``.""" if callable(self.callback): return self.callback return self._import_callback()
[docs] def run(self): """Execute the job. :rtype: Result """ callback = self.get_callback() if callback is None: return Result(STATUS.CRITICAL, label=self.label, output="Callback not found: %s" % self.callback) start_dt = datetime.now() try: result = callback(pk=self.pk, **self.parameters) except Exception as e: result = Result( STATUS.FAILURE, message="The job has failed to properly execute.", output=str(e) ) end_dt = datetime.now() result.label = self.label result.start_dt = start_dt result.end_dt = end_dt # if self.pk is not None: # try: # _job = ScheduledJob.objects.get(pk=self.pk) # except ScheduledJob.DoesNotExist: # log.warning("Job record (%s) does not exist for job: %s" % (self.pk, self.label)) return result
[docs] def to_ini(self): """Export the job to INI format. :rtype: str """ a = list() if self.app_name: a.append("[%s:%s]" % (self.app_name, self.label)) else: a.append("[%s]" % self.label) if not self.active: a.append("active = no") if self.at: a.append("at = %s" % self.at) if self.callback: if callable(self.callback): if self.app_name: a.append("call = %s.scheduler.%s" % (self.app_name, self.callback.__name__)) else: a.append("call = %s" % self.callback) if self.description: a.append("description = %s" % self.description) a.append("every = %s %s" % (self.interval, self.frequency)) if self.pk: a.append("pk = %s" % self.pk) if self.parameters: for key, value in self.parameters.items(): a.append("%s = %s" % (key, value)) a.append("") return "\n".join(a)
[docs] def to_markdown(self): """Export the job to Markdown format. :rtype: str """ a = list() a.append("### %s" % self.label) a.append("") if self.description: a.append(self.description) a.append("") if self.active: active = "yes" else: active = "no" a.append("- Active: %s" % active) a.append("- Callback: %s" % self.callback) if self.interval: a.append("- Interval: %s" % self.interval) if self.frequency: a.append("- Frequency: %s" % self.frequency) if self.at: a.append("- At: %s" % self.at) a.append("") return "\n".join(a)
[docs] def to_plain(self): """Export the job to plain text format. :rtype: str """ a = list() a.append("Job: %s" % self.label) if self.description: a.append("Description: %s" % self.description) if self.active: active = "yes" else: active = "no" a.append("Active: %s" % active) a.append("Callback: %s" % self.callback) if self.interval: a.append("Interval: %s" % self.interval) if self.frequency: a.append("Frequency: %s" % self.frequency) if self.at: a.append("At: %s" % self.at) a.append("-" * 120) return "\n".join(a)
[docs] def to_rst(self): """Export the job to ReStructuredText format. :rtype: str """ a = list() a.append(self.label) a.append("-" * len(self.label)) a.append("") if self.description: a.append(self.description) a.append("") if self.active: active = "yes" else: active = "no" a.append("- Active: %s" % active) a.append("- Callback: %s" % self.callback) if self.interval: a.append("- Interval: %s" % self.interval) if self.frequency: a.append("- Frequency: %s" % self.frequency) if self.at: a.append("- At: %s" % self.at) a.append("") return "\n".join(a)
def _import_callback(self): """Import the callback for the job. :returns: A callable or ``None`` if the callback could not be imported. """ tokens = self.callback.split(".") callback = tokens.pop(-1) target = ".".join(tokens) try: module = import_module(target) try: return getattr(module, callback) except AttributeError: log.error("Callback does not exist in %s module: %s" % (target, callback)) return None except ImportError as e: log.error("Failed to import callback %s for %s: %s" % (self.callback, self.label, e)) return None
[docs]class Result(object): """The result of a scheduled job."""
[docs] def __init__(self, status, end_dt=None, label=None, message=None, output=None, start_dt=None): """Initialize a result. :param status: The status of the execution. :type status: str :param end_dt: The date and time the job completed. Defaults to now. :type end_dt: datetime :param label: The job label. :type label: str :param message: The human-friendly message. :type message: str :param output: The output, if any, produced by the job. :type output: str :param start_dt: The date and time the job started. Defaults to now. :type start_dt: datetime """ self.label = label self.message = message self.output = output self.status = status current_dt = datetime.now() self.end_dt = end_dt or current_dt self.start_dt = start_dt or current_dt
@property def elapsed_time(self): """The amount of time that passed to execute the job. :rtype: timedelta """ return self.end_dt - self.start_dt @property def failure(self): """Indicates the job failed to properly execute. :rtype: bool """ return self.status is not STATUS.SUCCESS @property def success(self): """Indicates the job was successfully executed. :rtype: bool """ return self.status == STATUS.SUCCESS
[docs]class Schedule(object): """A collection of scheduled jobs."""
[docs] def __init__(self, label=None): self.jobs = list() self.label = label
# def __init__(self, path, label=None): # """Initialize a schedule. # # :param path: The path to the ``schedule.ini`` file. # :type path: str # # :param label: The label for the schedule. Defaults to the path with separators changed to dashes. # :type label: str # # """ # self.is_loaded = False # self.jobs = list() # self.label = label or path.replace(os.sep, "-") # self.path = path def __iter__(self): return iter(self.jobs) def __len__(self): return len(self.jobs) # def load(self): # """Load the schedule. # # :rtype: bool # # """ # if not os.path.exists(self.path): # log.critical("Configuration file does not exist: %s" % self.path) # return False # # log.debug("Loading configuration file: %s" % self.path) # ini = ConfigParser() # ini.read(self.path) # # for section in ini.sections(): # error = False # kwargs = dict() # for key, value in ini.items(section): # if key in ("call", "callback", "do", "run"): # kwargs['callback'] = value # elif key == "active": # if value.lower() in ("false", "no", "off"): # value = False # else: # value = True # # kwargs[key] = value # elif key == "every": # tokens = value.split(" ") # try: # frequency = tokens[1] # interval = int(tokens[0]) # except IndexError: # frequency = value # interval = DEFAULT_INTERVAL # # if frequency not in FREQUENCIES: # log.warning("Unrecognized frequency for %s: %s" % (section, frequency)) # error = True # continue # # kwargs['frequency'] = frequency # kwargs['interval'] = interval # else: # kwargs[key] = value # # if not error: # job = Job(section, **kwargs) # self.jobs.append(job) # # self.is_loaded = len(self.jobs) > 0 # # return self.is_loaded
[docs] def to_ini(self): """Export all jobs to INI format. :rtype: str """ a = list() a.append("; %s" % self.label) a.append("") for job in self.jobs: a.append(job.to_ini()) return "\n".join(a)
[docs] def to_markdown(self, tabular=False): """Export the schedule as Markdown. :param tabular: Indicates the expected output is in table form. :type tabular: bool :rtype: str """ a = list() a.append("## %s" % self.label) a.append("") if tabular: table = self._get_table(OUTPUT_FORMAT.MARKDOWN) a.append(str(table)) else: for job in self.jobs: a.append(job.to_markdown()) a.append("") return "\n".join(a)
[docs] def to_plain(self, tabular=False): """Export the schedule as plain text. :param tabular: Indicates the expected output is in table form. :type tabular: bool :rtype: str """ a = list() a.append(self.label) a.append("") if tabular: table = self._get_table(OUTPUT_FORMAT.SIMPLE) a.append(str(table)) else: for job in self.jobs: a.append(job.to_plain()) a.append("") return "\n".join(a)
[docs] def to_rst(self, tabular=False): """Export the schedule as ReStructuredText. :param tabular: Indicates the expected output is in table form. :type tabular: bool :rtype: str """ a = list() a.append(self.label) a.append("=" * len(self.label)) a.append("") if tabular: table = self._get_table(OUTPUT_FORMAT.RST) a.append(str(table)) else: for job in self.jobs: a.append(job.to_rst()) a.append("") return "\n".join(a)
def _get_table(self, formatting): """Get the table. :param formatting: The expected formatting of the table. :type formatting: str :rtype: superdjango.interfaces.cli.library.Table """ if formatting in ("md", "markdown"): formatting = "pipe" headings = [ "Label", "Interval", "Frequency", "Active", "Callback", ] table = Table(headings, formatting=formatting) for job in self.jobs: table.add([ job.label, job.interval, job.frequency, job.active, job.callback, ]) return table