import re
import json
import socket
import logging
import mimetypes
import datetime as dt
import urllib.parse
from http import HTTPStatus
from textwrap import dedent
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from collections import namedtuple, deque
from threading import Thread, Lock
from contextlib import suppress
from inspect import signature
# NOTE: The fallback comes first here as Python 3.7 incorporates
# importlib.resources but at a version incompatible with our requirements.
# Ultimately the try clause should be removed in favour of the except clause
# once compatibility moves beyond Python 3.9
try:
import importlib_resources as resources
except ImportError:
from importlib import resources
# NOTE: Remove except when compatibility moves beyond Python 3.8
try:
from importlib.metadata import version
except ImportError:
from importlib_metadata import version
import docutils
import docutils.core
from chameleon import PageTemplate
from colorzero import Color
from . import cameras, store, calibrate
from .http import HTTPResponse, parse_formdata, parse_content_value
[docs]
def get_best_family(host, port):
    """
    Resolve *host* and *port* (a number or a service name) to the network
    family and socket address that a listening socket should be bound to.

    Returns a ``(family, sockaddr)`` tuple taken from the first entry
    reported by :func:`socket.getaddrinfo`. Raises :exc:`ValueError` if the
    combination cannot be resolved, or resolves to nothing.
    """
    try:
        candidates = socket.getaddrinfo(
            host, port, type=socket.SOCK_STREAM, flags=socket.AI_PASSIVE)
    except socket.gaierror as exc:
        raise ValueError('invalid host and port combination') from exc
    if not candidates:
        raise ValueError('invalid host and port combination')
    # Only the first candidate is ever used; the socket type, protocol and
    # canonical name fields are of no interest here
    family, _type, _proto, _canonname, sockaddr = candidates[0]
    return family, sockaddr
[docs]
def route(pattern, command='GET'):
    """
    Decorator that associates a route with a function

    The *pattern* specifies the route that is to be matched. Within the
    *pattern*, angle-bracket words designate variable sections of the route
    that will be passed to like-named parameters in the associated function.
    The *command* (which defaults to 'GET') specifies which HTTP command the
    route will match.

    The associated function must accept the mandatory "request" parameter
    first, which will contain the associated :class:`HTTPRequestHandler`, plus
    any parameters implied by ``<sections>`` in the *pattern*. It must return a
    :class:`HTTPResponse` object, or :data:`None` if (for some reason) the
    route doesn't match.

    For example, the following declaration will match the single route
    "/index.html"::

        @route('/index.html')
        def homepage(request):
            return HTTPResponse(request, body='Hello, world!')

    The following declaration will match any HTML file directly under
    ``/people``::

        @route('/people/<name>.html', 'GET')
        def get_person(request, name):
            person = db.lookup_person(name)
            return HTTPResponse(request, body=f'Hello, {person.name}!')

    Raises :exc:`ValueError` (at decoration time) if the same *pattern* has
    already been registered for the same *command*.
    """
    def decorator(f):
        # Escape everything, then turn each <name> placeholder into a named
        # group matching a single path segment
        s = re.escape(pattern)
        s = re.sub(r'<([A-Za-z_][A-Za-z0-9_]*)>', r'(?P<\1>[^/]+)', s)
        pattern_re = re.compile(f'^{s}$')
        routes = HTTPRequestHandler.routes
        # The routes table is keyed by (pattern, command); testing for the
        # bare pattern can never detect a duplicate registration
        if (pattern_re, command) in routes:
            raise ValueError(
                f'route {pattern!r} is already registered for {command}')
        routes[(pattern_re, command)] = f
        if command == 'GET':
            # Anything registered for GET gets automatically associated with
            # HEAD as well
            routes[(pattern_re, 'HEAD')] = f
        return f
    return decorator
[docs]
class Function(namedtuple(
        'Function', 'name description function params')):
    """
    Record describing a single animation function.

    .. attribute:: name

        The short title of the animation.

    .. attribute:: description

        A longer description of the animation, covering its parameters and
        the intended result. Typically derived from the function's
        doc-string.

    .. attribute:: function

        The callable that implements the animation.

    .. attribute:: params

        A dict mapping each parameter name of :attr:`function` to a
        :class:`Param` instance (or one of the special classes such as
        :class:`ParamFPS`) indicating how to render the control for that
        parameter.
    """
[docs]
class Param(namedtuple(
        'Param',
        'label input_type default min max choices suffix')):
    """
    Describes a parameter whose value is supplied by the user.

    The *input_type* becomes the "type" of the generated ``<input>`` element
    (or, when it is "select", a ``<select>`` drop-down is generated instead).
    The *default*, *min*, and *max* parameters correspond to the "default",
    "min", and "max" attributes of the ``<input>`` element. The *choices*
    parameter is a mapping of valid identifiers to labels, used when
    *input_type* is "select".

    .. attribute:: label

        The content of the ``<label>`` to render with the parameter's
        ``<input>`` element.

    .. attribute:: input_type

        The value of the ``<input>`` element's ``type`` parameter.

    .. attribute:: default

        The default value of the input.

    .. attribute:: min

        The minimum value for ``range`` type inputs.

    .. attribute:: max

        The maximum value for ``range`` type inputs.

    .. attribute:: choices

        A :class:`dict` defining the valid selections for ``select`` type
        inputs. The values of the dictionary represent the labels for each
        option value.

    .. attribute:: suffix

        A text suffix to render after the input box.
    """
    __slots__ = ()  # workaround python issue #24931

    def __new__(cls, label, input_type, *, default=None, min=None, max=None,
                choices=None, suffix=None):
        return super().__new__(
            cls, label=label, input_type=input_type, default=default,
            min=min, max=max, choices=choices, suffix=suffix)

    def value(self, value):
        """
        Convert the submitted *value* to the Python type implied by
        :attr:`input_type`; unrecognized input types yield a :class:`str`.
        """
        kind = self.input_type
        if kind == 'range':
            return int(value)
        if kind == 'number':
            return float(value)
        if kind == 'color':
            return Color(value)
        if kind == 'checkbox':
            return bool(value)
        return str(value)
[docs]
class ParamLEDCount:
    """
    Marks the associated parameter as receiving the total number of LEDs on
    the tree (an :class:`int`).
    """
    __slots__ = ()

    def value(self, request):
        """
        Return the LED count from the configuration of the server handling
        *request*.
        """
        config = request.server.config
        return config.led_count
[docs]
class ParamLEDPositions:
    """
    Marks the associated parameter as receiving the mapping of LED indexes to
    :class:`~blinkenxmas.store.Position` instances.

    .. warning::

        Not every LED is guaranteed to appear in the mapping. An LED that was
        not detected during calibration (either because it is persistently
        hidden or defective) will be absent from it.
    """
    __slots__ = ()

    def value(self, request):
        """
        Return the positions held by the store attached to *request*.
        """
        storage = request.store
        return storage.positions
[docs]
class ParamFPS:
    """
    Marks the associated parameter as receiving the frames-per-second rate
    that animations are expected to be rendered for.
    """
    __slots__ = ()

    def value(self, request):
        """
        Return the frame rate from the configuration of the server handling
        *request*.
        """
        config = request.server.config
        return config.fps
[docs]
def animation(name, **params):
"""
Decorates a function as an animation generator to be presented in the
Create interface. The doc-string of the function is used as the description
and is expected to be in reStructuredText format (which will be rendered
into HTML).
Each of the parameters to the function must be defined as a keyword
argument to the decorator which is associated with one of the parameter
classes defined:
* :class:`ParamLEDCount`
* :class:`ParamLEDPositions`
* :class:`ParamFPS`
* :class:`Param`
"""
def decorator(f):
required_params = {
key: param
for key, param in signature(f).parameters.items()
if param.default is param.empty
}
if required_params.keys() != params.keys():
extra = required_params.keys() - params.keys()
if extra:
raise ValueError(
f'animation function {f!r} has extra parameter(s) '
f'{", ".join(extra)}')
missing = params.keys() - required_params.keys()
if missing:
raise ValueError(
f'animation function {f!r} is missing parameter(s) '
f'{", ".join(missing)}')
invalid_params = {'name', 'data', 'animation'} & params.keys()
if invalid_params:
raise ValueError(
f'invalid parameter name(s) in animation function {f!r}: '
f'{", ".join(invalid_params)}')
if f.__doc__:
overrides = {
'input_encoding': 'unicode',
'doctitle_xform': False,
'initial_header_level': 2,
}
html = docutils.core.publish_parts(
source=dedent(f.__doc__), writer_name='html',
settings_overrides=overrides)['fragment']
else:
html = ''
func = Function(name, html, f, params)
HTTPRequestHandler.animations[f.__name__] = func
return f
return decorator
[docs]
def for_commands(*commands):
    """
    Decorator recording which HTTP commands (GET, HEAD, etc.) a request
    handler method responds to, as a ``commands`` tuple attribute on the
    method. Any method accepting GET also has HEAD appended. Intended only
    for internal use in :class:`HTTPRequestHandler`.
    """
    def decorator(f):
        accepted = commands
        if 'GET' in accepted:
            accepted = accepted + ('HEAD',)
        f.commands = accepted
        return f
    return decorator
[docs]
class HTTPServer(ThreadingHTTPServer):
    """
    The blinkenxmas HTTP server class. It descends from
    :class:`http.server.ThreadingHTTPServer` (so is multi-threaded) and does
    little beyond overriding :meth:`handle_error`.
    """
    allow_reuse_address = True
    daemon_threads = True
    logger = logging.getLogger('httpd')

    def handle_error(self, request, client_address):
        """
        Overridden to shut down the server in the event of an error when the
        configuration doesn't specify production mode.
        """
        if not self.config.production:
            # The error is deliberately not reported via the parent
            # handle_error here, as it is going to be re-raised in the main
            # thread
            self.shutdown()
            return
        super().handle_error(request, client_address)
[docs]
class HTTPRequestHandler(BaseHTTPRequestHandler):
    """
    The blinkenxmas request handler. The :meth:`get_response` method is the
    primary piece of machinery that decides what handler deals with a given
    request. There are roughly three types of handler:

    * Simple static data (included in the package data) is handled by
      :meth:`try_static`

    * Chameleon templates (also included in the package data) are rendered by
      :meth:`try_template`

    * Finally, custom functions (decorated by :func:`route`) are called by
      :meth:`try_route`
    """
    server_version = f'BlinkenXmas/{version("blinkenxmas")}'
    static_path = resources.files('blinkenxmas')
    static_modified = dt.datetime.now(dt.timezone.utc)
    template_cache = {
        'layout.pt': PageTemplate((static_path / 'layout.html.pt').read_text())
    }
    routes = {}
    animations = {}

    @staticmethod
    def _is_safe_path(path):
        """
        Returns :data:`False` if the relative *path* contains a ``..``
        segment (with either slash style), which would allow it to escape
        the package data directory when joined onto it.
        """
        return '..' not in path.replace('\\', '/').split('/')

    def get_template(self, name):
        """
        Returns the Chameleon template with the specified *name*. Templates are
        loaded from module resources and cached in the
        :attr:`HTTPRequestHandler.template_cache` class attribute.
        """
        try:
            template = self.template_cache[name]
        except KeyError:
            template = PageTemplate((self.static_path / name).read_text())
            # Store on the class so the cache is shared by all requests
            type(self).template_cache[name] = template
        return template

    def get_template_ns(self, **kwargs):
        """
        Returns the namespace used when rendering Chameleon templates. Any
        keyword arguments are added to (and may override) the standard
        entries.
        """
        ns = {
            'layout': self.template_cache['layout.pt']['layout'],
            'url': urllib.parse.quote,
            'json': json.dumps,
            'request': self,
            'messages': self.server.messages,
            'config': self.server.config,
            'now': dt.datetime.now(tz=dt.timezone.utc),
            'datetime': dt.datetime,
            'timedelta': dt.timedelta,
            'led_count': self.server.config.led_count,
            'calibration': self.server.calibration,
            'store': self.store,
            'animations': self.animations,
        }
        ns.update(kwargs)
        return ns

    @for_commands('GET', 'POST', 'PUT', 'DELETE')
    def try_route(self):
        """
        Searches for a match of the requested :attr:`path` in the defined
        :mod:`routes`.

        If a match is found, the function associated with the route is called
        and, if it returns a :class:`HTTPResponse`, that is returned. If no
        match is found, returns :data:`None`.
        """
        for (pattern, command), handler in self.routes.items():
            m = pattern.match(self.path)
            if m and command == self.command:
                resp = handler(self, **{
                    param: urllib.parse.unquote(value)
                    for param, value in m.groupdict().items()
                })
                # A handler returning None declines the request; keep
                # looking for another matching route
                if resp is not None:
                    resp.headers.setdefault('Cache-Control', 'no-cache')
                    return resp
        return None

    @for_commands('GET')
    def try_static(self):
        """
        Attempts to find a match for the requested :attr:`path` in the static
        data of the module.

        If a match is found, a :class:`HTTPResponse` is constructed and
        returned. Otherwise (including when the path contains ``..``
        segments, or names a directory), returns :data:`None`.
        """
        path = self.path.lstrip('/')
        # The path comes straight from the client; never let it climb out of
        # the package data
        if not self._is_safe_path(path):
            return None
        try:
            body = (self.static_path / path).open('rb')
        except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
            return None
        else:
            return HTTPResponse(
                self, body=body, filename=path,
                headers={'Cache-Control': 'max-age=86400'})

    @for_commands('GET', 'POST')
    def try_template(self):
        """
        Attempts to find a Chameleon template within the static data of the
        module that matches the requested :attr:`path` plus a ``.pt`` suffix.

        If a template is found, it is rendered with the result of
        :meth:`get_template_ns` as the namespace, and a :class:`HTTPResponse`
        is constructed from the result. Otherwise (including when the path
        contains ``..`` segments), returns :data:`None`.
        """
        path = self.path.lstrip('/')
        # As with static files, refuse client paths that attempt traversal
        if not self._is_safe_path(path):
            return None
        template_key = path + '.pt'
        try:
            template = self.get_template(template_key)
        except FileNotFoundError:
            return None
        namespace = self.get_template_ns()
        body = template.render(**namespace)
        return HTTPResponse(
            self, body=body, last_modified=namespace['now'], etag=False,
            filename=path, headers={'Cache-Control': 'no-cache'})

    def get_response(self):
        """
        Tries various methods (:meth:`try_route`, :meth:`try_static`,
        :meth:`try_template` in that order) to obtain a :class:`HTTPResponse`
        for the current request.

        Handles returning appropriate errors in the case of failure (400 bad
        request when a POST body cannot be parsed, 500 internal server error
        in the case that rendering of a route or template fails, 404 not
        found in the case that no route, static file, or template can be
        found that matches the requested :attr:`path`).
        """
        try:
            # Parse the path into its components
            self.uri = self.path
            parts = urllib.parse.urlsplit(self.uri)
            self.path = parts.path
            self.fragment = parts.fragment
            if self.command == 'GET' and parts.query:
                self.query = urllib.parse.parse_qs(parts.query)
            elif self.command == 'POST':
                try:
                    content_type, _ = parse_content_value(
                        self.headers.get('Content-Type', ''))
                    if content_type == 'application/x-www-form-urlencoded':
                        # NOTE(review): with no Content-Length this reads to
                        # EOF, which may block until the client disconnects
                        body = self.rfile.read(
                            int(self.headers['Content-Length'])
                            if 'Content-Length' in self.headers else
                            None).decode('utf-8', errors='ignore')
                        self.query = urllib.parse.parse_qs(body)
                    elif content_type == 'multipart/form-data':
                        self.query = parse_formdata(self)
                    elif content_type == 'application/json':
                        self.query = self.json()
                    else:
                        raise ValueError(
                            f'unexpected content type: {content_type}')
                except (TypeError, ValueError):
                    return HTTPResponse(
                        self, status_code=HTTPStatus.BAD_REQUEST)
            else:
                self.query = {}
            if isinstance(self.query, dict):
                # Collapse single-item lists (as produced by parse_qs) to
                # their sole value
                self.query = {
                    key: value[0]
                    if isinstance(value, list) and len(value) == 1 else
                    value
                    for key, value in self.query.items()
                }
            # Try various methods to render the path, using the first one that
            # successfully returns a response
            self.store = store.Storage(self.server.config.db)
            for method in (self.try_route, self.try_static, self.try_template):
                if self.command in method.commands:
                    resp = method()
                    if resp is not None:
                        return resp
        except Exception as exc:
            if self.server.config.production:
                self.server.handle_error(self, self.client_address)
            else:
                # Hand the exception over for re-raising in the main thread
                self.server.exception = exc
                raise
            return HTTPResponse(
                self, status_code=HTTPStatus.INTERNAL_SERVER_ERROR)
        return HTTPResponse(self, status_code=HTTPStatus.NOT_FOUND)

    def json(self):
        """
        Decode the body of the request as a JSON object. Note this handler can
        be called once, and only once, as it reads the request body on-demand.
        A second call raises :exc:`ValueError`, as does a missing, invalid,
        or non-positive Content-Length header.

        .. warning::

            This method does not check the Content-Type header, and simply
            trusts that the request body is a valid JSON object. Wrap in
            exception handlers as appropriate!
        """
        try:
            body_len = int(self.headers['Content-Length'])
        except (KeyError, ValueError, TypeError):
            body_len = 0
        if body_len <= 0:
            raise ValueError('invalid Content-Length for JSON')
        body = self.rfile.read(body_len)
        # Mark the body as consumed. Assigning to a header *appends* a new
        # field rather than replacing the existing one, so the original must
        # be deleted first or it would still be the value read next time
        del self.headers['Content-Length']
        self.headers['Content-Length'] = '0'
        return json.loads(body)

    def _respond(self, *, conditional=False, body=True):
        """
        Obtain the response for the current request and transmit it. If
        *conditional* is set, the response's cache and range checks are
        performed first; if *body* is unset only the headers are sent.
        """
        resp = self.get_response()
        if conditional:
            resp.check_cached()
            resp.check_ranges()
        # The client going away mid-transmission is not an error worth
        # reporting
        with suppress(BrokenPipeError, ConnectionResetError):
            resp.send_headers()
            if body:
                resp.send_body()

    def do_HEAD(self):
        """
        Handle HTTP HEAD requests. See :meth:`get_response` for more
        information.
        """
        self._respond(conditional=True, body=False)

    def do_GET(self):
        """
        Handle HTTP GET requests. See :meth:`get_response` for more
        information.
        """
        self._respond(conditional=True)

    def do_DELETE(self):
        """
        Handle HTTP DELETE requests. See :meth:`get_response` for more
        information.
        """
        self._respond()

    def do_PUT(self):
        """
        Handle HTTP PUT requests. See :meth:`get_response` for more
        information.
        """
        self._respond()

    def do_POST(self):
        """
        Handle HTTP POST requests. See :meth:`get_response` for more
        information.
        """
        self._respond()
[docs]
class Messages:
    """
    A small bounded buffer holding up to *maxlen* messages (plain strings)
    for display to the user at some later point.

    Use :meth:`show` to add a message to the buffer, and :meth:`drain` to
    retrieve every buffered message as a list of strings. All access to the
    buffer is guarded by an internal lock, so instances may be shared between
    threads without additional locking.
    """
    def __init__(self, maxlen=20):
        self._items = deque(maxlen=maxlen)
        self._lock = Lock()

    def show(self, msg):
        """
        Append *msg* to the buffer. When the buffer already holds *maxlen*
        items, the oldest message is discarded to make room.
        """
        with self._lock:
            self._items.append(msg)

    def drain(self):
        """
        Remove every message from the buffer and return them, oldest first,
        as a :class:`list`.
        """
        with self._lock:
            drained = [*self._items]
            self._items.clear()
        return drained
[docs]
class HTTPThread(Thread):
    """
    The blinkenxmas HTTP thread class wraps an instance of :class:`HTTPServer`
    in a :class:`~threading.Thread` for background execution. Instances of this
    class may be used as a context manager that will start the thread upon
    entry, and stop it (re-raising any exception that occurred during
    execution) on exit. This is the recommended method of running this thread.

    :param argparse.Namespace config:
        The application configuration

    :param Messages messages:
        A buffer for messages to be relayed to the user

    :param queue.Queue queue:
        The queue to submit animations to for transmission to the broker
    """
    def __init__(self, config, messages, queue):
        super().__init__(target=self.serve, daemon=True)
        mimetypes.init()
        # The address family has to be set on the class before the server is
        # constructed
        HTTPServer.address_family, addr = get_best_family(
            config.httpd_bind, config.httpd_port)
        self.httpd = HTTPServer(addr[:2], HTTPRequestHandler)
        self.httpd.queue = queue
        self.httpd.config = config
        self.httpd.messages = messages
        self.httpd.camera = {
            'none': lambda config: None,
            'files': cameras.FilesSource,
            'picamera': cameras.PiCameraSource,
            'gstreamer': cameras.GStreamerSource,
        }[config.camera_type.strip().lower()](config)
        self.httpd.calibration = calibrate.Calibration(
            config, self.httpd.messages)
        self.httpd.exception = None
        # Only set once serve() is about to enter the serve loop; calling
        # shutdown() on a server that never served would block forever
        self._shutdown_needed = False

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *exc_info):
        self.stop()
        if self.httpd.exception:
            raise self.httpd.exception

    def stop(self):
        """
        Stop the HTTP background thread, and release the server's listening
        socket. Does nothing if the serve loop was never started, or has
        already been stopped by this method.
        """
        if self._shutdown_needed:
            self._shutdown_needed = False
            self.httpd.shutdown()
            # shutdown() only ends the serve loop; the listening socket stays
            # open until it is explicitly closed
            self.httpd.server_close()

    def serve(self):
        """
        The "main" routine of the background thread. Mostly this just calls
        :meth:`socketserver.BaseServer.serve_forever`, recording any
        exception raised so that it can be re-raised on context exit.
        """
        try:
            host, port = self.httpd.socket.getsockname()[:2]
            hostname = socket.gethostname()
            self.httpd.logger.warning(f'Serving on {host} port {port}')
            self.httpd.logger.warning(f'http://{hostname}:{port}/ ...')
            self._shutdown_needed = True
            self.httpd.serve_forever()
        except Exception as exc:
            self.httpd.exception = exc