# Imports
from django.core.exceptions import ImproperlyConfigured
from django.utils.text import slugify
from myninjas.utils import write_file, read_file
import os
import re
from superdjango.conf import SUPERDJANGO
import tempfile
from .compat import oembed
from .exceptions import EmbedNotFound
from .providers import ALL_PROVIDERS
# Constants
DATA_PATH = SUPERDJANGO.get("data_path", default=tempfile.gettempdir())
# Classes
[docs]class Embed(object):
"""Collect and deliver embedded output."""
[docs] def __init__(self, url, **kwargs):
"""Initialize the embed.
:param url: The original URL.
:type url: str
:param kwargs: The attributes of the embed. These become available as ``embed.<attribute_name>``.
"""
self.url = url
self._attributes = kwargs
def __getattr__(self, item):
"""Get the named attribute for the embed."""
return self._attributes.get(item, None)
[docs] @classmethod
def from_cache(cls, path, url):
"""Get an embed from a cache file.
:param path: The path to the cache file.
:type path: str
:param url: The original URL.
:type url: str
:rtype: Embed
.. warning::
The path is not checked before attempting to read the file.
"""
kwargs = {'html': read_file(path)}
return cls(url, **kwargs)
[docs] def render(self):
"""Get the HTML for the embed.
:rtype: str
"""
return self._attributes['html']
[docs] def write(self, path):
"""Save the embed to a cache file.
:param path: The path to the cache file.
:type path: str
.. warning::
The path is not checked before attempting to write the file.
"""
write_file(path, content=self.html, make_directories=True)
[docs]class BaseEmbedFinder(object):
"""Base class for embed finders."""
[docs] def __init__(self, providers=None, response_format="json"):
"""Initialize the embed.
:param providers: A list of providers to be used for identifying the embedded URL. Defaults to
``ALL_PROVIDERS``.
:param providers: list
:param response_format: The response format from the provider.
:type response_format: str
"""
self.format = response_format
self.is_loaded = False
self.providers = providers or ALL_PROVIDERS
self._endpoints = dict()
[docs] def get(self, url, cache_enabled=True, **options):
"""Get the embed object.
:param url: The URL to be embedded.
:type url: str
:param cache_enabled: Draw from cache, if available.
:type cache_enabled: bool
:param options: The options to be passed to the provider. For example, ``height`` or ``width``.
:type options: dict
:rtype: Embed
:raises: EmbedNotFound
"""
raise NotImplementedError()
[docs] def load(self):
"""Load providers, parsing endpoints and URLs.
:rtype: bool
"""
# if oembed is None:
# raise ImproperlyConfigured("oembed is not installed.")
for provider in self.providers:
patterns = list()
try:
endpoint = provider['endpoint'].replace('{format}', "json")
except KeyError:
return False
for url in provider['urls']:
try:
patterns.append(re.compile(url))
except re.error:
# TODO: Log regex errors on embed finder compile.
pass
self._endpoints[endpoint] = patterns
self.is_loaded = True
return True
def _get_endpoint(self, url):
"""Get the endpoint match for the given URL.
:rtype: oembed.OEmbedEndpoint | None
"""
for endpoint, patterns in list(self._endpoints.items()):
for pattern in patterns:
if re.match(pattern, url):
return oembed.OEmbedEndpoint(endpoint, urlSchemes=[url])
return None
[docs]class OEmbedFinder(BaseEmbedFinder):
"""Find and load an embed based on the URL of the embedded content."""
[docs] def get(self, url, cache_enabled=True, **options):
"""Get an embed using ``oembed``.
:param url: The URL to be embedded.
:type url: str
:param cache_enabled: Draw from cache, if available.
:type cache_enabled: bool
:param options: Options to be passed to the provider.
:type options: dict
"""
if cache_enabled:
embed = self.get_from_cache(url)
if embed is not None:
return embed
endpoint = self._get_endpoint(url)
if endpoint is None:
raise EmbedNotFound("Embed code could not be found: %s" % url)
consumer = oembed.OEmbedConsumer()
consumer.addEndpoint(endpoint)
# _get_endpoint should ensure that an oembed.OEmbedNoEndpoint does not occur.
response = consumer.embed(url, **options)
kwargs = response.getData()
embed = Embed(url, **kwargs)
if cache_enabled:
path = self._get_cache_path(url)
embed.write(path)
return embed
[docs] def get_from_cache(self, url):
"""Get the embed from cache rather than calling the provider.
:param url: The URL to be embedded.
:type url: str
:rtype: Embed | None
"""
path = self._get_cache_path(url)
if os.path.exists(path):
return Embed.from_cache(path, url)
return None
# noinspection PyMethodMayBeStatic
def _get_cache_path(self, url):
"""Get the path to the embedded cache file for the given URL.
:param url: The URL to be embedded.
:type url: str
:rtype: str
"""
file_name = slugify(url) + ".txt"
return os.path.join(DATA_PATH, "embeds", file_name)
# Preload oembed so that it does not have to be done each time.
oembed_finder = OEmbedFinder()
oembed_finder.load()