Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Imports
3from django.core.exceptions import ImproperlyConfigured
4from django.utils.text import slugify
5from myninjas.utils import write_file, read_file
6import os
7import re
8from superdjango.conf import SUPERDJANGO
9import tempfile
10from .compat import oembed
11from .exceptions import EmbedNotFound
12from .providers import ALL_PROVIDERS
14# Constants
16DATA_PATH = SUPERDJANGO.get("data_path", default=tempfile.gettempdir())
18# Classes
21class Embed(object):
22 """Collect and deliver embedded output."""
24 def __init__(self, url, **kwargs):
25 """Initialize the embed.
27 :param url: The original URL.
28 :type url: str
30 :param kwargs: The attributes of the embed. These become available as ``embed.<attribute_name>``.
32 """
33 self.url = url
34 self._attributes = kwargs
36 def __getattr__(self, item):
37 """Get the named attribute for the embed."""
38 return self._attributes.get(item, None)
40 @classmethod
41 def from_cache(cls, path, url):
42 """Get an embed from a cache file.
44 :param path: The path to the cache file.
45 :type path: str
47 :param url: The original URL.
48 :type url: str
50 :rtype: Embed
52 .. warning::
53 The path is not checked before attempting to read the file.
55 """
56 kwargs = {'html': read_file(path)}
57 return cls(url, **kwargs)
59 def render(self):
60 """Get the HTML for the embed.
62 :rtype: str
64 """
65 return self._attributes['html']
67 def write(self, path):
68 """Save the embed to a cache file.
70 :param path: The path to the cache file.
71 :type path: str
73 .. warning::
74 The path is not checked before attempting to write the file.
76 """
77 write_file(path, content=self.html, make_directories=True)
80class BaseEmbedFinder(object):
81 """Base class for embed finders."""
83 def __init__(self, providers=None, response_format="json"):
84 """Initialize the embed.
86 :param providers: A list of providers to be used for identifying the embedded URL. Defaults to
87 ``ALL_PROVIDERS``.
88 :param providers: list
90 :param response_format: The response format from the provider.
91 :type response_format: str
93 """
94 self.format = response_format
95 self.is_loaded = False
96 self.providers = providers or ALL_PROVIDERS
97 self._endpoints = dict()
99 def get(self, url, cache_enabled=True, **options):
100 """Get the embed object.
102 :param url: The URL to be embedded.
103 :type url: str
105 :param cache_enabled: Draw from cache, if available.
106 :type cache_enabled: bool
108 :param options: The options to be passed to the provider. For example, ``height`` or ``width``.
109 :type options: dict
111 :rtype: Embed
112 :raises: EmbedNotFound
114 """
115 raise NotImplementedError()
117 def load(self):
118 """Load providers, parsing endpoints and URLs.
120 :rtype: bool
122 """
123 # if oembed is None:
124 # raise ImproperlyConfigured("oembed is not installed.")
126 for provider in self.providers:
127 patterns = list()
129 try:
130 endpoint = provider['endpoint'].replace('{format}', "json")
131 except KeyError:
132 return False
134 for url in provider['urls']:
135 try:
136 patterns.append(re.compile(url))
137 except re.error:
138 # TODO: Log regex errors on embed finder compile.
139 pass
141 self._endpoints[endpoint] = patterns
143 self.is_loaded = True
145 return True
147 def _get_endpoint(self, url):
148 """Get the endpoint match for the given URL.
150 :rtype: oembed.OEmbedEndpoint | None
152 """
153 for endpoint, patterns in list(self._endpoints.items()):
154 for pattern in patterns:
155 if re.match(pattern, url):
156 return oembed.OEmbedEndpoint(endpoint, urlSchemes=[url])
158 return None
161class OEmbedFinder(BaseEmbedFinder):
162 """Find and load an embed based on the URL of the embedded content."""
164 def get(self, url, cache_enabled=True, **options):
165 """Get an embed using ``oembed``.
167 :param url: The URL to be embedded.
168 :type url: str
170 :param cache_enabled: Draw from cache, if available.
171 :type cache_enabled: bool
173 :param options: Options to be passed to the provider.
174 :type options: dict
176 """
177 if cache_enabled:
178 embed = self.get_from_cache(url)
179 if embed is not None:
180 return embed
182 endpoint = self._get_endpoint(url)
183 if endpoint is None:
184 raise EmbedNotFound("Embed code could not be found: %s" % url)
186 consumer = oembed.OEmbedConsumer()
187 consumer.addEndpoint(endpoint)
189 # _get_endpoint should ensure that an oembed.OEmbedNoEndpoint does not occur.
190 response = consumer.embed(url, **options)
192 kwargs = response.getData()
194 embed = Embed(url, **kwargs)
196 if cache_enabled:
197 path = self._get_cache_path(url)
198 embed.write(path)
200 return embed
202 def get_from_cache(self, url):
203 """Get the embed from cache rather than calling the provider.
205 :param url: The URL to be embedded.
206 :type url: str
208 :rtype: Embed | None
210 """
211 path = self._get_cache_path(url)
213 if os.path.exists(path):
214 return Embed.from_cache(path, url)
216 return None
218 # noinspection PyMethodMayBeStatic
219 def _get_cache_path(self, url):
220 """Get the path to the embedded cache file for the given URL.
222 :param url: The URL to be embedded.
223 :type url: str
225 :rtype: str
227 """
228 file_name = slugify(url) + ".txt"
229 return os.path.join(DATA_PATH, "embeds", file_name)
232# Preload oembed so that it does not have to be done each time.
233oembed_finder = OEmbedFinder()
234oembed_finder.load()