import json
import math as m
import sqlite3
import logging
from collections import namedtuple
from collections.abc import MutableMapping
[docs]
class Position(namedtuple('Position', ('x', 'y', 'z', 'a', 'r'))):
"""
TODO
"""
@classmethod
def from_polar(cls, y, a, r):
x = (r * m.sin(m.radians(a)) + 1) / 2
z = (r * m.cos(m.radians(a)) + 1) / 2
return Position(x, y, z, a, r)
@classmethod
def from_cartesian(cls, x, y, z):
a = m.degrees(m.atan2(z, x) + m.pi)
r = m.hypot(x, z)
return Position(x, y, z, a, r)
@property
def a_r(self):
return m.radians(self.a)
[docs]
class StoragePositions(MutableMapping):
"""
TODO
"""
def __init__(self, connection):
self._conn = connection
def _create_tables(self):
with self._conn:
self._conn.execute(
"DROP TABLE IF EXISTS positions")
self._conn.execute(
"""
CREATE TABLE positions (
led INTEGER NOT NULL,
y NUMBER NOT NULL,
a NUMBER NOT NULL,
r NUMBER NOT NULL,
CONSTRAINT presets_pk PRIMARY KEY (led),
CONSTRAINT coords_ck CHECK (
y BETWEEN 0 AND 1 AND
a BETWEEN 0 AND 360 AND
r BETWEEN 0 AND 1
)
)
""")
def __bool__(self):
sql = "SELECT 1 FROM positions"
for row in self._conn.execute(sql):
return True
return False
def __len__(self):
sql = "SELECT COUNT(*) FROM positions"
for row in self._conn.execute(sql):
return row[0]
return 0
def __iter__(self):
sql = "SELECT led FROM positions ORDER BY led"
for row in self._conn.execute(sql):
yield row['led']
[docs]
def items(self):
sql = "SELECT led, y, a, r FROM positions ORDER BY led"
for row in self._conn.execute(sql):
yield row['led'], Position.from_polar(row['y'], row['a'], row['r'])
def __contains__(self, led):
sql = "SELECT 1 FROM positions WHERE led = ?"
for row in self._conn.execute(sql, (led,)):
return True
return False
def __getitem__(self, led):
sql = "SELECT y, a, r FROM positions WHERE led = ?"
for row in self._conn.execute(sql, (led,)):
return Position.from_polar(row['y'], row['a'], row['r'])
raise KeyError(led)
def __setitem__(self, led, position):
if not isinstance(position, Position):
position = Position.from_cartesian(*position)
sql = (
"""
INSERT INTO positions (led, y, a, r) VALUES (?, ?, ?, ?)
ON CONFLICT (led) DO UPDATE SET y = ?, a = ?, r = ?
""")
with self._conn:
self._conn.execute(sql, (
led, position.y, position.a, position.r,
position.y, position.a, position.r))
def __delitem__(self, led):
sql = "DELETE FROM positions WHERE led = ?"
with self._conn:
cur = self._conn.cursor()
cur.execute(sql, (led,))
if cur.rowcount < 1:
raise KeyError(led)
[docs]
class StoragePresets(MutableMapping):
"""
TODO
"""
def __init__(self, connection):
self._conn = connection
def _create_tables(self):
with self._conn:
self._conn.execute(
"""
CREATE TABLE presets (
name VARCHAR(200) NOT NULL,
data TEXT NOT NULL,
CONSTRAINT presets_pk PRIMARY KEY (name),
CONSTRAINT presets_name_ck CHECK (name <> ''),
CONSTRAINT presets_data_ck CHECK (data <> '')
)
""")
def __bool__(self):
sql = "SELECT 1 FROM presets"
for row in self._conn.execute(sql):
return True
return False
def __len__(self):
sql = "SELECT COUNT(*) FROM presets"
for row in self._conn.execute(sql):
return row[0]
return 0
def __iter__(self):
sql = "SELECT name FROM presets ORDER BY name"
for row in self._conn.execute(sql):
yield row['name']
def __contains__(self, preset):
sql = "SELECT 1 FROM presets WHERE name = ?"
for row in self._conn.execute(sql, (preset,)):
return True
return False
def __getitem__(self, preset):
sql = "SELECT data FROM presets WHERE name = ?"
for row in self._conn.execute(sql, (preset,)):
return json.loads(row['data'])
raise KeyError(preset)
def __setitem__(self, preset, data):
# TODO Assert that the structure is correct (voluptuous? Or just
# decompose it into tables?)
data = json.dumps(data)
sql = (
"""
INSERT INTO presets (name, data) VALUES (?, ?)
ON CONFLICT (name) DO UPDATE SET data = ?
""")
with self._conn:
self._conn.execute(sql, (preset, data, data))
def __delitem__(self, preset):
sql = "DELETE FROM presets WHERE name = ?"
with self._conn:
cur = self._conn.cursor()
cur.execute(sql, (preset,))
if cur.rowcount < 1:
raise KeyError(preset)
[docs]
class Storage:
"""
TODO
"""
schema_version = 3
logger = logging.getLogger('storage')
def __init__(self, db):
if sqlite3.threadsafety < 1:
raise RuntimeError(
'sqlite3 must be compiled with at least basic multi-thread '
'capabilities')
self._conn = sqlite3.connect(db)
self._conn.row_factory = sqlite3.Row
self._presets = StoragePresets(self._conn)
self._positions = StoragePositions(self._conn)
self._create_tables()
def __enter__(self):
return self
def __exit__(self, *exc):
if exc[0] is not None:
self._conn.rollback()
else:
self._conn.commit()
@property
def presets(self):
return self._presets
@property
def positions(self):
return self._positions
@classmethod
def log_message(cls, msg):
cls.logger.warning(msg)
def _create_tables(self):
try:
with self._conn:
for row in self._conn.execute("SELECT version FROM config"):
if row['version'] == Storage.schema_version:
return
elif row['version'] > Storage.schema_version:
raise RuntimeError(
f"Incompatible schema version {row['version']}; "
f"expected {self.schema_version}")
except sqlite3.OperationalError:
# Creation case
Storage.log_message(
f"Creating schema at version {Storage.schema_version}")
with self._conn:
self._conn.execute(
"""
CREATE TABLE config (
version INT NOT NULL,
CONSTRAINT config_pk PRIMARY KEY (version),
CONSTRAINT config_ck CHECK (version >= 1)
)
""")
self._conn.execute(
"INSERT INTO config(version) VALUES (?)",
(Storage.schema_version,))
self._presets._create_tables()
self._positions._create_tables()
return
else:
# Upgrade case
Storage.log_message(
f"Upgrading storage from version {row['version']} to "
f"{Storage.schema_version}")
with self._conn:
self._conn.execute(
"UPDATE config SET version = ?",
(Storage.schema_version,))
self._positions._create_tables()