# Imports
from configparser import ConfigParser
from datetime import datetime
from importlib import import_module
import logging
import os
from superdjango.interfaces.cli.constants import OUTPUT_FORMAT
from superdjango.interfaces.cli.library import Table
from .constants import DEFAULT_INTERVAL, FREQUENCIES, FREQUENCY_CHOICES, STATUS
log = logging.getLogger(__name__)

# Exports

__all__ = ("factory", "Job", "Result", "Schedule")

# Constants

# Frequencies for which ``factory()`` stores no numeric interval: a job using one of these gets
# ``interval=None`` regardless of what precedes the frequency in the ``every`` option.
BARE_FREQUENCIES = [
    FREQUENCIES.DAY,
    FREQUENCIES.MONDAY,
    FREQUENCIES.TUESDAY,
    FREQUENCIES.WEDNESDAY,
    FREQUENCIES.THURSDAY,
    FREQUENCIES.FRIDAY,
    FREQUENCIES.SATURDAY,
    FREQUENCIES.SUNDAY,
]
# Functions
def factory(path, label=None):
    """Initialize a schedule from the given path.

    :param path: The path to the ``scheduler.ini`` file.
    :type path: str

    :param label: The label for the schedule. Defaults to the path with separators changed to dashes.
    :type label: str

    :rtype: Schedule | None
    :returns: A schedule holding one job per valid section, or ``None`` if the file does not exist. A section
              with an unrecognized frequency, a non-integer interval, or a non-integer ``pk`` is logged as a
              warning and left out of the schedule.

    """
    schedule_label = label or path.replace(os.sep, "-")

    if not os.path.exists(path):
        log.error("Configuration file does not exist: %s", path)
        return None

    log.debug("Loading configuration file: %s", path)

    ini = ConfigParser()
    ini.read(path)

    jobs = list()
    for section in ini.sections():
        error = False
        kwargs = dict()

        # A section may be named ``app_name:label``. Only the first colon separates the two, so a label that
        # itself contains a colon no longer raises a ValueError on unpacking.
        app_name, separator, job_label = section.partition(":")
        if separator:
            kwargs['app_name'] = app_name
        else:
            job_label = section

        for key, value in ini.items(section):
            if key == "active":
                # Anything other than an explicit negative is treated as active.
                kwargs[key] = value.lower() not in ("false", "no", "off")
            elif key in ("call", "callback", "do", "run"):
                kwargs['callback'] = value
            elif key == "every":
                # Split on any run of whitespace so that "5  minute" is read the same as "5 minute".
                tokens = value.split()
                frequency = tokens[1] if len(tokens) > 1 else value

                if frequency not in FREQUENCY_CHOICES:
                    log.warning("Unrecognized frequency for %s: %s", section, frequency)
                    error = True
                    continue

                kwargs['frequency'] = frequency

                if frequency in BARE_FREQUENCIES:
                    interval = None
                else:
                    # A frequency given without a number (a single token) lands here as well: the token is
                    # not an integer, so the job is rejected rather than given a default interval.
                    try:
                        interval = int(tokens[0])
                    except ValueError:
                        error = True
                        log.warning("Interval must be given as an integer for: %s", section)
                        continue

                kwargs['interval'] = interval
            elif key == "pk":
                # Handle a malformed primary key like the other validation errors instead of aborting the load.
                try:
                    kwargs['pk'] = int(value)
                except ValueError:
                    error = True
                    log.warning("Primary key must be given as an integer for: %s", section)
            else:
                # Unrecognized options are handed to the job, which passes them on to the callback.
                kwargs[key] = value

        if not error:
            jobs.append(Job(job_label, **kwargs))

    schedule = Schedule(label=schedule_label)
    schedule.jobs = jobs

    return schedule
# Classes
class Job(object):
    """A scheduled job."""

    def __init__(self, label, active=True, app_name=None, at=None, callback=None, description=None,
                 frequency=FREQUENCIES.MINUTE, interval=DEFAULT_INTERVAL, pk=None, **kwargs):
        """Initialize a job.

        :param label: The name or label for the job.
        :type label: str

        :param active: Indicates the job is to be executed.
        :type active: bool

        :param app_name: The app from which the job originates.
        :type app_name: str

        :param at: The specific time at which job should run.
        :type at: str

        :param callback: The dotted path to the callback for job execution. May also be provided as a callable.

        :param description: Optional, additional description of the job.
        :type description: str

        :param frequency: The frequency upon which the job runs.
        :type frequency: str

        :param interval: The interval upon which the job runs. ``factory()`` passes ``None`` for the frequencies
                         listed in ``BARE_FREQUENCIES``.
        :type interval: int

        :param pk: The primary key of the job record associated with this job. Only used when auto-discovery is
                   enabled.
        :type pk: int

        ``kwargs`` are passed as parameters to the callback.

        """
        self.active = active
        self.app_name = app_name
        self.at = at
        self.callback = callback
        self.description = description
        self.frequency = frequency
        self.interval = interval
        self.label = label
        self.parameters = kwargs
        self.pk = pk

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.label)

    @property
    def every(self):
        """Recombines interval and frequency.

        :rtype: str
        :returns: ``"<interval> <frequency>"``, or only the frequency when the job has no interval. This avoids
                  producing text such as ``"None monday"``.

        """
        if self.interval is None:
            return str(self.frequency)

        return "%s %s" % (self.interval, self.frequency)

    def get_callback(self):
        """Get the callable to be executed for the job.

        :returns: The callback itself when it was given as a callable, otherwise the result of importing the
                  dotted path. ``None`` if the callback is missing or could not be imported.

        """
        if callable(self.callback):
            return self.callback

        return self._import_callback()

    def run(self):
        """Execute the job.

        The callback is called with ``pk`` plus the job's extra parameters as keyword arguments. Its return
        value is stamped with the job label and the start and end times, so it is expected to be a ``Result``.

        :rtype: Result
        :returns: The callback's result, a ``STATUS.CRITICAL`` result when the callback could not be found, or a
                  ``STATUS.FAILURE`` result when the callback raised an exception.

        """
        callback = self.get_callback()
        if callback is None:
            return Result(STATUS.CRITICAL, label=self.label, output="Callback not found: %s" % self.callback)

        start_dt = datetime.now()

        # The scheduler must survive a failing job, so any exception is converted to a failure result.
        try:
            result = callback(pk=self.pk, **self.parameters)
        except Exception as e:
            result = Result(
                STATUS.FAILURE,
                message="The job has failed to properly execute.",
                output=str(e)
            )

        end_dt = datetime.now()

        result.label = self.label
        result.start_dt = start_dt
        result.end_dt = end_dt

        return result

    def to_ini(self):
        """Export the job to INI format.

        :rtype: str

        """
        a = list()

        if self.app_name:
            a.append("[%s:%s]" % (self.app_name, self.label))
        else:
            a.append("[%s]" % self.label)

        # Active is the default, so only the exception is written.
        if not self.active:
            a.append("active = no")

        if self.at:
            a.append("at = %s" % self.at)

        if self.callback:
            a.append("call = %s" % self._get_callback_path())

        if self.description:
            a.append("description = %s" % self.description)

        a.append("every = %s" % self.every)

        if self.pk:
            a.append("pk = %s" % self.pk)

        if self.parameters:
            for key, value in self.parameters.items():
                a.append("%s = %s" % (key, value))

        a.append("")

        return "\n".join(a)

    def to_markdown(self):
        """Export the job to Markdown format.

        :rtype: str

        """
        a = list()
        a.append("### %s" % self.label)
        a.append("")

        if self.description:
            a.append(self.description)
            a.append("")

        a.extend(self._get_detail_lines(bullet="- "))
        a.append("")

        return "\n".join(a)

    def to_plain(self):
        """Export the job to plain text format.

        :rtype: str

        """
        a = list()
        a.append("Job: %s" % self.label)

        if self.description:
            a.append("Description: %s" % self.description)

        a.extend(self._get_detail_lines())
        a.append("-" * 120)

        return "\n".join(a)

    def to_rst(self):
        """Export the job to ReStructuredText format.

        :rtype: str

        """
        a = list()
        a.append(self.label)
        a.append("-" * len(self.label))
        a.append("")

        if self.description:
            a.append(self.description)
            a.append("")

        a.extend(self._get_detail_lines(bullet="- "))
        a.append("")

        return "\n".join(a)

    def _get_callback_path(self):
        """Get the text written for the ``call`` option of the INI export.

        :rtype: str
        :returns: The callback as given when it is not callable. For a callable, the path is built from the app's
                  ``scheduler`` module when ``app_name`` is set, and otherwise from the callable's own module so
                  that the exported path can be imported again.

        """
        if not callable(self.callback):
            return self.callback

        if self.app_name:
            return "%s.scheduler.%s" % (self.app_name, self.callback.__name__)

        return "%s.%s" % (self.callback.__module__, self.callback.__name__)

    def _get_detail_lines(self, bullet=""):
        """Get the attribute lines shared by the Markdown, plain text, and ReStructuredText exports.

        :param bullet: The text placed in front of each line, for example ``"- "`` for a list item.
        :type bullet: str

        :rtype: list[str]

        """
        active = "yes" if self.active else "no"

        a = [
            "%sActive: %s" % (bullet, active),
            "%sCallback: %s" % (bullet, self.callback),
        ]

        if self.interval:
            a.append("%sInterval: %s" % (bullet, self.interval))

        if self.frequency:
            a.append("%sFrequency: %s" % (bullet, self.frequency))

        if self.at:
            a.append("%sAt: %s" % (bullet, self.at))

        return a

    def _import_callback(self):
        """Import the callback for the job.

        :returns: A callable or ``None`` if the callback could not be imported. A missing callback, or one that
                  is not a dotted ``module.name`` path, is logged and also results in ``None``.

        """
        # The callback defaults to None; there is nothing to split or import in that case.
        if not self.callback:
            log.error("No callback has been defined for %s", self.label)
            return None

        target, _, name = self.callback.rpartition(".")

        # Without a module part import_module() would raise ValueError, which is not an ImportError.
        if not target:
            log.error("Callback for %s must be a dotted path: %s", self.label, self.callback)
            return None

        try:
            module = import_module(target)
        except ImportError as e:
            log.error("Failed to import callback %s for %s: %s", self.callback, self.label, e)
            return None

        try:
            return getattr(module, name)
        except AttributeError:
            log.error("Callback does not exist in %s module: %s", target, name)
            return None
class Result(object):
    """The result of a scheduled job."""

    def __init__(self, status, end_dt=None, label=None, message=None, output=None, start_dt=None):
        """Initialize a result.

        :param status: The status of the execution.
        :type status: str

        :param end_dt: The date and time the job completed. Defaults to now.
        :type end_dt: datetime

        :param label: The job label.
        :type label: str

        :param message: The human-friendly message.
        :type message: str

        :param output: The output, if any, produced by the job.
        :type output: str

        :param start_dt: The date and time the job started. Defaults to now.
        :type start_dt: datetime

        """
        self.label = label
        self.message = message
        self.output = output
        self.status = status

        # One timestamp serves both defaults, so a result created without times reports zero elapsed time.
        current_dt = datetime.now()
        self.end_dt = end_dt or current_dt
        self.start_dt = start_dt or current_dt

    @property
    def elapsed_time(self):
        """The amount of time that passed to execute the job.

        :rtype: timedelta

        """
        return self.end_dt - self.start_dt

    @property
    def failure(self):
        """Indicates the job failed to properly execute.

        Always the opposite of ``success``; the status is compared by value rather than by identity so the two
        properties cannot disagree.

        :rtype: bool

        """
        return not self.success

    @property
    def success(self):
        """Indicates the job was successfully executed.

        :rtype: bool

        """
        return self.status == STATUS.SUCCESS
class Schedule(object):
    """A collection of scheduled jobs.

    Instances start out empty; ``factory()`` creates a schedule and assigns the jobs it loads from an INI file.

    """

    def __init__(self, label=None):
        """Initialize a schedule.

        :param label: The label for the schedule. When omitted, the exports leave out their heading.
        :type label: str

        """
        self.jobs = list()
        self.label = label

    def __iter__(self):
        return iter(self.jobs)

    def __len__(self):
        return len(self.jobs)

    def to_ini(self):
        """Export all jobs to INI format.

        :rtype: str

        """
        a = list()

        title = self._get_title()
        if title is not None:
            a.append("; %s" % title)
            a.append("")

        for job in self.jobs:
            a.append(job.to_ini())

        return "\n".join(a)

    def to_markdown(self, tabular=False):
        """Export the schedule as Markdown.

        :param tabular: Indicates the expected output is in table form.
        :type tabular: bool

        :rtype: str

        """
        a = list()

        title = self._get_title()
        if title is not None:
            a.append("## %s" % title)
            a.append("")

        if tabular:
            table = self._get_table(OUTPUT_FORMAT.MARKDOWN)
            a.append(str(table))
        else:
            for job in self.jobs:
                a.append(job.to_markdown())
                a.append("")

        return "\n".join(a)

    def to_plain(self, tabular=False):
        """Export the schedule as plain text.

        :param tabular: Indicates the expected output is in table form.
        :type tabular: bool

        :rtype: str

        """
        a = list()

        title = self._get_title()
        if title is not None:
            a.append(title)
            a.append("")

        if tabular:
            table = self._get_table(OUTPUT_FORMAT.SIMPLE)
            a.append(str(table))
        else:
            for job in self.jobs:
                a.append(job.to_plain())
                a.append("")

        return "\n".join(a)

    def to_rst(self, tabular=False):
        """Export the schedule as ReStructuredText.

        :param tabular: Indicates the expected output is in table form.
        :type tabular: bool

        :rtype: str

        """
        a = list()

        title = self._get_title()
        if title is not None:
            a.append(title)
            a.append("=" * len(title))
            a.append("")

        if tabular:
            table = self._get_table(OUTPUT_FORMAT.RST)
            a.append(str(table))
        else:
            for job in self.jobs:
                a.append(job.to_rst())
                a.append("")

        return "\n".join(a)

    def _get_table(self, formatting):
        """Get the table.

        :param formatting: The expected formatting of the table.
        :type formatting: str

        :rtype: superdjango.interfaces.cli.library.Table

        """
        # Markdown is requested from the table as "pipe" -- presumably the name Table uses for that layout.
        if formatting in ("md", "markdown"):
            formatting = "pipe"

        headings = [
            "Label",
            "Interval",
            "Frequency",
            "Active",
            "Callback",
        ]

        table = Table(headings, formatting=formatting)
        for job in self.jobs:
            table.add([
                job.label,
                job.interval,
                job.frequency,
                job.active,
                job.callback,
            ])

        return table

    def _get_title(self):
        """Get the label as text for use in a heading.

        :rtype: str | None
        :returns: ``None`` when the schedule has no label, so that exports can skip the heading instead of
                  failing on ``len(None)`` or printing the word "None".

        """
        if self.label is None:
            return None

        return str(self.label)