Source code for tborg.examples.mborg_pygame

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# mborg/examples/mborg_pygame.py
#
__docformat__ = "restructuredtext en"

"""
Joystick Control of MonsterBorg
-------------------------------

by Carl J. Nobile

THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

import os
import sys
import time
import logging
import pygame

from io import StringIO

BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(
    os.path.abspath(__file__))))
sys.path.append(BASE_DIR)

from tborg import (
    create_working_dir, ConfigLogger, ThunderBorg, ThunderBorgException)
from daemonize import Daemon

create_working_dir()

from tborg import LOG_PATH, RUN_PATH


[docs] class PYGameController: """ Initializes the attached controller. """ _DEFAULT_CTRL_WAIT = 0.1 _DEFAULT_EVENT_WAIT = 0.0 def __init__(self, logger_name='', log_level=logging.INFO, debug=False): """ Initialize logging and sets the two wait times to a reasonable default. """ self._debug = debug self._clog = logging.getLogger(logger_name) self._clog.setLevel(log_level) self.__controller_initialized = False self.__ctrl_wait_time = 0 self.ctrl_wait_time = self._DEFAULT_CTRL_WAIT self.event_wait_time = self._DEFAULT_EVENT_WAIT self._quit = False @property def ctrl_wait_time(self): """ Property that gets or sets the controller wait time. This wait time is used when looping during the controller detection period. :param sleep: The period of time to sleep between checks. Defaults to 0.1 seconds. :type sleep: float """ return self.__ctrl_wait_time @ctrl_wait_time.setter def ctrl_wait_time(self, sleep): self.__ctrl_wait_time = sleep @property def event_wait_time(self): """ Property that gets or sets the event wait time. This wait time is used when looping during the event processing period. :param sleep: The period of time to sleep between event processing. Defaults to 0.0 seconds. :type sleep: float """ return self.__event_wait_time @event_wait_time.setter def event_wait_time(self, sleep): self.__event_wait_time = sleep @property def is_ctrl_init(self): """ A property that returns `True` or `False` if the controller is initialized. """ return self.__controller_initialized
[docs] def init_controller(self): """ Wait until the controller is connected then initialize pygame. """ pygame.init() while True: try: pygame.joystick.init() except pygame.error as e: self._clog.error("PYGame error: %s", e) if self._quit_sleep(): break except KeyboardInterrupt: self._clog.warn("User aborted with CTRL C.") break else: if pygame.joystick.get_count() < 1: if self._quit_sleep(): break else: self.joystick = pygame.joystick.Joystick(0) self.joystick.init() self._initialize_variables() self._clog.info("Found controller.") self.__controller_initialized = True break
[docs] def _quit_sleep(self): error = False try: pygame.joystick.quit() time.sleep(self.ctrl_wait_time) except KeyboardInterrupt: self._clog.warn("User aborted with CTRL C.") error = True return error
[docs] def _initialize_variables(self): self.axis_data = { axis: 0.0 for axis in range(self.joystick.get_numaxes()) } self.ball_data = { ball: 0.0 for ball in range(self.joystick.get_numballs()) } self.button_data = { but: False for but in range(self.joystick.get_numbuttons()) } self.hat_data = { hat: (0, 0) for hat in range(self.joystick.get_numhats()) } # Buttons Event Types self.SQUARE = 0 self.CROSS = 1 self.CIRCLE = 2 self.TRIANGLE = 3 self.L1 = 4 self.R1 = 5 self.L2 = 6 self.R2 = 7 self.SHARE = 8 self.OPTIONS = 9 self.LJB = 10 self.RJB = 11 self.PSB = 12 self.PADB = 13 # Axis Event Types self.LF_LR = 0 self.LF_UD = 1 self.L2_VR = 2 self.RT_LR = 3 self.RT_UD = 4 # if self.is_ps4() else 3 self.R2_VR = 5 # Create HAT variables. Hat Event Types (HAT0, HAT1, ...) for i in range(len(self.hat_data)): setattr(self, 'HAT' + i, i)
def __set_axis(self, event): self.axis_data[event.axis] = round(event.value, 3) def __set_ball(self, event): self.ball_data[event.ball] = event.rel def __set_button_down(self, event): self.button_data[event.button] = True def __set_button_up(self, event): self.button_data[event.button] = False def __set_hat(self, event): self.hat_data[event.hat] = event.value
[docs] def set_quit(self, event=None): self._quit = True
__METHODS = { pygame.JOYAXISMOTION: __set_axis, pygame.JOYBALLMOTION: __set_ball, pygame.JOYBUTTONDOWN: __set_button_down, pygame.JOYBUTTONUP: __set_button_up, pygame.JOYHATMOTION: __set_hat, pygame.QUIT: set_quit }
[docs] def listen(self): """ Listen to controller events. """ if not self.is_ctrl_init: self._clog.error("The init_ctrl method must be called before " "the listen method.") self.set_quit() while not self._quit: for event in pygame.event.get(): self.__METHODS[event.type](self, event) self.process_event() else: self._clog.warning("Waiting for controller") time.sleep(self.event_wait_time) self._clog.info("Exiting")
[docs] def process_event(self): """ Process the current events. This method needs to be overridden. """ raise NotImplementedError( "Programming error: must implement the process_event method")
[docs] def is_ps4(self): """ Is a PS4 controller attached? .. note:: The current way this is determined may not be reliable, but as of now, it's the best way I have found. """ return len(self.axis_data) == 6
[docs] class JoyStickControl(PYGameController, Daemon): """ This class allows control of the MonsterBorg by a PS3/4 controller. """ _LOG_PATH = os.path.join(LOG_PATH, 'mborg_pygame.log') _BASE_LOGGER_NAME = 'examples' _LOGGER_NAME = 'examples.mborg-pygame' _CTRL_LOGGER_NAME = 'examples.controller' _TBORG_LOGGER_NAME = 'examples.tborg' _PIDFILE = os.path.join(RUN_PATH, 'mborg_pygame.pid') _VOLTAGE_IN = 1.2 * 10 _VOLTAGE_OUT = 12.0 * 0.95 _PROCESS_INTERVAL = 0.00 _MAX_POWER = (1.0 if _VOLTAGE_OUT > _VOLTAGE_IN else _VOLTAGE_OUT / float(_VOLTAGE_IN)) _ROTATE_TURN_SPEED = 0.5 _SLOW_SPEED = 0.5 def __init__(self, bus_num=ThunderBorg.DEFAULT_BUS_NUM, address=ThunderBorg.DEFAULT_I2C_ADDRESS, log_level=logging.INFO, debug=False): self._debug = debug log_level = logging.DEBUG if debug else log_level cl = ConfigLogger() cl.config(logger_name=self._BASE_LOGGER_NAME, file_path=self._LOG_PATH, level=log_level) self._log = logging.getLogger(self._LOGGER_NAME) if not self._debug: self._tb = ThunderBorg(bus_num=bus_num, address=address, logger_name=self._TBORG_LOGGER_NAME, log_level=log_level) PYGameController.__init__(self, logger_name=self._CTRL_LOGGER_NAME, log_level=log_level, debug=debug) Daemon.__init__(self, self._PIDFILE, logger_name=self._LOGGER_NAME, verbose=2 if debug else 0)
[docs] def run(self): """ Start the controller listening process. """ self.init_controller() if not self._debug: # Turn on failsafe. self._tb.set_comms_failsafe(True) if self._tb.get_comms_failsafe(): self.log_battery_monitoring() self.init_mborg() else: self._clog.error("The failsafe mode could not be turned on.") self.set_quit() try: self.listen() except (KeyboardInterrupt, ThunderBorgException) as e: self._log.warn("Exiting event processing, %s", e) except Exception as e: self._log.error("Unknown error, %s", e, exc_info=True) finally: self._tb.halt_motors() self._tb.set_comms_failsafe(False) self._tb.set_led_battery_state(False) self._tb.set_both_leds(0, 0, 0) # Set LEDs off self._log.info("Exiting") sys.exit()
[docs] def log_battery_monitoring(self): """ Dump to the log the initial battery values. """ level_min, level_max = self._tb.get_battery_monitoring_limits() current_level = self._tb.get_battery_voltage() mid_level = (level_min + level_max) / 2 buf = StringIO() buf.write("\nBattery Monitoring Settings\n") buf.write("---------------------------\n") buf.write(f"Minimum (red) {level_min:02.2f} V\n") buf.write(f"Middle (yellow) {mid_level:02.2f} V\n") buf.write(f"Maximum (green) {level_max:02.2f} V\n") buf.write(f"Current Voltage {current_level:02.2f} V\n") self._log.info(buf.getvalue()) buf.close()
[docs] def init_mborg(self): """ Initialize the MonsterBorg joystick controller. """ self._tb.halt_motors() self._tb.set_led_battery_state(False) self._tb.set_both_leds(0, 0, 1) # Set to blue self.event_wait_time = self._PROCESS_INTERVAL if not self.is_ctrl_init: self._log.warn("Could not initialize ") self._tb.set_comms_failsafe(False) self._tb.set_both_leds(0, 0, 0) # Set LEDs off sys.exit() self.set_defaults() self._tb.set_led_battery_state(True) self._led_battery_mode = True self._log.debug("Finished mborg_joy initialization.")
[docs] def process_event(self): """ Process the current events (overrides the base class method). """ # Invert the controller Y axis to match the motor fwd/rev. # If the Y axis needs to be inverted do that also. if self.axis_y_invert: motor_one = motor_two = self.axis_data.get(self.LF_UD) else: motor_one = motor_two = -self.axis_data.get(self.LF_UD) if self.axis_x_invert: x = -self.axis_data.get(self.RT_LR) else: x = self.axis_data.get(self.RT_LR) # Rotate turn button press if not self.button_data.get(self.rotate_turn_button): x *= self.rotate_turn_speed if x > 0.05: motor_one *= 1.0 - (2.0 * x) elif x < -0.05: motor_two *= 1.0 + (2.0 * x) # Drive slow button press if self.button_data.get(self.drive_slow_button): motor_one *= self.drive_slow_speed motor_two *= self.drive_slow_speed if not self._debug: self._tb.set_motor_one(motor_one * self._MAX_POWER) self._tb.set_motor_two(motor_two * self._MAX_POWER) # Set LEDs to purple to indicate motor faults. if (self._tb.get_drive_fault_one() or self._tb.get_drive_fault_two()): if self._led_battery_mode: self._tb.set_led_battery_state(False) self._tb.set_both_leds(1, 0, 1) # Set to purple self._led_battery_mode = False elif not self._led_battery_mode: self._tb.set_led_battery_state(True) self._led_battery_mode = True
[docs] def set_defaults(self, **kwargs): """ Set some default values. This method can be set while running. For example if the robot flips over which could be determined with a sensor the axis invert values can be changed. :param axis_y_invert: If set to `True` the up/down control is inverted. Default is `False`. Can be used if the robot flips over. :type axis_y_invert: bool :param axis_x_invert: If set to `True` the left/right control is inverted. Default is `False`. Can be used if the robot flips over. :type axis_x_invert: bool :param rotate_turn_button: Choose the button for rotation. The default is R1 (5). :type rotate_turn_button: int :param rotate_turn_speed: Choose the speed for rotation. The default is 0.5. :type rotate_turn_speed: float :param drive_slow_button: Choose the button for driving slow. The default is R2 (6). :type drive_slow_but: int :param drive_slow_speed: Choose the speed to decrease to when the drive-slow button is held. :type drive_slow_speed: bool """ tmp_kwargs = kwargs.copy() self.axis_y_invert = tmp_kwargs.pop('axis_y_invert', False) self.axis_x_invert = tmp_kwargs.pop('axis_x_invert', False) self.rotate_turn_button = tmp_kwargs.pop( 'rotate_turn_button', self.R1) self.rotate_turn_speed = tmp_kwargs.pop( 'rotate_turn_speed', self._ROTATE_TURN_SPEED) self.drive_slow_button = tmp_kwargs.pop( 'drive_slow_button', self.L1) self.drive_slow_speed = tmp_kwargs.pop( 'drive_slow_speed', self._SLOW_SPEED) if kwargs: self._log.error("Invalid arguments found: %s", kwargs)
if __name__ == '__main__': # pragma: no cover import argparse parser = argparse.ArgumentParser( description=("JoyStick Control Using PYGame")) parser.add_argument( '-d', '--debug', action='store_true', default=False, dest='debug', help="Run in debug mode (no thunderborg code is run).") parser.add_argument( '-s', '--start', action='store_true', default=False, dest='start', help="Start the daemon.") parser.add_argument( '-r', '--restart', action='store_true', default=False, dest='restart', help="Restart the daemon.") parser.add_argument( '-S', '--stop', action='store_true', default=False, dest='stop', help="Stop the daemon.") options = parser.parse_args() arg_value = (options.start ^ options.restart ^ options.stop) if not arg_value and arg_value is not False: print("Can only set one of 'start', 'restart' or 'stop'.") sys.exit(-1) if options.start: arg = 'start' elif options.restart: arg = 'restart' elif options.stop: arg = 'stop' else: arg = 'start' jsc = JoyStickControl(debug=options.debug) getattr(jsc, arg)()