Source code for tborg.examples.web.monster_web

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Creates a web-page interface for the MonsterBorg.
#
# <IP>:9000/
# or
# <IP>:9000/hold
#
__docformat__ = "restructuredtext en"

import time
import datetime
import mimetypes
import os
import sys
import threading
import cv2
import logging

from socketserver import ThreadingMixIn, TCPServer, BaseRequestHandler
from jinja2 import Environment, FileSystemLoader

BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(
    os.path.abspath(__file__)))))
sys.path.append(BASE_DIR)

from tborg import create_working_dir, ConfigLogger, ThunderBorg
from daemonize import Daemon


[docs] def is_raspberry_pi(): # Method 1: Check /proc/cpuinfo try: with open('/proc/cpuinfo', 'r') as f: for line in f: if line.startswith('Hardware') or line.startswith('Model'): return 'Raspberry Pi' in line except FileNotFoundError: pass # Method 2: Check device tree (fallback) try: with open('/sys/firmware/devicetree/base/model', 'r') as f: return 'Raspberry Pi' in f.read() except (FileNotFoundError, PermissionError): pass return False
if is_raspberry_pi(): from picamera2 import Picamera2 create_working_dir() from tborg import LOG_PATH, RUN_PATH, MEDIA_PATH
[docs] class Watchdog(threading.Thread): """ Timeout thread """ def __init__(self, tb, log_name, *args, **kwargs): super().__init__(*args, **kwargs) self.tb = tb self._log = logging.getLogger(log_name) self.event = threading.Event() self.terminated = False self.start()
[docs] def run(self): timed_out = True # This method runs in a separate thread while not self.terminated: # Wait for a network event to be flagged for up to one second if timed_out: if self.event.wait(1): # Connection self._log.info("Reconnected...") self.tb.set_led_battery_state(True) timed_out = False self.event.clear() else: if self.event.wait(1): self.event.clear() else: # Timed out self._log.warning("Timed out...") self.tb.set_led_battery_state(False) self.tb.set_both_leds(0, 0, 1) timed_out = True self.tb.halt_motors()
[docs] class StreamProcessor(threading.Thread): """ Image stream processing thread """ def __init__(self, global_data, *args, **kwargs): super().__init__(*args, **kwargs) self.global_data = global_data self.event = threading.Event() self.terminated = False self.start() self.begin = 0
[docs] def run(self): while not self.terminated: if self.event.wait(1): try: frame = self.frame if self.global_data['flipped_camera']: frame = cv2.flip(frame, -1) _, jpeg = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, self.global_data['jpeg_quality']]) with self.global_data['lock_frame']: self.global_data['last_frame'] = jpeg.tobytes() finally: self.event.clear()
[docs] class ImageCapture(threading.Thread): """ Image capture thread """ def __init__(self, camera, processor, running, log_name, *args, **kwargs): super().__init__(*args, **kwargs) self.camera = camera self.processor = processor self.running = running self._log = logging.getLogger(log_name) self.start()
[docs] def run(self): self._log.info("Start the stream using the video port.") while self.running: # Wait until processor is ready if self.processor.event.is_set(): time.sleep(0.001) continue request = self.camera.capture_request() self.processor.frame = cv2.cvtColor(request.make_array("main"), cv2.COLOR_RGB2BGR) request.release() self.processor.event.set() self._log.info("Terminating camera processing...") self.processor.terminated = True self.processor.join() self._log.info("Processing terminated.")
[docs] class ThreadingTCPServer(ThreadingMixIn, TCPServer): allow_reuse_address = True daemon_threads = True
[docs] class WebServer(BaseRequestHandler): """ Class used to implement the web server """ GLOBAL_DATA = None
[docs] def handle(self): self.tb = self.GLOBAL_DATA['tb'] log_name = self.GLOBAL_DATA['log_name'] lock_frame = self.GLOBAL_DATA['lock_frame'] watchdog = self.GLOBAL_DATA['watchdog'] max_power = self.GLOBAL_DATA['max_power'] req_data = self.parse_request(self.request) self._log = logging.getLogger(log_name) # Get the URL requested self.temp_path = os.path.join(BASE_DIR, 'tborg', 'examples', 'web', 'templates') url_path = '' line = req_data.get('GET') self._log.debug("line: %s", line) if line: parts = line.split(' ') url_path = parts[0] # Serve static files from the templates directory if url_path.startswith('/stream.mjpg'): self._serve_mjpeg_stream() elif url_path.endswith((".js", ".css", ".png", ".ico", ".jpg")): self._log.debug("media: %s", url_path) self._serve_static(url_path) elif url_path.startswith('/halt'): # Turn the drives off self._log.debug("halt: %s", url_path) self._render_template('set.html', title="Halt Motors", percent_left=0, percent_right=0) if self.tb: self.tb.halt_motors() watchdog.event.set() elif url_path.startswith('/set'): # Motor power setting: /set/left/right self._log.debug("set: %s", url_path) parts = url_path.split('/') # Get the power levels if len(parts) >= 4: try: drive_left = float(parts[2]) drive_right = float(parts[3]) except Exception: # Bad values drive_right = 0.0 drive_left = 0.0 else: # Bad request drive_right = 0.0 drive_left = 0.0 # Ensure settings are within limits if drive_right < -1: drive_right = -1 elif drive_right > 1: drive_right = 1 if drive_left < -1: drive_left = -1 elif drive_left > 1: drive_left = 1 # Report the current settings percent_left = drive_left * 100.0 percent_right = drive_right * 100.0 self._render_template("set.html", title="Set Motor", percent_left=percent_left, percent_right=percent_right) # Set the outputs drive_left *= max_power drive_right *= max_power if self.tb: self.tb.set_motor_one(drive_right) self.tb.set_motor_two(drive_left) watchdog.event.set() elif url_path.startswith('/photo'): # Save camera photo self._log.debug("photo: %s", url_path) lock_frame.acquire() capture_frame = self.GLOBAL_DATA['last_frame'] lock_frame.release() if capture_frame is not None: dt = datetime.datetime.now(datetime.UTC) filename = f"photo-{dt.strftime('%Y%m%d-%H%M%S-%f')}.jpg" photo_name = os.path.join(MEDIA_PATH, filename) try: with open(photo_name, 'wb') as f: f.write(capture_frame) msg = f"Photo saved to {photo_name}" except Exception: msg = 'Failed to take photo!' else: msg = 'Failed to take photo!' self._render_template("photo.html", title="Photo", msg=msg) elif url_path == '/' or url_path == "/index.html": # Main page, click buttons to move and to stop self._log.debug("/: %s", url_path) self._render_template("index.html", title="Main Page") elif url_path == '/hold': # Alternate page, hold buttons to move (does not work with # all devices) self._log.debug("hold: %s", url_path) self._render_template("hold.html", title="Hold") elif url_path == '/stream': # Streaming frame, set a delayed refresh self._log.debug("stream: %s", url_path) self._render_template("stream.html", title="Robot Camera") else: self._log.warning("Unknown: %s", url_path) self._send(404, 'Not Found', 'text/html', f'Path : "{url_path}"')
[docs] def parse_request(self, request): """ Create a file-like object from the socket where 'r' mode reads text (bytes are decoded using default encoding, usually utf-8). You can specify encoding explicitly if needed: encoding='utf-8' """ data = {} with request.makefile('r') as f: for line in f: # line includes the \r\n, so strip it clean_line = line.strip() if not clean_line: break # Stop at empty line (common in HTTP) else: idx = clean_line.index(' ') data[clean_line[:idx]] = clean_line[idx+1:] return data
[docs] def _render_template(self, name, **context): env = Environment(loader=FileSystemLoader(self.temp_path)) tmpl = env.get_template(name) body = tmpl.render(**context).encode() self._send(200, 'OK', "text/html", body)
[docs] def _serve_static(self, filename): filename = filename.lstrip("/") filepath = os.path.join(self.temp_path, filename) if os.path.isfile(filepath): mime, _ = mimetypes.guess_type(filepath) mime = mime or "application/octet-stream" self._log.info("filename: %s, mime: %s", filename, mime) with open(filepath, "rb") as f: body = f.read() self._send(200, 'OK', mime, body) else: self._send(404, 'Not Found', 'text/plain', b"Not Found")
[docs] def _serve_mjpeg_stream(self): self.request.sendall( b"HTTP/1.1 200 OK\r\n" b"Content-Type: multipart/x-mixed-replace; boundary=frame\r\n" b"Cache-Control: no-cache\r\n" b"Connection: close\r\n" b"\r\n") try: while True: with self.GLOBAL_DATA['lock_frame']: frame = self.GLOBAL_DATA['last_frame'] if frame is not None: self.request.sendall( b"--frame\r\n" b"Content-Type: image/jpeg\r\n" b"Content-Length: " + str(len(frame)).encode() + b"\r\n\r\n" + frame + b"\r\n") time.sleep(1 / self.GLOBAL_DATA['fps']) # Frames Per Ssecond except (BrokenPipeError, ConnectionResetError): pass
[docs] def _send(self, status, reason, content_type, body: bytes): response = ( f"HTTP/1.1 {status} {reason}\r\n" f"Content-Type: {content_type}\r\n" f"Content-Length: {len(body)}\r\n" f"Connection: close\r\n" f"\r\n").encode() + body self.request.sendall(response)
[docs] class MonsterWeb(Daemon): _LOG_PATH = os.path.join(LOG_PATH, 'monster_web.log') _BASE_LOGGER_NAME = 'examples' _LOGGER_NAME = f'{_BASE_LOGGER_NAME}.monster_web' _TBORG_LOGGER_NAME = 'examples.tborg' _PIDFILE = os.path.join(RUN_PATH, 'monster_web.pid') IMAGE_WIDTH = 240 """ int: Width of the captured image in pixels """ IMAGE_HEIGHT = 192 """ int: Height of the captured image in pixels """ FRAME_RATE = 20 """ int: Number of images to capture per second """ FLIPPED_CAMERA = True """ bool: Swap between True and False if the camera image is rotated by 180. """ JPEG_QUALITY = 80 """ int: JPEG quality level, smaller is faster, higher looks better (0 to 100) """ # These were all global to the module. LAST_FRAME = None LOCK_FRAME = threading.Lock() MAX_POWER = 0 def __init__(self, options, port=9000, address=ThunderBorg.DEFAULT_I2C_ADDRESS, log_level=logging.INFO, *args, **kwargs): super().__init__(self._PIDFILE, logger_name=self._LOGGER_NAME, *args, **kwargs) self.port = port cl = ConfigLogger() cl.config(logger_name=self._LOGGER_NAME, file_path=self._LOG_PATH, level=log_level) self._log = logging.getLogger(self._LOGGER_NAME) self._log.info("Starting MonsterWeb...") self._log.info("Set to level: %s", logging.getLevelName(log_level)) self._borg = options.borg self.camera = None self.processor = None self.watchdog = None self.running = True self.tb = None self._log.info("Options: %s", options) if self._borg: self.tb = ThunderBorg(logger_name=self._TBORG_LOGGER_NAME, address=address, log_level=log_level) voltage_in = float(options.voltage_in) self.tb.set_comms_failsafe(False) self.tb.set_led_battery_state(False) self.tb.set_both_leds(0, 0, 1) # Power settings self.tb.set_battery_limits(voltage_in) # Maximum motor voltage, we limit it to 95% to allow the RPi to get # uninterrupted power voltage_out = self.tb.get_battery_voltage() * 0.95 # Setup the power limits if voltage_out > voltage_in: # This may never happen. self.MAX_POWER = 1.0 else: self.MAX_POWER = voltage_out / voltage_in self.global_data = {'tb': self.tb, 'log_name': self._LOGGER_NAME, 'fps': self.FRAME_RATE, 'flipped_camera': self.FLIPPED_CAMERA, 'jpeg_quality': self.JPEG_QUALITY, 'last_frame': self.LAST_FRAME, 'lock_frame': self.LOCK_FRAME, 'camera': self.camera, 'processor': self.processor, 'watchdog': self.watchdog, 'max_power': self.MAX_POWER, }
[docs] def run(self): try: self.create_image_buffer_frame() except Exception: self._log.error("monster_web.py failed to start.", exc_info=True) sys.exit(1) else: if self._borg: # Turn on failsafe. self._tb.set_comms_failsafe(True) if self._tb.get_comms_failsafe(): # Log and init self.log_battery_monitoring() else: self._log.error("The failsafe mode could not be " "turned on.") self.running = False sys.exit(0)
[docs] def create_image_buffer_frame(self): """ Create the image buffer frame. """ # Startup sequence if self._borg: self.camera = Picamera2() config = self.camera.create_video_configuration( main={"size": (self.IMAGE_WIDTH, self.IMAGE_HEIGHT), "format": "BGR888"}, controls={"FrameDurationLimits": ( int(1_000_000 / self.FRAME_RATE), int(1_000_000 / self.FRAME_RATE))}) self.camera.configure(config) self.camera.start() # Color adaption # 0 Auto # 1 Tungsten # 2 Fluorescent # 3 Indoor # 4 Daylight # 5 Cloudy self.camera.set_controls({"AwbMode": 0}) self._log.info("Setup the stream processing thread") self.processor = StreamProcessor(self.global_data) self._log.info("Wait ...") time.sleep(2) capture_thread = ImageCapture(self.camera, self.processor, self.running, self._LOGGER_NAME) self._log.info("Setup the watchdog") self.watchdog = Watchdog(self.tb, self._LOGGER_NAME) # Run the web server until we are told to close try: WebServer.GLOBAL_DATA = self.global_data http_server = ThreadingTCPServer(("0.0.0.0", self.port), WebServer) http_server.allow_reuse_address = True except Exception as e: # Failed to open the port, report common issues self._log.info(f"\nFailed to open port {self.port}") self._log.info("Other problems include running another script " "with the same port.") self._log.info("If the script was just working recently try " "waiting a minute first.\n") # Flag the script to exit self.running = False raise e else: while self.running: http_server.handle_request() # Turn the motors off under all scenarios if self.options.borg: self.tb.halt_motors() self._log.info("Motors off") # Tell each thread to stop, and wait for them to end if http_server is not None: http_server.server_close() self.running = False capture_thread.join() self.processor.terminated = True self.watchdog.terminated = True self.processor.join() self.watchdog.join() self.CAMERA = None if self.options.borg: self.tb.set_led_battery_state(False) self.tb.set_both_leds(0, 0, 0) self.tb.halt_motors() self._log.info("Web-server terminated.")
if __name__ == "__main__": import argparse import traceback parser = argparse.ArgumentParser( description=("MonsterBorg control using a camera and web interface.")) parser.add_argument( '-b', '--borg', action='store_false', default=True, dest='borg', help="The ThunderBorg code is not run.") parser.add_argument( '-d', '--debug', action='store_true', default=False, dest='debug', help="Run in debug mode (no thunderborg code is run).") parser.add_argument( '-l', '--log-level', action='store_true', default=False, dest='level', help="Set the log level to debug.") parser.add_argument( '-v', '--voltage-in', type=float, default=12, dest='voltage_in', help=("The total voltage from the battery source, defaults to 12. " "If set to 0 (zero) the voltage is auto detected.")) parser.add_argument( '-s', '--start', action='store_true', default=False, dest='start', help="Start the daemon.") parser.add_argument( '-r', '--restart', action='store_true', default=False, dest='restart', help="Restart the daemon.") parser.add_argument( '-S', '--stop', action='store_true', default=False, dest='stop', help="Stop the daemon.") options = parser.parse_args() arg_value = (options.start ^ options.restart ^ options.stop) if not arg_value and arg_value is not False: print("Can only set one of 'start', 'restart' or 'stop'.") sys.exit(-1) if options.start: arg = 'start' elif options.restart: arg = 'restart' elif options.stop: arg = 'stop' else: arg = 'start' ret = 0 if options.level: log_level=logging.DEBUG else: log_level=logging.INFO try: mw = MonsterWeb(options, log_level=log_level) except Exception: tb = sys.exc_info()[2] traceback.print_tb(tb) print(f"{sys.exc_info()[0]}: {sys.exc_info()[1]}\n") ret = 1 else: getattr(mw, arg)() sys.exit(ret)