Source code for blinkenxmas.cameras

import io
import sys
from pathlib import Path
from threading import Lock, Thread, Event, Condition

from PIL import Image


[docs] class AbstractSource: """ An abstract camera source. The *config* is the application configuration object (a :class:`~argparse.Namespace` instance). .. attribute:: frame The current preview frame's data. This is a :class:`bytes` string containing the JPEG data of the frame. .. attribute:: frame_ready A :class:`~threading.Condition` which clients must wait upon to be notified of a new JPEG available in :attr:`frame`. """ def __init__(self, config): self._lock = Lock() self._clients = [] self.frame = b'' self.frame_ready = Condition()
[docs] def start_preview(self, angle): """ Start a live preview from the camera, passing JPEG image frames to the :attr:`frame` attribute. The *angle* is the current angle of the tree. """ raise NotImplementedError
[docs] def stop_preview(self): """ Terminate the live preview from the camera. """ raise NotImplementedError
[docs] def capture(self, angle, led=None): """ Capture a high-quality (highest possible resolution) image of the tree at *angle* with *led* lit full white. Expected to return a file-like object containing the JPEG image data. """ raise NotImplementedError
def _preview_frame(self, frame): with self.frame_ready: self.frame = frame if self.frame: self.frame_ready.notify_all()
[docs] def add_client(self, client): """ Called to add *client* (a :class:`~http.server.BaseHTTPRequestHandler` instance) to the list of clients wanting to receive live preview frames from the camera. """ with self._lock: if not self._clients: angle = int(client.query.get('angle', '0')) self.start_preview(angle) self._clients.append(client)
[docs] def remove_client(self, client): """ Called to remove *client* (a :class:`~http.server.BaseHTTPRequestHandler` instance) from the list of clients wanting to receive live preview frames from the camera. """ with self._lock: try: self._clients.remove(client) except ValueError: pass # already removed if not self._clients: self.stop_preview()
[docs] class FilesSource(AbstractSource): """ This "camera" is primarily intended for testing purposes. It is implemented simple as a list of JPEG files which must conform to the following naming convention: :file:`angle{A}_base.jpg` The base "unlit" image of the tree at the specified angle *n* (in degrees), where *A* is zero-padded to three digits. For example ``angle090_base.jpg``. :file:`angle{A}_led{L}.jpg` The image of the tree at angle *A* (in degrees, zero-padded to three digits) with LED at index *L* (zero-padded to three digits) lit bright white. For example ``angle090_led049.jpg``. """ thread = None lock = Lock() stop = Event() def __init__(self, config): assert config.camera_type == 'files' if not config.camera_path: raise RuntimeError('path must be specified for the "files" camera') super().__init__(config) self._path = config.camera_path self._preview_res = config.camera_preview
[docs] def start_preview(self, angle): with FilesSource.lock: if FilesSource.thread is None: FilesSource.thread = Thread( target=self._preview_thread, args=(angle,), daemon=True) FilesSource.stop.clear() FilesSource.thread.start()
[docs] def stop_preview(self): with FilesSource.lock: if FilesSource.thread is not None: FilesSource.stop.set() FilesSource.thread = None
def _preview_thread(self, angle): base_path = self._path / f'angle{angle:03d}_base.jpg' image = Image.open(base_path).resize(self._preview_res) preview = io.BytesIO() image.save(preview, 'jpeg') while not FilesSource.stop.wait(timeout=0.1): self._preview_frame(preview.getvalue())
[docs] def capture(self, angle, led=None): if FilesSource.thread is not None: raise RuntimeError('Cannot capture while previewing') if led is None: return (self._path / f'angle{angle:03d}_base.jpg').open('rb') else: return ( self._path / f'angle{angle:03d}_led{led:03d}.jpg').open('rb')
[docs] class PiCameraOutput: """ A :mod:`picamera` :ref:`custom output <custom_outputs>` used by :class:`PiCameraSource` to route preview frames to clients. """ def __init__(self, source): self.source = source self.frame = io.BytesIO() def write(self, buf): if buf.startswith(b'\xff\xd8'): size = self.frame.tell() self.frame.seek(0) self.source._preview_frame(self.frame.read(size)) self.frame.seek(0) self.frame.write(buf) def flush(self): pass
[docs] class PiCameraSource(AbstractSource): """ A camera implementation that uses the legacy :mod:`picamera` library. .. warning:: Be warned that this will only work with Raspberry Pi models up to the 4B (specifically *not* the Pi 5), and only with legacy 32-bit versions of RaspiOS or Ubuntu. """ lock = Lock() output = None def __init__(self, config): from picamera import PiCamera assert config.camera_type == 'picamera' super().__init__(config) self._camera = PiCamera(framerate=15) self._camera.rotation = config.camera_rotation self._capture_res = config.camera_capture self._preview_res = config.camera_preview
[docs] def start_preview(self, angle): with PiCameraSource.lock: if PiCameraSource.output is None: PiCameraSource.output = PiCameraOutput(self) self._camera.resolution = self._preview_res self._camera.start_recording( PiCameraSource.output, format='mjpeg')
[docs] def stop_preview(self): with PiCameraSource.lock: if PiCameraSource.output is not None: self._camera.stop_recording() PiCameraSource.output = None
[docs] def capture(self, angle, led=None): self.stop_preview() self._camera.resolution = self._capture_res frame = io.BytesIO() self._camera.capture(frame, format='jpeg') frame.seek(0) return frame
[docs] class GStreamerSource(AbstractSource): """ A camera implementation based on `GStreamer`_. This is primarily intended for use with USB web-cams. However, be warned that most USB web-cams have terrible quality compared to proper camera modules. You are *far* better off using a Pi camera module. .. _GStreamer: https://gstreamer.freedesktop.org/ """ Gst = None GstVideo = None thread = None lock = Lock() stop = Event() def __init__(self, config): if GStreamerSource.Gst is None: import gi gi.require_version('GLib', '2.0') gi.require_version('GObject', '2.0') gi.require_version('Gst', '1.0') gi.require_version('GstVideo', '1.0') from gi.repository import Gst, GstVideo Gst.init(sys.argv) GStreamerSource.Gst = Gst GStreamerSource.GstVideo = GstVideo assert config.camera_type == 'gstreamer' if not config.camera_device: raise RuntimeError( 'device must be specified for the "gstreamer" camera') super().__init__(config) self._device = config.camera_device self._capture_res = config.camera_capture self._preview_res = config.camera_preview self._captured = Event() self._discard = 0
[docs] def start_preview(self, angle): with GStreamerSource.lock: if GStreamerSource.thread is None: GStreamerSource.thread = Thread( target=self._gst_thread, daemon=True) GStreamerSource.stop.clear() GStreamerSource.thread.start()
[docs] def stop_preview(self): with GStreamerSource.lock: if GStreamerSource.thread is not None: GStreamerSource.stop.set() # NOTE: You cannot attempt to join the thread because it's # actually the current thread (but Python doesn't realize that # presumably due to the roundabout way the thread's entered # via a GStreamer callback) GStreamerSource.thread = None
[docs] def capture(self, angle, led=None): self.stop_preview() width, height = self._capture_res pipeline = self.Gst.parse_launch( f""" v4l2src device={self._device} ! capsfilter caps=image/jpeg,width={width},height={height} ! appsink drop=true emit-signals=true name=sink """) if pipeline.set_state(self.Gst.State.PLAYING) == self.Gst.StateChangeReturn.FAILURE: raise RuntimeError('failed to start GStreamer pipeline') data = io.BytesIO() try: sink = pipeline.get_by_name('sink') # Number of frames to initially discard; some cameras require # several "warm-up" frames to measure white-balance, exposure, # and so on so discard a few before capturing self._discard = 10 self._captured.clear() sink.connect('new-sample', self._capture_sample, data) if not self._captured.wait(timeout=30): raise RuntimeError('failed to capture image') data.seek(0) return data finally: pipeline.set_state(self.Gst.State.NULL)
def _capture_sample(self, sink, data): sample = sink.emit('pull-sample') if not isinstance(sample, self.Gst.Sample): return self.Gst.FlowReturn.ERROR buf = sample.get_buffer() self._discard -= 1 if not self._discard: data.write(buf.extract_dup(0, buf.get_size())) self._captured.set() return self.Gst.FlowReturn.OK def _gst_thread(self): width, height = self._preview_res pipeline = self.Gst.parse_launch( f""" v4l2src device={self._device} ! capsfilter caps=image/jpeg,width={width},height={height} ! appsink drop=true emit-signals=true name=sink """) if pipeline.set_state(self.Gst.State.PLAYING) == self.Gst.StateChangeReturn.FAILURE: raise RuntimeError('failed to start GStreamer pipeline') try: sink = pipeline.get_by_name('sink') sink.connect('new-sample', self._preview_sample, None) self.stop.wait() finally: pipeline.set_state(self.Gst.State.NULL) def _preview_sample(self, sink, user_data): sample = sink.emit('pull-sample') if not isinstance(sample, self.Gst.Sample): return self.Gst.FlowReturn.ERROR buf = sample.get_buffer() self._preview_frame(buf.extract_dup(0, buf.get_size())) return self.Gst.FlowReturn.OK