Source code for blinkenxmas.config

import os
import socket
from pathlib import Path
from contextlib import suppress
from fnmatch import fnmatchcase
from itertools import accumulate, chain
from argparse import ArgumentParser, SUPPRESS
from configparser import ConfigParser

# NOTE: The fallback comes first here as Python 3.7 incorporates
# importlib.resources but at a version incompatible with our requirements.
# Ultimately the try clause should be removed in favour of the except clause
# once compatibility moves beyond Python 3.9
try:
    import importlib_resources as resources
except ImportError:
    from importlib import resources

# NOTE: Remove except when compatibility moves beyond Python 3.8
try:
    from importlib.metadata import version
except ImportError:
    from importlib_metadata import version


# The locations to attempt to read the configuration from
XDG_CONFIG_HOME = Path(os.environ.get('XDG_CONFIG_HOME', '~/.config'))
CONFIG_LOCATIONS = (
    Path('/usr/local/etc/blinkenxmas.conf'),
    Path('/etc/blinkenxmas.conf'),
    Path(XDG_CONFIG_HOME / 'blinkenxmas.conf'),
)

# The set of keys within the configuration that represent paths and thus need
# to be resolved relative to the configuration file they were read from
CONFIG_PATHS = {
    # section,  key
    ('web',    'database'),
    ('camera', 'path'),
    ('camera', 'device'),
}


[docs] class ConfigArgumentParser(ArgumentParser): """ A variant of :class:`~argparse.ArgumentParser` that links arguments to specified keys in a :class:`~configparser.ConfigParser` instance. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._config_map = {}
[docs] def add_argument(self, *args, section=None, key=None, **kwargs): """ Adds *section* and *key* parameters. These link the new argument to the specified configuration entry. The default for the argument can be specified directly as usual, or can be read from the configuration (see :meth:`set_defaults`). When arguments are parsed, the value assigned to this argument will be copied to the associated configuration entry. """ return self._add_config_action( *args, method=super().add_argument, section=section, key=key, **kwargs)
[docs] def add_argument_group(self, title=None, description=None, section=None): """ Adds a new argument group object and returns it. The new argument group will likewise accept *section* and *key* parameters on its :meth:`add_argument` method. The *section* parameter will default to the value of the *section* parameter passed to this method (but may be explicitly overridden). """ group = super().add_argument_group(title=title, description=description) def add_argument(*args, section=section, key=None, _add_arg=group.add_argument, **kwargs): return self._add_config_action( *args, method=_add_arg, section=section, key=key, **kwargs) group.add_argument = add_argument return group
def _add_config_action(self, *args, method, section, key, **kwargs): assert callable(method), 'method must be a callable' if (section is None) != (key is None): raise ValueError('section and key must be specified together') try: if kwargs['action'] in ('store_true', 'store_false'): type = boolean except KeyError: type = kwargs.get('type', str) action = method(*args, **kwargs) if key is not None: with suppress(KeyError): if self._config_map[action.dest] != (section, key, type): raise ValueError( 'section and key must match for all equivalent dest ' 'values') self._config_map[action.dest] = (section, key, type) return action
[docs] def set_defaults_from(self, config): """ Sets defaults for all arguments from their associated configuration entries in *config*. """ kwargs = { dest: config.getboolean(section, key) if type is boolean else config[section][key] for dest, (section, key, type) in self._config_map.items() if section in config and key in config[section] } return super().set_defaults(**kwargs)
[docs] def update_config(self, config, namespace): """ Copy values from *namespace* (presumably the result of calling something like :meth:`~argparse.ArgumentParser.parse_args`) to *config*. Note that namespace values will be converted to :class:`str` implicitly. """ for dest, (section, key, type) in self._config_map.items(): config[section][key] = str(getattr(namespace, dest))
[docs] def of_type(self, type): """ Return a set of (section, key) tuples listing all configuration items which were defined as being of the specified *type* (with the *type* keyword passed to :meth:`add_argument`. """ return { (section, key) for section, key, item_type in self._config_map.values() if item_type is type }
[docs] def port(s): """ Convert the :class:`str` *s* into a port number. *s* may contain an integer representation (in which case the conversion is trivial), or a :class:`str` containing a registered port name, in which case ``getservbyname`` will be used to convert it to a port number (usually via NSS). """ try: return int(s) except ValueError: try: return socket.getservbyname(s) except OSError: raise ValueError(f'invalid service name or port number: {s}')
[docs] def boolean(s): """ Convert the string *s* to a :class:`bool`. A typical set of case insensitive strings are accepted: "yes", "y", "true", "t", and "1" are converted to :data:`True`, while "no", "n", "false", "f", and "0" convert to :data:`False`. Other values will result in :exc:`ValueError`. """ try: return { 'n': False, 'no': False, 'f': False, 'false': False, '0': False, 'y': True, 'yes': True, 't': True, 'true': True, '1': True, }[str(s).strip().lower()] except KeyError: raise ValueError(f'invalid boolean value: {s}')
[docs] def resolution(s): """ Convert the string *s* into a tuple of (width, height). """ width, height = (int(i) for i in s.lower().split('x', 1)) return width, height
[docs] def rotation(s): """ Convert the string *s* into a rotation in degrees. Values which are not multiples of 90 are rejected with :exc:`ValueError`. """ r = int(s) % 360 if r not in (0, 90, 180, 270): raise ValueError(f'invalid rotation {s}; must be multiple of 90') return r
[docs] def strips(s): """ Convert the :class:`str` *s*, which must contain a comma-separated list of integers, into an iterable of :class:`range` objects, each containing a number of elements specified by the input string. For example: >>> list(strips('1,2,3')) [range(0, 2), range(1, 3), range(3, 6)] >>> list(strips('50,100')) [range(0, 50), range(50, 150)] """ start = 0 for count in map(int, s.split(',')): yield range(start, start + count) start += count
[docs] def get_parser(config, **kwargs): """ Given *config*, a :class:`~configparser.ConfigParser` containing the stored application configuration (presumably returned by :func:`get_config`), and any keyword arguments that should be passed to the argument parser, constructions and returns a :class:`ConfigArgumentParser` instance with the command line parameters common to all the applications. """ parser = ConfigArgumentParser(**kwargs) parser.add_argument( '--version', action='version', version=version('blinkenxmas')) mqtt_section = parser.add_argument_group('mqtt', section='mqtt') mqtt_section.add_argument( '--broker-address', key='host', metavar='ADDR', help="the address on which to find the MQTT broker. Default: " "%(default)s") mqtt_section.add_argument( '--broker-port', key='port', type=port, metavar='NUM', help="the port on which to find the MQTT broker. Default: " "%(default)s") mqtt_section.add_argument( '--topic', key='topic', help="the topic on which the Pico W is listening for messages. " "Default: %(default)s") # Internal use arguments led_sections = [s for s in config if s.startswith('leds:')] strip_counts = [int(config[s]['count']) for s in led_sections] parser.add_argument( '--led-strips', metavar='NUM,NUM,...', type=strips, default=[ range(start, start + count) for start, count in zip(accumulate(chain([0], strip_counts)), strip_counts) ], help=SUPPRESS) parser.add_argument( '--led-count', metavar='NUM', type=int, default=sum(strip_counts, 0), help=SUPPRESS) parser.add_argument( '--fps', metavar='NUM', type=int, default=min(( int(config[section].get('fps', 60)) for section in config if section.startswith('leds:') ), default=60), help=SUPPRESS) return parser
[docs] def get_config(): """ Load the default configuration from the project resources, defining the valid sections and keys from the default (amalgamating the example leds sections into a template "leds:\\*" section). Returns a :class:`~configparser.ConfigParser` instance with the stored configuration loaded. """ config = ConfigParser( delimiters=('=',), empty_lines_in_values=False, interpolation=None, converters={'list': lambda s: s.strip().splitlines()}, strict=False) with resources.path('blinkenxmas', 'default.conf') as default_conf: config.read(default_conf) valid = {config.default_section: set()} for section, keys in config.items(): for key in keys: valid.setdefault( 'leds:*' if section.startswith('leds:') else section, set() ).add(key) for section in {s for s in config if s.startswith('leds:')}: del config[section] # Attempt to load each of the pre-defined locations for the "main" # configuration, validating sections and keys against the default template # loaded above for path in CONFIG_LOCATIONS: path = path.expanduser() config.read(path) for section, keys in config.items(): try: section = {s for s in valid if fnmatchcase(section, s)}.pop() except KeyError: raise ValueError( f'{path}: invalid section [{section}]') for key in set(keys) - valid[section]: raise ValueError( f'{path}: invalid key {key} in [{section}]') # Resolve paths relative to the configuration file just loaded for section, key in CONFIG_PATHS: if key in config[section]: value = Path(config[section][key]).expanduser() if not value.is_absolute(): value = (path.parent / value).resolve() config[section][key] = str(value) return config
def get_pico_config(config): """ Given *config*, a :class:`~argparse.Namespace` instance containing the active application configuration, returns a :class:`str` containing the MicroPython code for the config.py module on the Pico. """ leds = [ ( config[section]['driver'], int(config[section]['count']), int(config[section].get('reversed', 'no') == 'yes'), config[section]['order'], ) + ( (int(config[section]['pin']),) if config[section]['driver'] == 'WS2812' else (int(config[section]['clk']), int(config[section]['dat'])) ) for section in config if section.startswith('leds:') ] return f"""\ from mqtt_as import config # WiFi configuration config['ssid'] = {config['wifi']['ssid']!r} config['wifi_pw'] = {config['wifi']['password']!r} # MQTT broker configuration config['server'] = {config['mqtt']['host']!r} config['topic'] = {config['mqtt']['topic']!r} # Configuration of the LEDs config['fps'] = { min(int(config[section].get('fps', 60)) for section in config if section.startswith('leds:')) } config['leds'] = {leds!r} # Error handling and status reporting config['status'] = { int(config['pico']['status']) if config['pico'].get('status', 'LED').isdigit() else config['pico']['status'] !r} config['error'] = {config['pico']['error']!r} """