diff --git a/BabbleApp/algo_settings_widget.py b/BabbleApp/algo_settings_widget.py index deb0671..efbc9bf 100644 --- a/BabbleApp/algo_settings_widget.py +++ b/BabbleApp/algo_settings_widget.py @@ -4,7 +4,12 @@ from queue import Queue from threading import Event import re -from utils.misc_utils import bg_color_highlight, bg_color_clear, is_valid_float_input, is_valid_int_input +from utils.misc_utils import ( + bg_color_highlight, + bg_color_clear, + is_valid_float_input, + is_valid_int_input, +) from lang_manager import LocaleStringManager as lang diff --git a/BabbleApp/babble_processor.py b/BabbleApp/babble_processor.py index 6b3a595..ad6d1ad 100644 --- a/BabbleApp/babble_processor.py +++ b/BabbleApp/babble_processor.py @@ -98,7 +98,9 @@ def __init__( self.opts.inter_op_num_threads = 1 self.opts.intra_op_num_threads = settings.gui_inference_threads self.opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL - self.opts.add_session_config_entry("session.intra_op.allow_spinning", "0") # ~3% savings worth ~6ms avg latency. Not noticeable at 60fps? + self.opts.add_session_config_entry( + "session.intra_op.allow_spinning", "0" + ) # ~3% savings worth ~6ms avg latency. Not noticeable at 60fps? self.opts.enable_mem_pattern = False if self.runtime in ("ONNX", "Default (ONNX)"): # ONNX if self.use_gpu: @@ -111,13 +113,13 @@ def __init__( self.opts, providers=provider, ) - except: # Load default model if we can't find the specified model + except: # Load default model if we can't find the specified model print( - f'\033[91m[{lang._instance.get_string("log.error")}] {lang._instance.get_string("error.modelLoad")} {self.model}\033[0m' + f'\033[91m[{lang._instance.get_string("log.error")}] {lang._instance.get_string("error.modelLoad")} {self.model}\033[0m' ) - print(f'\033[91mLoading Default model: {self.default_model}.\033[0m') + print(f"\033[91mLoading Default model: {self.default_model}.\033[0m") self.sess = ort.InferenceSession( - f"{self.default_model}/onnx/model.onnx", + f"{self.default_model}/onnx/model.onnx", self.opts, providers=[provider], provider_options=[{"device_id": self.gpu_index}], @@ -147,7 +149,7 @@ def output_images_and_update(self, output_information: CamInfo): self.image_queue_outgoing.put((image_stack, output_information)) if self.image_queue_outgoing.qsize() > 1: self.image_queue_outgoing.get() - + self.previous_image = self.current_image self.previous_rotation = self.config.rotation_angle @@ -272,7 +274,7 @@ def run(self): # else: # pass # print(self.output) - + self.output_images_and_update(CamInfo(self.current_algo, self.output)) def get_framesize(self): diff --git a/BabbleApp/babbleapp.py b/BabbleApp/babbleapp.py index c445bdf..01f6c2e 100644 --- a/BabbleApp/babbleapp.py +++ b/BabbleApp/babbleapp.py @@ -41,6 +41,7 @@ if os_type == "Windows": try: from ctypes import windll, c_int + winmm = windll.winmm except OSError: print( @@ -172,7 +173,7 @@ async def async_main(): # Get Configuration config: BabbleConfig = BabbleConfig.load() - + # Init logging. TODO: Initiate before "BabbleConfig.load()"? 
if config.settings.gui_logging: setup_logging() diff --git a/BabbleApp/calib_settings_widget.py b/BabbleApp/calib_settings_widget.py index d62c91d..35600fb 100644 --- a/BabbleApp/calib_settings_widget.py +++ b/BabbleApp/calib_settings_widget.py @@ -253,11 +253,13 @@ def render(self, window, event, values): for count1, element1 in enumerate(self.shape): for count2, element2 in enumerate(element1): - if values[element2] != "": + if values[element2] != "": value = values[element2] - if is_valid_float_input(value): # Returns true if a single decimal point. Therefore we need to make sure value can be converted to a float by assuming a dot implies a leading 0. + if is_valid_float_input( + value + ): # Returns true if a single decimal point. Therefore we need to make sure value can be converted to a float by assuming a dot implies a leading 0. if value == ".": - valid_float = 0. + valid_float = 0.0 values[element2] = valid_float window[element2].update(valid_float) value = float(values[element2]) @@ -266,10 +268,12 @@ def render(self, window, event, values): changed = True else: trimmed_value = value[:-1] - if trimmed_value == '': # If we get an empty string, don't try to convert to float. + if ( + trimmed_value == "" + ): # If we get an empty string, don't try to convert to float. window[element2].update(trimmed_value) values[element2] = trimmed_value - else: + else: value = float(trimmed_value) window[element2].update(value) values[element2] = value @@ -280,7 +284,7 @@ def render(self, window, event, values): self.array[0][count2] = float(0) changed = True self.refreshed = False - + elif event == self.gui_reset_max: for count1, element1 in enumerate(self.shape): for count2, element2 in enumerate(element1): diff --git a/BabbleApp/camera.py b/BabbleApp/camera.py index 0f02fdc..e9d6cfc 100644 --- a/BabbleApp/camera.py +++ b/BabbleApp/camera.py @@ -31,7 +31,7 @@ # packet (packet-size bytes) ETVR_HEADER = b"\xff\xa0\xff\xa1" ETVR_HEADER_LEN = 6 -PORTS = ("COM", "/dev/ttyACM") +PORTS = ("COM", "/dev/ttyACM", "/dev/tty.usbmodem", "/dev/cu.usbmodem") class CameraState(Enum): @@ -127,7 +127,9 @@ def run(self): return try: # Only create the camera once, reuse it - self.vft_camera = FTCameraController(get_camera_index_by_name(self.config.capture_source)) + self.vft_camera = FTCameraController( + get_camera_index_by_name(self.config.capture_source) + ) self.vft_camera.open() should_push = False except Exception: @@ -136,79 +138,102 @@ def run(self): self.vft_camera.close() else: # If the camera is already open it don't spam it!! - if (not self.vft_camera.is_open): + if not self.vft_camera.is_open: self.vft_camera.open() should_push = False elif ( - self.cv2_camera is None - or not self.cv2_camera.isOpened() - or self.camera_status == CameraState.DISCONNECTED - or get_camera_index_by_name(self.config.capture_source) != self.current_capture_source - ): - if self.vft_camera is not None: - self.vft_camera.close() - self.device_is_vft = False + self.cv2_camera is None + or not self.cv2_camera.isOpened() + or self.camera_status == CameraState.DISCONNECTED + or get_camera_index_by_name(self.config.capture_source) + != self.current_capture_source + ): + if self.vft_camera is not None: + self.vft_camera.close() + self.device_is_vft = False - print(self.error_message.format(self.config.capture_source)) - # This requires a wait, otherwise we can error and possible screw up the camera - # firmware. Fickle things. 
- if self.cancellation_event.wait(WAIT_TIME): - return - if self.config.capture_source not in self.camera_list: - if "http://" in str(self.config.capture_source): self.http=True - else: self.http=False - self.current_capture_source = self.config.capture_source + print(self.error_message.format(self.config.capture_source)) + # This requires a wait, otherwise we can error and possible screw up the camera + # firmware. Fickle things. + if self.cancellation_event.wait(WAIT_TIME): + return + if self.config.capture_source not in self.camera_list: + if "http://" in str(self.config.capture_source): + self.http = True else: - self.current_capture_source = get_camera_index_by_name(self.config.capture_source) + self.http = False + self.current_capture_source = self.config.capture_source + else: + self.current_capture_source = get_camera_index_by_name( + self.config.capture_source + ) - if self.config.use_ffmpeg: - self.cv2_camera = cv2.VideoCapture( - self.current_capture_source, cv2.CAP_FFMPEG - ) + print(self.error_message.format(self.config.capture_source)) + # This requires a wait, otherwise we can error and possible screw up the camera + # firmware. Fickle things. + if self.cancellation_event.wait(WAIT_TIME): + return + if self.config.capture_source not in self.camera_list: + if "http://" in str(self.config.capture_source): + self.http = True else: - if not self.http: - self.cv2_camera = cv2.VideoCapture() - self.cv2_camera.open(self.current_capture_source) - else: - self.cv2_camera = MJPEGVideoCapture(self.current_capture_source) - self.cv2_camera.open() + self.http = False + self.current_capture_source = self.config.capture_source + else: + self.current_capture_source = get_camera_index_by_name( + self.config.capture_source + ) + + if self.config.use_ffmpeg: + self.cv2_camera = cv2.VideoCapture( + self.current_capture_source, cv2.CAP_FFMPEG + ) + else: if not self.http: - if not self.settings.gui_cam_resolution_x == 0: - self.cv2_camera.set( - cv2.CAP_PROP_FRAME_WIDTH, - self.settings.gui_cam_resolution_x, - ) - if not self.settings.gui_cam_resolution_y == 0: - self.cv2_camera.set( - cv2.CAP_PROP_FRAME_HEIGHT, - self.settings.gui_cam_resolution_y, - ) - if not self.settings.gui_cam_framerate == 0: - self.cv2_camera.set( - cv2.CAP_PROP_FPS, self.settings.gui_cam_framerate - ) - should_push = False - else: - # We don't have a capture source to try yet, wait for one to show up in the GUI. - if self.vft_camera is not None: - self.vft_camera.close() - if self.cancellation_event.wait(WAIT_TIME): - self.camera_status = CameraState.DISCONNECTED - return - # Assuming we can access our capture source, wait for another thread to request a capture. - # Cycle every so often to see if our cancellation token has fired. This basically uses a - # python event as a context-less, resettable one-shot channel. 
- if should_push and not self.capture_event.wait(timeout=0.02): - continue - if self.config.capture_source is not None: - if isSerial: - self.get_serial_camera_picture(should_push) + self.cv2_camera = cv2.VideoCapture() + self.cv2_camera.open(self.current_capture_source) + else: + self.cv2_camera = MJPEGVideoCapture( + self.current_capture_source + ) + self.cv2_camera.open() + if not self.http: + if not self.settings.gui_cam_resolution_x == 0: + self.cv2_camera.set( + cv2.CAP_PROP_FRAME_WIDTH, + self.settings.gui_cam_resolution_x, + ) + if not self.settings.gui_cam_resolution_y == 0: + self.cv2_camera.set( + cv2.CAP_PROP_FRAME_HEIGHT, + self.settings.gui_cam_resolution_y, + ) + if not self.settings.gui_cam_framerate == 0: + self.cv2_camera.set( + cv2.CAP_PROP_FPS, self.settings.gui_cam_framerate + ) + should_push = False else: - self.__del__() - self.get_camera_picture(should_push) - if not should_push: - # if we get all the way down here, consider ourselves connected - self.camera_status = CameraState.CONNECTED + # We don't have a capture source to try yet, wait for one to show up in the GUI. + if self.vft_camera is not None: + self.vft_camera.close() + if self.cancellation_event.wait(WAIT_TIME): + self.camera_status = CameraState.DISCONNECTED + return + # Assuming we can access our capture source, wait for another thread to request a capture. + # Cycle every so often to see if our cancellation token has fired. This basically uses a + # python event as a context-less, resettable one-shot channel. + if should_push and not self.capture_event.wait(timeout=0.02): + continue + if self.config.capture_source is not None: + if isSerial: + self.get_serial_camera_picture(should_push) + else: + self.__del__() + self.get_camera_picture(should_push) + if not should_push: + # if we get all the way down here, consider ourselves connected + self.camera_status = CameraState.CONNECTED def get_camera_picture(self, should_push): try: @@ -220,7 +245,9 @@ def get_camera_picture(self, should_push): return self.frame_number = self.frame_number + 1 elif self.cv2_camera is not None and self.cv2_camera.isOpened(): - ret, image = self.cv2_camera.read() # MJPEG Stream reconnects are currently limited by the hard coded 30 second timeout time on VideoCapture.read(). We can get around this by recompiling OpenCV or using a custom MJPEG stream imp. + ret, image = ( + self.cv2_camera.read() + ) # MJPEG Stream reconnects are currently limited by the hard coded 30 second timeout time on VideoCapture.read(). We can get around this by recompiling OpenCV or using a custom MJPEG stream imp. if ret and image is not None: if not ret: if not self.http: @@ -232,7 +259,9 @@ def get_camera_picture(self, should_push): return self.FRAME_SIZE = image.shape # Calculate FPS - current_frame_time = time.time() # Should be using "time.perf_counter()", not worth ~3x cycles? + current_frame_time = ( + time.time() + ) # Should be using "time.perf_counter()", not worth ~3x cycles? 
delta_time = current_frame_time - self.last_frame_time self.last_frame_time = current_frame_time current_fps = 1 / delta_time if delta_time > 0 else 0 @@ -267,7 +296,7 @@ def get_next_packet_bounds(self): def get_next_jpeg_frame(self): beg, end = self.get_next_packet_bounds() - jpeg = self.buffer[beg: end + 2] + jpeg = self.buffer[beg : end + 2] self.buffer = self.buffer[end + 2 :] return jpeg @@ -289,7 +318,9 @@ def get_serial_camera_picture(self, should_push): ) return # Calculate FPS - current_frame_time = time.time() # Should be using "time.perf_counter()", not worth ~3x cycles? + current_frame_time = ( + time.time() + ) # Should be using "time.perf_counter()", not worth ~3x cycles? delta_time = current_frame_time - self.last_frame_time self.last_frame_time = current_frame_time current_fps = 1 / delta_time if delta_time > 0 else 0 @@ -299,11 +330,14 @@ def get_serial_camera_picture(self, should_push): if should_push: self.push_image_to_queue(image, int(current_fps), self.fps) + # Discard the serial buffer. This is due to the fact that it, # may build up some outdated frames. A bit of a workaround here tbh. # Do this at the end to give buffer time to refill. if conn.in_waiting >= BUFFER_SIZE: - print(f'{Fore.CYAN}[{lang._instance.get_string("log.info")}] {lang._instance.get_string("info.discardingSerial")} ({conn.in_waiting} bytes){Fore.RESET}') + print( + f'{Fore.CYAN}[{lang._instance.get_string("log.info")}] {lang._instance.get_string("info.discardingSerial")} ({conn.in_waiting} bytes){Fore.RESET}' + ) conn.reset_input_buffer() self.buffer = b"" @@ -323,22 +357,36 @@ def start_serial_connection(self, port): # Otherwise, close the connection before trying to reopen. self.serial_connection.close() com_ports = [tuple(p) for p in list(serial.tools.list_ports.comports())] + + # evil macOS hack + if "/dev/tty.usbmodem" in port: + port = port.replace("/dev/tty.usbmodem", "/dev/cu.usbmodem") + # Do not try connecting if no such port i.e. device was unplugged. if not any(p for p in com_ports if port in p): return True + try: - rate = 115200 if sys.platform == "darwin" else 3000000 # Higher baud rate not working on macOS - conn = serial.Serial(baudrate=rate, port=port, xonxoff=False, dsrdtr=False, rtscts=False) + rate = ( + 115200 if sys.platform == "darwin" else 3000000 + ) # Higher baud rate not working on macOS + conn = serial.Serial( + baudrate=rate, port=port, xonxoff=False, dsrdtr=False, rtscts=False + ) # Set explicit buffer size for serial. This function is Windows only! 
- if os_type == 'Windows': + if os_type == "Windows": conn.set_buffer_size(rx_size=BUFFER_SIZE, tx_size=BUFFER_SIZE) - print(f'{Fore.CYAN}[{lang._instance.get_string("log.info")}] {lang._instance.get_string("info.ETVRConnected")} {port}{Fore.RESET}') + print( + f'{Fore.CYAN}[{lang._instance.get_string("log.info")}] {lang._instance.get_string("info.ETVRConnected")} {port}{Fore.RESET}' + ) self.serial_connection = conn self.camera_status = CameraState.CONNECTED return False except Exception as e: - print(f'{Fore.CYAN}[{lang._instance.get_string("log.info")}] {lang._instance.get_string("info.ETVRFailiure")} {port}{Fore.RESET}') + print( + f'{Fore.CYAN}[{lang._instance.get_string("log.info")}] {lang._instance.get_string("info.ETVRFailiure")} {port}{Fore.RESET}' + ) print(e) self.camera_status = CameraState.DISCONNECTED return True @@ -347,14 +395,14 @@ def clamp_max_res(self, image: MatLike) -> MatLike: shape = image.shape max_value = np.max(shape) if max_value > MAX_RESOLUTION: - scale: float = MAX_RESOLUTION/max_value + scale: float = MAX_RESOLUTION / max_value width: int = int(shape[1] * scale) height: int = int(shape[0] * scale) image = cv2.resize(image, (width, height)) return image - else: return image - + else: + return image def push_image_to_queue(self, image, frame_number, fps): # If there's backpressure, just yell. We really shouldn't have this unless we start getting diff --git a/BabbleApp/camera_widget.py b/BabbleApp/camera_widget.py index ae59781..1c83932 100644 --- a/BabbleApp/camera_widget.py +++ b/BabbleApp/camera_widget.py @@ -1,23 +1,21 @@ -from collections import deque from queue import Queue, Empty from threading import Event, Thread import FreeSimpleGUI as sg import cv2 import os -from babble_processor import BabbleProcessor, CamInfoOrigin +from babble_processor import BabbleProcessor from camera import Camera, CameraState, MAX_RESOLUTION from config import BabbleConfig from osc import Tab from utils.misc_utils import ( playSound, list_camera_names, - get_camera_index_by_name, bg_color_highlight, - bg_color_clear, - is_valid_int_input + is_valid_int_input, ) from lang_manager import LocaleStringManager as lang + class CameraWidget: def __init__(self, widget_id: Tab, main_config: BabbleConfig, osc_queue: Queue): self.gui_camera_addr = f"-CAMERAADDR{widget_id}-" @@ -62,7 +60,9 @@ def __init__(self, widget_id: Tab, main_config: BabbleConfig, osc_queue: Queue): self.capture_event = Event() self.capture_queue = Queue(maxsize=2) self.roi_queue = Queue(maxsize=2) - self.image_queue = Queue(maxsize=500) # This is needed to prevent the UI from freezing during widget changes. + self.image_queue = Queue( + maxsize=500 + ) # This is needed to prevent the UI from freezing during widget changes. 
self.babble_cnn = BabbleProcessor( self.config, @@ -212,7 +212,7 @@ def __init__(self, widget_id: Tab, main_config: BabbleConfig, osc_queue: Queue): key=self.gui_camera_addr, tooltip=lang._instance.get_string("camera.cameraAddressTooltip"), enable_events=True, - size=(20,0), + size=(20, 0), ), sg.Button( lang._instance.get_string("camera.refreshCameraList"), @@ -286,7 +286,6 @@ def start(self): self.camera_thread = Thread(target=self.camera.run) self.camera_thread.start() - def stop(self): # If we're not running yet, bail if self.cancellation_event.is_set(): @@ -318,7 +317,7 @@ def render(self, window, event, values): # if value not in self.camera_list: # self.config.capture_source = value # if "COM" not in value: - ports = ("COM", "/dev/ttyACM") + ports = ("COM", "/dev/ttyACM", "/dev/tty.usbmodem", "/dev/cu.usbmodem") if any(x in str(value) for x in ports): self.config.capture_source = value else: @@ -341,6 +340,8 @@ def render(self, window, event, values): and "udp" not in value and "COM" not in value and "/dev/ttyACM" not in value + and "/dev/tty.usbmodem" not in value + and "/dev/cu.usbmodem" not in value and value not in self.camera_list ): # If http is not in camera address, add it. self.config.capture_source = ( @@ -390,11 +391,15 @@ def render(self, window, event, values): if self.settings_config.use_calibration == True: window[self.gui_restart_calibration].update(disabled=False) window[self.gui_stop_calibration].update(disabled=False) - print(f'[{lang._instance.get_string("log.info")}] {lang._instance.get_string("info.enabled")}') + print( + f'[{lang._instance.get_string("log.info")}] {lang._instance.get_string("info.enabled")}' + ) else: window[self.gui_restart_calibration].update(disabled=True) window[self.gui_stop_calibration].update(disabled=True) - print(f'[{lang._instance.get_string("log.info")}] {lang._instance.get_string("info.disabled")}') + print( + f'[{lang._instance.get_string("log.info")}] {lang._instance.get_string("info.disabled")}' + ) if event == "{}+UP".format(self.gui_roi_selection): # Event for mouse button up in ROI mode @@ -435,8 +440,8 @@ def render(self, window, event, values): f'\033[94m[{lang._instance.get_string("log.info")}] {lang._instance.get_string("info.refreshedCameraList")}\033[0m' ) self.camera_list = list_camera_names() - #print(self.camera_list) - window[self.gui_camera_addr].update(values=self.camera_list,size=(20,0)) + # print(self.camera_list) + window[self.gui_camera_addr].update(values=self.camera_list, size=(20, 0)) if event == self.gui_restart_calibration: if ( @@ -484,7 +489,9 @@ def render(self, window, event, values): ) window[self.gui_tracking_fps].update(self._movavg_fps(self.camera.fps)) window[self.gui_tracking_bps].update(self._movavg_bps(self.camera.bps)) - if not self.settings_config.gui_disable_camera_preview: #If not hiding the image + if ( + not self.settings_config.gui_disable_camera_preview + ): # If not hiding the image if self.in_roi_mode: try: if self.roi_queue.empty(): @@ -518,7 +525,7 @@ def render(self, window, event, values): except Empty: pass - else: # We are hiding the previews and crop feed. + else: # We are hiding the previews and crop feed. 
window[self.gui_roi_layout].update(visible=False) window[self.gui_tracking_layout].update(visible=False) self.in_roi_mode = False diff --git a/BabbleApp/config.py b/BabbleApp/config.py index 0d913fc..32890ed 100644 --- a/BabbleApp/config.py +++ b/BabbleApp/config.py @@ -35,7 +35,7 @@ class BabbleSettingsConfig(BaseModel): gui_osc_delay_enable: bool = False gui_osc_delay_seconds: float = 0.01 gui_update_check: bool = True - gui_logging: bool = True #Prefer "False" for stable release? + gui_logging: bool = True # Prefer "False" for stable release? gui_ROSC: bool = False gui_osc_location: str = "" gui_multiply: float = 1 diff --git a/BabbleApp/general_settings_widget.py b/BabbleApp/general_settings_widget.py index fa08800..5508a7d 100644 --- a/BabbleApp/general_settings_widget.py +++ b/BabbleApp/general_settings_widget.py @@ -6,6 +6,7 @@ from threading import Event from utils.misc_utils import bg_color_highlight, bg_color_clear, is_valid_int_input + class SettingsWidget: def __init__( self, widget_id: Tab, main_config: BabbleSettingsConfig, osc_queue: Queue @@ -314,13 +315,14 @@ def render(self, window, event, values): self.config.gui_osc_receiver_port = int(value) changed = True else: - print(f'\033[91m[{lang._instance.get_string("log.error")}] {lang._instance.get_string("error.oscPort")}\033[0m') + print( + f'\033[91m[{lang._instance.get_string("log.error")}] {lang._instance.get_string("error.oscPort")}\033[0m' + ) if not is_valid_int_input(value): value = value[:-1] window[self.gui_osc_receiver_port].update(value) values[self.gui_osc_receiver_port] = value - # Update OSC location if it has changed if self.config.gui_osc_location != values[self.gui_osc_location]: self.config.gui_osc_location = values[self.gui_osc_location] @@ -332,15 +334,20 @@ def render(self, window, event, values): changed = True # Update recalibrate address if it has changed - if self.config.gui_osc_recalibrate_address != values[self.gui_osc_recalibrate_address]: - self.config.gui_osc_recalibrate_address = values[self.gui_osc_recalibrate_address] + if ( + self.config.gui_osc_recalibrate_address + != values[self.gui_osc_recalibrate_address] + ): + self.config.gui_osc_recalibrate_address = values[ + self.gui_osc_recalibrate_address + ] changed = True # Update check option if self.config.gui_update_check != values[self.gui_update_check]: self.config.gui_update_check = values[self.gui_update_check] changed = True - + # Logging option if self.config.gui_logging != values[self.gui_logging]: self.config.gui_logging = values[self.gui_logging] @@ -348,8 +355,13 @@ def render(self, window, event, values): # Update disable camera preview option value = values[self.gui_disable_camera_preview] - if self.config.gui_disable_camera_preview != values[self.gui_disable_camera_preview]: - self.config.gui_disable_camera_preview = bool(values[self.gui_disable_camera_preview]) + if ( + self.config.gui_disable_camera_preview + != values[self.gui_disable_camera_preview] + ): + self.config.gui_disable_camera_preview = bool( + values[self.gui_disable_camera_preview] + ) changed = True # Update frame delay enable option @@ -361,7 +373,9 @@ def render(self, window, event, values): # Update frame delay option value = values[self.gui_osc_delay_seconds] if self.config.gui_osc_delay_seconds != values[self.gui_osc_delay_seconds]: - self.config.gui_osc_delay_seconds = float(values[self.gui_osc_delay_seconds]) + self.config.gui_osc_delay_seconds = float( + values[self.gui_osc_delay_seconds] + ) changed = True # Update ROSC option diff --git 
a/BabbleApp/logger.py b/BabbleApp/logger.py index f1fbebe..2de97f1 100644 --- a/BabbleApp/logger.py +++ b/BabbleApp/logger.py @@ -6,10 +6,12 @@ import psutil from logging.handlers import RotatingFileHandler + def strip_ansi_codes(text): """Remove ANSI color codes from a string.""" - ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') - return ansi_escape.sub('', text) + ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") + return ansi_escape.sub("", text) + def log_system_info(logger): """ @@ -22,11 +24,11 @@ def log_system_info(logger): os_release = platform.release() machine = platform.machine() processor = platform.processor() - + # CPU and Memory cpu_count = psutil.cpu_count(logical=True) - total_memory = psutil.virtual_memory().total // (1024 ** 2) # Convert bytes to MB - + total_memory = psutil.virtual_memory().total // (1024**2) # Convert bytes to MB + logger.info("========== System Information ==========") logger.info(f"Operating System: {os_name} {os_release} (Version: {os_version})") logger.info(f"Architecture: {machine}") @@ -38,15 +40,19 @@ def log_system_info(logger): except Exception as e: logger.error(f"Failed to log system information: {e}") + class _RotatingFileHandler(RotatingFileHandler): def doRollover(self): super().doRollover() # Include system info after rollover log_system_info(logging.getLogger("debug_logger")) + def setup_logging(): - # Log to program directory - log_dir = "./Logs" + # Determine the user's Documents directory + # documents_dir = os.path.join(os.path.expanduser("~"), "Documents") + documents_dir = "./Logs" + log_dir = os.path.join(documents_dir, "ProjectBabble") os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, "ProjectBabble.log") @@ -54,9 +60,14 @@ def setup_logging(): logger = logging.getLogger("debug_logger") logger.setLevel(logging.DEBUG) - file_handler = _RotatingFileHandler(log_file, mode='w', maxBytes=2000000, backupCount=1, encoding='utf-8') + file_handler = _RotatingFileHandler( + log_file, mode="w", maxBytes=2000000, backupCount=1, encoding="utf-8" + ) + file_handler.setLevel(logging.DEBUG) - formatter = logging.Formatter('%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') + formatter = logging.Formatter( + "%(asctime)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + ) file_handler.setFormatter(formatter) logger.addHandler(file_handler) @@ -88,4 +99,4 @@ def flush(self): sys.stdout = StreamToLogger(sys.stdout, logging.INFO) sys.stderr = StreamToLogger(sys.stderr, logging.ERROR) - log_system_info(logger) \ No newline at end of file + log_system_info(logger) diff --git a/BabbleApp/mjpeg_streamer.py b/BabbleApp/mjpeg_streamer.py index 6cc6352..ec0a9a6 100644 --- a/BabbleApp/mjpeg_streamer.py +++ b/BabbleApp/mjpeg_streamer.py @@ -4,6 +4,7 @@ import threading import time + class MJPEGVideoCapture: def __init__(self, url): self.url = url @@ -31,17 +32,19 @@ def _update(self): self.byte_buffer += chunk # Process all available complete frames in the buffer while True: - start = self.byte_buffer.find(b'\xff\xd8') # JPEG start marker - end = self.byte_buffer.find(b'\xff\xd9') # JPEG end marker + start = self.byte_buffer.find(b"\xff\xd8") # JPEG start marker + end = self.byte_buffer.find(b"\xff\xd9") # JPEG end marker if start != -1 and end != -1: - jpg = self.byte_buffer[start:end+2] - self.byte_buffer = self.byte_buffer[end+2:] - + jpg = self.byte_buffer[start : end + 2] + self.byte_buffer = self.byte_buffer[end + 2 :] + image = np.frombuffer(jpg, dtype=np.uint8) if image.size != 0: frame = 
cv2.imdecode(image, cv2.IMREAD_COLOR) if frame is not None: - self.frame = frame # Always update to the latest frame + self.frame = ( + frame # Always update to the latest frame + ) self.frame_ready = True else: break @@ -51,29 +54,30 @@ def _update(self): continue def read(self): - # Return whether a frame exists and its copy + # Return whether a frame exists and its copy start = time.time() while True: if self.frame is not None and self.frame_ready: - #time.sleep(self.sleep_time) + # time.sleep(self.sleep_time) self.frame_old = self.frame self.frame_ready = False return True, self.frame.copy() else: end = time.time() - time.sleep(1/120) - if end-start>1: + time.sleep(1 / 120) + if end - start > 1: return False, None - #return False, None + # return False, None def isOpened(self): - return self.running - + return self.running + def isPrimed(self): if self.frame is not None: return True - else: return False + else: + return False def release(self): self.running = False @@ -92,14 +96,14 @@ def get(self, item): if __name__ == "__main__": cap = MJPEGVideoCapture("http://openiristracker.local") cap.open() - + while cap.isOpened(): ret, frame = cap.read() if ret and frame is not None: cv2.imshow("MJPEG Stream", frame) - + if cv2.waitKey(1) & 0xFF == ord("q"): break - + cap.release() cv2.destroyAllWindows() diff --git a/BabbleApp/one_euro_filter.py b/BabbleApp/one_euro_filter.py index 2c2421d..2503458 100644 --- a/BabbleApp/one_euro_filter.py +++ b/BabbleApp/one_euro_filter.py @@ -31,7 +31,9 @@ def __call__(self, x): t = time() t_e = t - self.t_prev - if t_e != 0.0: # occasionally when switching to algos this becomes zero causing divide by zero errors crashing the filter. + if ( + t_e != 0.0 + ): # occasionally when switching to algos this becomes zero causing divide by zero errors crashing the filter. t_e = np.full(x.shape, t_e) # The filtered derivative of the signal. 
diff --git a/BabbleApp/osc.py b/BabbleApp/osc.py index 5a08dda..8c5ad98 100644 --- a/BabbleApp/osc.py +++ b/BabbleApp/osc.py @@ -10,18 +10,22 @@ import os from lang_manager import LocaleStringManager as lang + class Tab(IntEnum): CAM = 0 SETTINGS = 1 ALGOSETTINGS = 2 CALIBRATION = 3 + import numpy as np + def delay_output_osc(array, delay_seconds, self): time.sleep(delay_seconds) output_osc(array, self) + def output_osc(array, self): location = self.config.gui_osc_location multi = self.config.gui_multiply @@ -111,7 +115,9 @@ def run(self): delay_enable = self.config.gui_osc_delay_enable delay_seconds = self.config.gui_osc_delay_seconds if delay_enable: - threading.Thread(target=delay_output_osc, args=(cam_info.output, delay_seconds, self)).start() + threading.Thread( + target=delay_output_osc, args=(cam_info.output, delay_seconds, self) + ).start() else: output_osc(cam_info.output, self) diff --git a/BabbleApp/osc_calibrate_filter.py b/BabbleApp/osc_calibrate_filter.py index db3f80c..76ffdbb 100644 --- a/BabbleApp/osc_calibrate_filter.py +++ b/BabbleApp/osc_calibrate_filter.py @@ -5,10 +5,12 @@ import os from lang_manager import LocaleStringManager as lang + class CamId(IntEnum): CAM = 0 SETTINGS = 1 + class cal: def __init__(self): self.calibration_frame_counter = None diff --git a/BabbleApp/utils/misc_utils.py b/BabbleApp/utils/misc_utils.py index a9d9189..11425c7 100644 --- a/BabbleApp/utils/misc_utils.py +++ b/BabbleApp/utils/misc_utils.py @@ -24,18 +24,22 @@ # Detect the operating system os_type = platform.system() -if os_type == 'Windows': +if os_type == "Windows": from pygrabber.dshow_graph import FilterGraph + graph = FilterGraph() + def is_valid_float_input(value): # Allow empty string, negative sign, or a float number return bool(re.match(r"^-?\d*\.?\d*$", value)) + def is_valid_int_input(value): # Allow empty string, negative sign, or an integer number return bool(re.match(r"^-?\d*$", value)) + def list_camera_names(): cam_list = graph.get_input_devices() cam_names = [] @@ -44,10 +48,11 @@ def list_camera_names(): cam_names = cam_names + list_serial_ports() return cam_names + @contextlib.contextmanager def suppress_stderr(): """Context manager to suppress stderr (used for OpenCV warnings).""" - with open(os.devnull, 'w') as devnull: + with open(os.devnull, "w") as devnull: old_stderr_fd = os.dup(2) os.dup2(devnull.fileno(), 2) try: @@ -56,6 +61,7 @@ def suppress_stderr(): os.dup2(old_stderr_fd, 2) os.close(old_stderr_fd) + def list_cameras_opencv(): """Use OpenCV to check available cameras by index (fallback for Linux/macOS)""" index = 0 @@ -72,6 +78,7 @@ def list_cameras_opencv(): index += 1 return arr + def is_uvc_device(device): """Check if the device is a UVC video device (not metadata)""" try: @@ -95,14 +102,12 @@ def list_linux_uvc_devices(): try: # v4l2-ctl --list-devices breaks if video devices are non-sequential. # So this might be better? 
- result = glob.glob("/dev/video*"); + result = glob.glob("/dev/video*") devices = [] current_device = None for line in result: if is_uvc_device(line): - devices.append( - line - ) # We return the path like '/dev/video0' + devices.append(line) # We return the path like '/dev/video0' return devices @@ -113,7 +118,7 @@ def list_linux_uvc_devices(): def list_camera_names(): """Cross-platform function to list camera names""" - if os_type == 'Windows': + if os_type == "Windows": # On Windows, use pygrabber to list devices cam_list = graph.get_input_devices() return cam_list + list_serial_ports() @@ -131,13 +136,13 @@ def list_camera_names(): def list_serial_ports(): - #print("DEBUG: Listed Serial Ports") - """ Lists serial port names + # print("DEBUG: Listed Serial Ports") + """Lists serial port names - :raises EnvironmentError: - On unsupported or unknown platforms - :returns: - A list of the serial ports available on the system + :raises EnvironmentError: + On unsupported or unknown platforms + :returns: + A list of the serial ports available on the system """ if not sys.platform.startswith(("win", "linux", "cygwin", "darwin")): raise EnvironmentError("Unsupported platform") @@ -158,13 +163,13 @@ def get_camera_index_by_name(name): # On Linux, we use device paths like '/dev/video0' and match directly # OpenCV expects the actual /dev/video#, not the offset into the device list if os_type == "Linux": - if (name.startswith("/dev/ttyACM")): - return int(str.replace(name,"/dev/ttyACM","")); + if name.startswith("/dev/ttyACM"): + return int(str.replace(name, "/dev/ttyACM", "")) else: - return int(str.replace(name,"/dev/video","")); + return int(str.replace(name, "/dev/video", "")) # On Windows, match by camera name - elif os_type == 'Windows': + elif os_type == "Windows": for i, device_name in enumerate(cam_list): if device_name == name: return i @@ -177,13 +182,17 @@ def get_camera_index_by_name(name): return None + # Set environment variable before importing sounddevice. Value is not important. os.environ["SD_ENABLE_ASIO"] = "1" + + def playSound(file): data, fs = sf.read(file) sd.play(data, fs) sd.wait() + # Handle debugging virtual envs. def ensurePath(): if os.path.exists(os.path.join(os.getcwd(), "BabbleApp")): diff --git a/BabbleApp/utils/parse_translations.py b/BabbleApp/utils/parse_translations.py index b577183..e348f3c 100644 --- a/BabbleApp/utils/parse_translations.py +++ b/BabbleApp/utils/parse_translations.py @@ -2,25 +2,30 @@ def parse_translations(output_folder, input_translations_csv): import csv import json from os import path, makedirs + translations_data = [] with open(input_translations_csv, "r", encoding="utf-8") as translations_file: csv_reader = csv.reader(translations_file) for row in csv_reader: translations_data.append(row) - for column_index in range(3, len(translations_data[0])): # Skip first two columns (File,context, context) + for column_index in range( + 3, len(translations_data[0]) + ): # Skip first two columns (File,context, context) language = translations_data[0][column_index] result = {} - for row_index in range(1, len(translations_data)): #skip headers - context = translations_data[row_index][1].replace("\"","") + for row_index in range(1, len(translations_data)): # skip headers + context = translations_data[row_index][1].replace('"', "") translation = translations_data[row_index][column_index] result[context] = translation # Makes folder if it doesn't exist. 
if not path.exists(path.join(output_folder, language)): makedirs(path.join(output_folder, language)) - with open(path.join(output_folder,language,"locale.json"), "w", encoding="utf-8") as json_file: + with open( + path.join(output_folder, language, "locale.json"), "w", encoding="utf-8" + ) as json_file: json.dump(result, json_file, indent="\t", ensure_ascii=False) - - + + if __name__ == "__main__": output_folder = r"parsed_translations" input_translations_csv = r"C:\Users\T\Desktop\PythonProjects\BabbleApp\ProjectBabble\BabbleApp\utils\all_translations.csv" diff --git a/BabbleApp/vivefacialtracker/camera.py b/BabbleApp/vivefacialtracker/camera.py index 9c79b49..9a86763 100644 --- a/BabbleApp/vivefacialtracker/camera.py +++ b/BabbleApp/vivefacialtracker/camera.py @@ -30,27 +30,32 @@ import numpy as np from utils.misc_utils import os_type -if os_type == 'Linux': +if os_type == "Linux": import v4l2py as v4l import v4l2py.device as v4ld -elif os_type == 'Windows': +elif os_type == "Windows": import pygrabber.dshow_graph as pgdsg import pygrabber.dshow_ids as pgdsi class FTCamera: """Opens a camera grabbing frames as numpy arrays.""" + class ControlType(Enum): """Type of the control.""" - INTEGER = 'int' - BOOLEAN = 'bool' - SELECT = 'select' - if os_type == 'Linux': + INTEGER = "int" + BOOLEAN = "bool" + SELECT = "select" + + if os_type == "Linux": + class Control: """Control defined by the hardware.""" - def __init__(self: "FTCamera.ControlInfo", - control: v4ld.BaseControl) -> None: + + def __init__( + self: "FTCamera.ControlInfo", control: v4ld.BaseControl + ) -> None: self._control = control self.name = control.name self.type = None @@ -59,7 +64,7 @@ def __init__(self: "FTCamera.ControlInfo", self.step: int = 1 self.default: int = 0 self.clipping: bool = False - self.choices: dict[int: str] = {} + self.choices: dict[int:str] = {} match control.type: case v4ld.ControlType.INTEGER: self.type = FTCamera.ControlType.INTEGER @@ -89,9 +94,12 @@ def value(self: "FTCamera.ControlInfo", new_value: int | bool): def is_writeable(self: "FTCamera.ControlInfo") -> bool: """Control is writeable.""" return self._control.is_writeable - elif os_type == 'Windows': + + elif os_type == "Windows": + class Control: """Control defined by the hardware.""" + def __init__(self: "FTCamera.ControlInfo") -> None: raise RuntimeError("Not supported") @@ -111,31 +119,43 @@ def is_writeable(self: "FTCamera.ControlInfo") -> bool: class FrameSize: """Frame size.""" - def __init__(self: 'FTCamera.FrameSize', index: int, - width: int, height: int, min_fps: int) -> None: + + def __init__( + self: "FTCamera.FrameSize", + index: int, + width: int, + height: int, + min_fps: int, + ) -> None: self.index = index self.width = width self.height = height self.min_fps = min_fps - def __repr__(self: 'FTCamera.FrameSize') -> str: + def __repr__(self: "FTCamera.FrameSize") -> str: return "(width={}, height={}, fps={})".format( - self.width, self.height, self.min_fps) + self.width, self.height, self.min_fps + ) class FrameFormat: """Frame format.""" - def __init__(self: 'FTCamera.FrameFormat', pixel_format: str, - description: str) -> None: + + def __init__( + self: "FTCamera.FrameFormat", pixel_format: str, description: str + ) -> None: self.pixel_format = pixel_format self.description = description - def __repr__(self: 'FTCamera.FrameFormat') -> str: + def __repr__(self: "FTCamera.FrameFormat") -> str: return "(pixel_format={}, description='{}')".format( - self.pixel_format, self.description) + self.pixel_format, self.description + ) 
class Terminator: """Terminator.""" + terminate_requested = False + def __init__(self): signal.signal(signal.SIGINT, self.request_terminate) signal.signal(signal.SIGTERM, self.request_terminate) @@ -146,6 +166,7 @@ def request_terminate(self, signum, frame): class Processor: """Processor.""" + def __init__(self): pass @@ -154,7 +175,7 @@ def process(self, frame) -> None: _logger = logging.getLogger("evcta.FTCamera") - def __init__(self: 'FTCamera', index: int) -> None: + def __init__(self: "FTCamera", index: int) -> None: """Create camera grabber. The camera is not yet opened. Set "callback_frame" then call @@ -165,9 +186,9 @@ def __init__(self: 'FTCamera', index: int) -> None: file "/dev/video{index}". """ self._index: int = index - if os_type == 'Linux': + if os_type == "Linux": self._device: v4l.Device = None - elif os_type == 'Windows': + elif os_type == "Windows": self._device: pgdsg.VideoInput = None self._filter_graph: pgdsg.FilterGraph = None self._filter_video = None @@ -192,7 +213,7 @@ def __init__(self: 'FTCamera', index: int) -> None: self.terminator: FTCamera.Terminator = None self.processor: FTCamera.Processor = None - def open(self: 'FTCamera') -> None: + def open(self: "FTCamera") -> None: """Open device if closed. This opens the device using Video4Linux. Finds frame size and @@ -210,21 +231,23 @@ def open(self: 'FTCamera') -> None: if self._device: return FTCamera._logger.info("FTCamera.open: index {}".format(self._index)) - if os_type == 'Linux': + if os_type == "Linux": self._device = v4l.Device.from_id(self._index) self._device.open() - elif os_type == 'Windows': + elif os_type == "Windows": self._filter_graph = pgdsg.FilterGraph() self._filter_graph.add_video_input_device(self._index) self._filter_video = self._filter_graph.get_input_device() - FTCamera._logger.info("Video input filter: {}".format( - self._filter_video.Name)) + FTCamera._logger.info( + "Video input filter: {}".format(self._filter_video.Name) + ) self._device = self._filter_video self._filter_graph.add_sample_grabber(self._async_grabber) self._filter_grabber = self._filter_graph.filters[ - pgdsg.FilterType.sample_grabber] + pgdsg.FilterType.sample_grabber + ] self._filter_graph.add_null_render() self._find_format() @@ -233,11 +256,11 @@ def open(self: 'FTCamera') -> None: self._init_arrays() self._find_controls() - if not os_type == 'Linux': + if not os_type == "Linux": self._filter_graph.prepare_preview_graph() # self._filter_graph.print_debug_info() - def _find_format(self: 'FTCamera') -> None: + def _find_format(self: "FTCamera") -> None: """Logs all formats supported by camera and picks best one. Picks the first format which is YUV and supports capturing. @@ -245,41 +268,56 @@ def _find_format(self: 'FTCamera') -> None: Throws "Exception" if no suitable format is found. 
""" FTCamera._logger.info("formats:") - if os_type == 'Linux': + if os_type == "Linux": for x in self._device.info.formats: FTCamera._logger.info("- {}".format(x)) - self._format = next(x for x in self._device.info.formats - if x.pixel_format == v4l.PixelFormat.YUYV) - elif os_type == 'Windows': + self._format = next( + x + for x in self._device.info.formats + if x.pixel_format == v4l.PixelFormat.YUYV + ) + elif os_type == "Windows": for x in self._filter_video.get_formats(): FTCamera._logger.info(x) - fmt = next(x for x in self._filter_video.get_formats() - if x['media_type_str'] in ['YUY2']) + fmt = next( + x + for x in self._filter_video.get_formats() + if x["media_type_str"] in ["YUY2"] + ) self._format = FTCamera.FrameFormat( - fmt['media_type_str'], fmt['media_type_str']) + fmt["media_type_str"], fmt["media_type_str"] + ) FTCamera._logger.info("using format: {}".format(self._format)) - def _find_frame_size(self: 'FTCamera') -> None: + def _find_frame_size(self: "FTCamera") -> None: """Logs all sizes supported by camera and picks best one. Picks the first size with YUV format and a minimum FPS of 30. Throws "Exception" if no suitable size is found. """ - if os_type == 'Linux': + if os_type == "Linux": FTCamera._logger.info("sizes:") for x in self._device.info.frame_sizes: FTCamera._logger.info("- {}".format(x)) - self._frame_size = next(x for x in self._device.info.frame_sizes - if x.pixel_format == v4l.PixelFormat.YUYV - and x.min_fps >= 30) - elif os_type == 'Windows': - fsize = next(x for x in self._filter_video.get_formats() - if x['media_type_str'] == self._format.pixel_format - and x['min_framerate'] >= 30) + self._frame_size = next( + x + for x in self._device.info.frame_sizes + if x.pixel_format == v4l.PixelFormat.YUYV and x.min_fps >= 30 + ) + elif os_type == "Windows": + fsize = next( + x + for x in self._filter_video.get_formats() + if x["media_type_str"] == self._format.pixel_format + and x["min_framerate"] >= 30 + ) self._frame_size = FTCamera.FrameSize( - fsize['index'], fsize['width'], fsize['height'], - int(fsize['max_framerate'])) + fsize["index"], + fsize["width"], + fsize["height"], + int(fsize["max_framerate"]), + ) FTCamera._logger.info("using frame size : {}".format(self._frame_size)) self._frame_width = self._frame_size.width @@ -289,21 +327,21 @@ def _find_frame_size(self: 'FTCamera') -> None: self._half_frame_width = self._frame_width // 2 self._half_frame_height = self._frame_height // 2 - def _set_frame_format(self: 'FTCamera') -> None: + def _set_frame_format(self: "FTCamera") -> None: """Activates the found format and size.""" - if os_type == 'Linux': + if os_type == "Linux": self._device.set_format( buffer_type=v4ld.BufferType.VIDEO_CAPTURE, width=self._frame_size.width, height=self._frame_size.height, - pixel_format=self._format.pixel_format) - elif os_type == 'Windows': + pixel_format=self._format.pixel_format, + ) + elif os_type == "Windows": self._filter_video.set_format(self._frame_size.index) # make grabber accept YUV2 not RGB24 - guid_yuv2 = '{32595559-0000-0010-8000-00AA00389B71}' - self._filter_grabber.set_media_type( - pgdsg.MediaTypes.Video, guid_yuv2) + guid_yuv2 = "{32595559-0000-0010-8000-00AA00389B71}" + self._filter_grabber.set_media_type(pgdsg.MediaTypes.Video, guid_yuv2) # by changing the format we have to replace the callback # handler too. this is required since the original callback @@ -311,13 +349,21 @@ def _set_frame_format(self: 'FTCamera') -> None: # 2 channels. 
this crashes python_grabber class SampleGrabberYUV2(pgdsg.SampleGrabberCallback): """Sample grabber using YUV2.""" - def __init__(self: 'SampleGrabberYUV2', - callback: pgdsg.Callable[[pgdsg.Mat], None]): + + def __init__( + self: "SampleGrabberYUV2", + callback: pgdsg.Callable[[pgdsg.Mat], None], + ): super().__init__(callback) self.keep_photo: bool = False - def BufferCB(self: 'SampleGrabberYUV2', this, SampleTime, - pBuffer: pgdsg.NPBUFFER, BufferLen: int) -> int: + def BufferCB( + self: "SampleGrabberYUV2", + this, + SampleTime, + pBuffer: pgdsg.NPBUFFER, + BufferLen: int, + ) -> int: """Buffer callback.""" if self.keep_photo: self.keep_photo = False @@ -331,23 +377,22 @@ def BufferCB(self: 'SampleGrabberYUV2', this, SampleTime, self.callback(img) return 0 - self._filter_grabber.set_callback( - SampleGrabberYUV2(self._async_grabber), 1) + self._filter_grabber.set_callback(SampleGrabberYUV2(self._async_grabber), 1) # an alternative is pgdsi.GUID_NULL accepting everything - def _init_arrays(self: 'FTCamera') -> None: + def _init_arrays(self: "FTCamera") -> None: """Create numpy arrays to fill during capturing.""" self._arr_data = np.zeros([self._pixel_count * 2], dtype=np.uint8) self._arr_merge = np.zeros([self._pixel_count, 3], dtype=np.uint8) self._arr_c2 = np.empty([self._half_pixel_count], np.uint8) self._arr_c3 = np.empty([self._half_pixel_count], np.uint8) - def _find_controls(self: 'FTCamera') -> None: + def _find_controls(self: "FTCamera") -> None: """Logs all controls and stores them for use.""" self._controls = [] FTCamera._logger.info("controls:") - if os_type == 'Linux': + if os_type == "Linux": for x in self._device.controls.values(): FTCamera._logger.info("- {}".format(x)) control = FTCamera.Control(x) @@ -356,64 +401,67 @@ def _find_controls(self: 'FTCamera') -> None: self._controls.append(control) @property - def device_index(self: 'FTCamera') -> int: + def device_index(self: "FTCamera") -> int: """Device index.""" return self._index - if os_type == 'Linux': + if os_type == "Linux": + @property - def device(self: 'FTCamera') -> v4l.Device: + def device(self: "FTCamera") -> v4l.Device: """Video4Linux device if open or None if closed.""" return self._device - elif os_type == 'Windows': + + elif os_type == "Windows": + @property - def device(self: 'FTCamera') -> pgdsg.VideoInput: + def device(self: "FTCamera") -> pgdsg.VideoInput: """Device if open or None if closed.""" return self._device @property - def frame_width(self: 'FTCamera') -> int: + def frame_width(self: "FTCamera") -> int: """Width in pixels of captured frames. Only valid if device is open.""" return self._frame_width @property - def frame_height(self: 'FTCamera') -> int: + def frame_height(self: "FTCamera") -> int: """Height in pixels of captured frames. Only valid if device is open.""" return self._frame_height @property - def frame_fps(self: 'FTCamera') -> float: + def frame_fps(self: "FTCamera") -> float: """Capture frame rate. Only valid if device is open.""" return float(self._frame_size.min_fps) @property - def frame_format(self: 'FTCamera') -> str: + def frame_format(self: "FTCamera") -> str: """Capture pixel format. Only valid if device is open.""" return self._frame_size.pixel_format.name @property - def frame_format_description(self: 'FTCamera') -> str: + def frame_format_description(self: "FTCamera") -> str: """Capture pixel format description. 
Only valid if device is open.""" return self._format.description @property - def controls(self: 'FTCamera') -> "list[FTCamera.Control]": + def controls(self: "FTCamera") -> "list[FTCamera.Control]": """List of all supported controls. Only valid if device is open.""" return self._controls - def close(self: 'FTCamera') -> None: + def close(self: "FTCamera") -> None: """Closes the device if open. If capturing stops capturing first. @@ -422,9 +470,9 @@ def close(self: 'FTCamera') -> None: return FTCamera._logger.info("FTCamera.close: index {}".format(self._index)) try: - if os_type == 'Linux': + if os_type == "Linux": self._device.close() - elif os_type == 'Windows': + elif os_type == "Windows": self._filter_graph.stop() self._filter_graph.remove_filters() self._filter_grabber = None @@ -434,8 +482,9 @@ def close(self: 'FTCamera') -> None: pass self._device = None - if os_type == 'Linux': - def read(self: 'FTCamera') -> None: + if os_type == "Linux": + + def read(self: "FTCamera") -> None: """Read next frame.""" FTCamera._logger.info("FTCamera.read: ENTER") for frame in self._device: @@ -443,19 +492,22 @@ def read(self: 'FTCamera') -> None: break self._process_frame(frame) FTCamera._logger.info("FTCamera.read: EXIT") - elif os_type == 'Windows': - def read(self: 'FTCamera') -> None: + + elif os_type == "Windows": + + def read(self: "FTCamera") -> None: """Read frames until requested to exit.""" self._filter_graph.run() while not self.terminator.terminate_requested: self._filter_graph.grab_frame() time.sleep(0.001) - def _async_grabber(self: 'FTCamera', image: np.ndarray) -> None: + def _async_grabber(self: "FTCamera", image: np.ndarray) -> None: self._process_frame(image) - if os_type == 'Linux': - def _process_frame(self: 'FTCamera', frame) -> None: + if os_type == "Linux": + + def _process_frame(self: "FTCamera", frame) -> None: """Process captured frames. Operates only on YUV422 format right now. 
Calls _decode_yuv422 @@ -474,31 +526,42 @@ def _process_frame(self: 'FTCamera', frame) -> None: self._decode_yuv422(frame.data) case _: FTCamera._logger.error( - "Unsupported pixel format: {}".format(frame.pixel_format)) + "Unsupported pixel format: {}".format(frame.pixel_format) + ) return False - self.processor.process(self._arr_merge.reshape([frame.height, frame.width, 3])) + self.processor.process( + self._arr_merge.reshape([frame.height, frame.width, 3]) + ) except Exception: FTCamera._logger.exception("FTCamera._process_frame") - elif os_type == 'Windows': - def _process_frame(self: 'FTCamera', frame: np.ndarray) -> None: + + elif os_type == "Windows": + + def _process_frame(self: "FTCamera", frame: np.ndarray) -> None: if len(frame) == 0: return try: match self._format.pixel_format: - case 'YUY2': + case "YUY2": self._decode_yuv422(frame) case _: FTCamera._logger.error( "Unsupported pixel format: {}".format( - self._format.pixel_format)) + self._format.pixel_format + ) + ) return False - self.processor.process(self._arr_merge.reshape( - [self._frame_size.height, self._frame_size.width, 3])) + self.processor.process( + self._arr_merge.reshape( + [self._frame_size.height, self._frame_size.width, 3] + ) + ) except Exception: FTCamera._logger.exception("FTCamera._process_frame") - if os_type == 'Linux': - def _decode_yuv422(self: 'FTCamera', frame: list[bytes]) -> None: + if os_type == "Linux": + + def _decode_yuv422(self: "FTCamera", frame: list[bytes]) -> None: """Decode YUV422 frame into YUV444 frame.""" self._arr_data[:] = np.frombuffer(frame, dtype=np.uint8) @@ -506,23 +569,25 @@ def _decode_yuv422(self: 'FTCamera', frame: list[bytes]) -> None: self._arr_c2[:] = np.array(self._arr_data[1::4]) self._arr_c3[:] = np.array(self._arr_data[3::4]) - self._arr_merge[0:self._pixel_count:2, 1] = self._arr_c2 - self._arr_merge[1:self._pixel_count:2, 1] = self._arr_c2 - self._arr_merge[0:self._pixel_count:2, 2] = self._arr_c3 - self._arr_merge[1:self._pixel_count:2, 2] = self._arr_c3 - elif os_type == 'Windows': - def _decode_yuv422(self: 'FTCamera', frame: np.ndarray) -> None: - self._arr_merge[:, 0] = frame[:, :, 0].ravel(order='F') + self._arr_merge[0 : self._pixel_count : 2, 1] = self._arr_c2 + self._arr_merge[1 : self._pixel_count : 2, 1] = self._arr_c2 + self._arr_merge[0 : self._pixel_count : 2, 2] = self._arr_c3 + self._arr_merge[1 : self._pixel_count : 2, 2] = self._arr_c3 + + elif os_type == "Windows": + + def _decode_yuv422(self: "FTCamera", frame: np.ndarray) -> None: + self._arr_merge[:, 0] = frame[:, :, 0].ravel(order="F") - self._arr_c2[:] = np.array(frame[:, :, 1:].ravel(order='F')[0::2]) - self._arr_c3[:] = np.array(frame[:, :, 1:].ravel(order='F')[1::2]) + self._arr_c2[:] = np.array(frame[:, :, 1:].ravel(order="F")[0::2]) + self._arr_c3[:] = np.array(frame[:, :, 1:].ravel(order="F")[1::2]) - self._arr_merge[0:self._pixel_count:2, 1] = self._arr_c2 - self._arr_merge[1:self._pixel_count:2, 1] = self._arr_c2 - self._arr_merge[0:self._pixel_count:2, 2] = self._arr_c3 - self._arr_merge[1:self._pixel_count:2, 2] = self._arr_c3 + self._arr_merge[0 : self._pixel_count : 2, 1] = self._arr_c2 + self._arr_merge[1 : self._pixel_count : 2, 1] = self._arr_c2 + self._arr_merge[0 : self._pixel_count : 2, 2] = self._arr_c3 + self._arr_merge[1 : self._pixel_count : 2, 2] = self._arr_c3 - def _decode_yuv422_y_only(self: 'FTCamera', frame: list[bytes]) -> None: + def _decode_yuv422_y_only(self: "FTCamera", frame: list[bytes]) -> None: """Fast version of _decode_yuv422. 
This version is faster since it only copies the Y channel diff --git a/BabbleApp/vivefacialtracker/camera_controller.py b/BabbleApp/vivefacialtracker/camera_controller.py index 2f6aef0..4de6cae 100644 --- a/BabbleApp/vivefacialtracker/camera_controller.py +++ b/BabbleApp/vivefacialtracker/camera_controller.py @@ -40,7 +40,7 @@ class FTCameraController: _logger = logging.getLogger("evcta.FTCameraController") - def __init__(self: 'FTCameraController', index: int) -> None: + def __init__(self: "FTCameraController", index: int) -> None: """Create camera grabber. The camera is not yet opened. Set "callback_frame" then call @@ -55,16 +55,18 @@ def __init__(self: 'FTCameraController', index: int) -> None: self._proc_read: multiprocessing.Process = None self._proc_queue: multiprocessing.queues.Queue = None - def close(self: 'FTCameraController') -> None: + def close(self: "FTCameraController") -> None: """Closes the device if open. If capturing stops capturing first. """ self.is_open = False - FTCameraController._logger.info("FTCameraController.close: index {}".format(self._index)) + FTCameraController._logger.info( + "FTCameraController.close: index {}".format(self._index) + ) self._stop_read() - def open(self: 'FTCameraController') -> None: + def open(self: "FTCameraController") -> None: """Start capturing frames if not capturing and device is open.""" if self._proc_read is not None: return @@ -72,20 +74,22 @@ def open(self: 'FTCameraController') -> None: self.is_open = True FTCameraController._logger.info("FTCameraController.open: start process") self._proc_queue = multiprocessing.Queue(maxsize=1) - self._proc_read = multiprocessing.Process(target=self._read_process, args=(self._proc_queue,)) + self._proc_read = multiprocessing.Process( + target=self._read_process, args=(self._proc_queue,) + ) self._proc_read.start() - def _reopen(self: 'FTCameraController') -> None: + def _reopen(self: "FTCameraController") -> None: FTCameraController._logger.info("FTCameraController._reopen") self.close() self.open() - def get_image(self: 'FTCameraController') -> np.ndarray: + def get_image(self: "FTCameraController") -> np.ndarray: """Get next image or None.""" try: # timeout of 1s is a bit short. 
2s is safer frame = self._proc_queue.get(True, 2) - shape = unpack('HHH', frame[0:6]) + shape = unpack("HHH", frame[0:6]) image = np.frombuffer(frame[6:], dtype=np.uint8).reshape(shape) return image except pqueue.Empty: @@ -94,11 +98,12 @@ def get_image(self: 'FTCameraController') -> np.ndarray: return None except Exception: FTCameraController._logger.exception( - "FTCameraController.get_image: Failed getting image") + "FTCameraController.get_image: Failed getting image" + ) print(traceback.format_exc()) return None - def _stop_read(self: 'FTCameraController') -> None: + def _stop_read(self: "FTCameraController") -> None: """Stop capturing frames if capturing.""" if self._proc_read is None: return @@ -108,18 +113,22 @@ def _stop_read(self: 'FTCameraController') -> None: if self._proc_read.exitcode is not None: FTCameraController._logger.info( - "FTCameraController.stop_read: process terminated") + "FTCameraController.stop_read: process terminated" + ) else: FTCameraController._logger.info( - "FTCameraController._stop_read: process not responding, killing it") + "FTCameraController._stop_read: process not responding, killing it" + ) self._proc_read.kill() # sends a SIGKILL self._proc_read.join(1) FTCameraController._logger.info( - "FTCameraController._stop_read: process killed") + "FTCameraController._stop_read: process killed" + ) self._proc_read = None - def _read_process(self: 'FTCameraController', - queue: multiprocessing.connection.Connection) -> None: + def _read_process( + self: "FTCameraController", queue: multiprocessing.connection.Connection + ) -> None: """Read process function.""" """ @@ -128,16 +137,23 @@ def _read_process(self: 'FTCameraController', """ FTCameraController._logger.info("FTCameraController._read_process: ENTER") + class Helper(FTCamera.Processor): """Helper.""" - def __init__(self: 'FTCameraController.Helper', - queue: multiprocessing.connection.Connection) -> None: + + def __init__( + self: "FTCameraController.Helper", + queue: multiprocessing.connection.Connection, + ) -> None: self.camera: FTCamera = None self.tracker: ViveTracker = None self._queue = queue - def open_camera(self: 'FTCameraController.Helper', index: int, - queue: multiprocessing.connection.Connection) -> None: + def open_camera( + self: "FTCameraController.Helper", + index: int, + queue: multiprocessing.connection.Connection, + ) -> None: """Open camera.""" self.camera = FTCamera(index) self.camera.terminator = FTCamera.Terminator() @@ -145,14 +161,16 @@ def open_camera(self: 'FTCameraController.Helper', index: int, self.camera.queue = queue self.camera.open() - def open_tracker(self: 'FTCameraController.Helper') -> None: + def open_tracker(self: "FTCameraController.Helper") -> None: """Open tracker.""" - if platform.system() == 'Linux': + if platform.system() == "Linux": self.tracker = ViveTracker(self.camera.device.fileno()) else: - self.tracker = ViveTracker(self.camera.device, self.camera.device_index) + self.tracker = ViveTracker( + self.camera.device, self.camera.device_index + ) - def close(self: 'FTCameraController.Helper') -> None: + def close(self: "FTCameraController.Helper") -> None: """Close tracker and camera.""" if self.tracker is not None: self.tracker.dispose() @@ -170,27 +188,31 @@ def process(self, frame) -> None: frame = cv2.merge((channel, channel, channel)) if self.tracker is not None: frame = self.tracker.process_frame(frame) - self._queue.put(pack('HHH', *frame.shape) + frame.tobytes()) + self._queue.put(pack("HHH", *frame.shape) + frame.tobytes()) helper: Helper = 
Helper(queue) try: FTCameraController._logger.info( - "FTCameraController._read_process: open device") + "FTCameraController._read_process: open device" + ) helper.open_camera(self._index, queue) if not ViveTracker.is_camera_vive_tracker(helper.camera.device): FTCameraController._logger.exception( - "FTCameraController._read_process: not a VIVE Facial Tracker") + "FTCameraController._read_process: not a VIVE Facial Tracker" + ) raise RuntimeError("not a VIVE Facial Tracker") helper.open_tracker() FTCameraController._logger.info( - "FTCameraController._read_process: start reading") + "FTCameraController._read_process: start reading" + ) helper.camera.read() except Exception: FTCameraController._logger.exception( - "FTCameraController._read_process: failed open device") + "FTCameraController._read_process: failed open device" + ) print(traceback.format_exc()) finally: helper.close() diff --git a/BabbleApp/vivefacialtracker/vivetracker.py b/BabbleApp/vivefacialtracker/vivetracker.py index 9a58f0b..7c86b3e 100644 --- a/BabbleApp/vivefacialtracker/vivetracker.py +++ b/BabbleApp/vivefacialtracker/vivetracker.py @@ -34,7 +34,7 @@ import numpy as np from utils.misc_utils import os_type -if os_type == 'Linux': +if os_type == "Linux": import fcntl _IOC_NRBITS = 8 @@ -64,7 +64,7 @@ def _IOC_TYPECHECK(t): def _IOWR(type_, nr, size): return _IOC(_IOC_READ | _IOC_WRITE, type_, nr, _IOC_TYPECHECK(size)) -elif os_type == 'Windows': +elif os_type == "Windows": import pygrabber.dshow_graph as pgdsg import comtypes as comt import ctypes.wintypes as ctwt @@ -86,166 +86,196 @@ def _IOWR(type_, nr, size): IUnknown = comt.IUnknown HRESULT = ctypes.HRESULT - KSNODETYPE_DEV_SPECIFIC = GUID('{941C7AC0-C559-11D0-8A2B-00A0C9255AC1}') - GUID_EXT_CTRL_UNIT = GUID('{2ccb0bda-6331-4fdb-850e-79054dbd5671}') + KSNODETYPE_DEV_SPECIFIC = GUID("{941C7AC0-C559-11D0-8A2B-00A0C9255AC1}") + GUID_EXT_CTRL_UNIT = GUID("{2ccb0bda-6331-4fdb-850e-79054dbd5671}") class KSPROPERTY(Structure): - _fields_ = [ - ('Set', GUID), - ('Id', c_ulong), - ('Flags', c_ulong) - ] + _fields_ = [("Set", GUID), ("Id", c_ulong), ("Flags", c_ulong)] class KSP_NODE(Structure): _fields_ = [ - ('Property', KSPROPERTY), - ('NodeId', ctypes.c_ulong), - ('Reserved', ctypes.c_ulong) + ("Property", KSPROPERTY), + ("NodeId", ctypes.c_ulong), + ("Reserved", ctypes.c_ulong), ] class KSTOPOLOGY_CONNECTION(Structure): _fields_ = [ - ('FromNode', c_ulong), - ('FromNodePin', c_ulong), - ('ToNode', c_ulong), - ('ToNodePin', c_ulong) + ("FromNode", c_ulong), + ("FromNodePin", c_ulong), + ("ToNode", c_ulong), + ("ToNodePin", c_ulong), ] class IExtensionUnit(IUnknown): _case_insensitive_ = True - 'IExtensionUnit Interface' + "IExtensionUnit Interface" _iid_ = GUID_EXT_CTRL_UNIT _idlflags_ = [] _methods_ = [ COMMETHOD( - [], HRESULT, 'get_InfoSize', - (['out'], POINTER(c_ulong), 'pulSize')), + [], HRESULT, "get_InfoSize", (["out"], POINTER(c_ulong), "pulSize") + ), COMMETHOD( - [], HRESULT, 'get_Info', - (['in'], c_ulong, 'ulSize'), - (['in', 'out'], POINTER(c_uint8), 'pInfo')), + [], + HRESULT, + "get_Info", + (["in"], c_ulong, "ulSize"), + (["in", "out"], POINTER(c_uint8), "pInfo"), + ), COMMETHOD( - [], HRESULT, 'get_PropertySize', - (['in'], c_ulong, 'PropertyId'), - (['out'], POINTER(c_ulong), 'pulSize')), + [], + HRESULT, + "get_PropertySize", + (["in"], c_ulong, "PropertyId"), + (["out"], POINTER(c_ulong), "pulSize"), + ), COMMETHOD( - [], HRESULT, 'get_Property', - (['in'], c_ulong, 'PropertyId'), - (['in'], c_ulong, 'ulSize'), - (['in', 'out'], POINTER(c_uint8), 
'pValue')), + [], + HRESULT, + "get_Property", + (["in"], c_ulong, "PropertyId"), + (["in"], c_ulong, "ulSize"), + (["in", "out"], POINTER(c_uint8), "pValue"), + ), COMMETHOD( - [], HRESULT, 'put_Property', - (['in'], c_ulong, 'PropertyId'), - (['in'], c_ulong, 'ulSize'), - (['in', 'out'], POINTER(c_uint8), 'pValue')), + [], + HRESULT, + "put_Property", + (["in"], c_ulong, "PropertyId"), + (["in"], c_ulong, "ulSize"), + (["in", "out"], POINTER(c_uint8), "pValue"), + ), COMMETHOD( - [], HRESULT, 'get_PropertyRange', - (['in'], c_ulong, 'PropertyId'), - (['in'], c_ulong, 'ulSize'), - (['in', 'out'], POINTER(c_uint8), 'pMin'), - (['in', 'out'], POINTER(c_uint8), 'pMax'), - (['in', 'out'], POINTER(c_uint8), 'pSteppingDelta'), - (['in', 'out'], POINTER(c_uint8), 'pDefault')) - ] + [], + HRESULT, + "get_PropertyRange", + (["in"], c_ulong, "PropertyId"), + (["in"], c_ulong, "ulSize"), + (["in", "out"], POINTER(c_uint8), "pMin"), + (["in", "out"], POINTER(c_uint8), "pMax"), + (["in", "out"], POINTER(c_uint8), "pSteppingDelta"), + (["in", "out"], POINTER(c_uint8), "pDefault"), + ), + ] class IKsTopologyInfo(IUnknown): _case_insensitive_ = True - 'IKsTopologyInfo Interface' - _iid_ = GUID('{720D4AC0-7533-11D0-A5D6-28DB04C10000}') + "IKsTopologyInfo Interface" + _iid_ = GUID("{720D4AC0-7533-11D0-A5D6-28DB04C10000}") _idlflags_ = [] _methods_ = [ COMMETHOD( - [], HRESULT, 'get_NumCategories', - (['out'], POINTER(DWORD), 'pdwNumCategories')), + [], + HRESULT, + "get_NumCategories", + (["out"], POINTER(DWORD), "pdwNumCategories"), + ), COMMETHOD( - [], HRESULT, 'get_Category', - (['in'], DWORD, 'dwIndex'), - (['out'], POINTER(GUID), 'pCategory')), + [], + HRESULT, + "get_Category", + (["in"], DWORD, "dwIndex"), + (["out"], POINTER(GUID), "pCategory"), + ), COMMETHOD( - [], HRESULT, 'get_NumConnections', - (['out'], POINTER(DWORD), 'pdwNumConnections')), + [], + HRESULT, + "get_NumConnections", + (["out"], POINTER(DWORD), "pdwNumConnections"), + ), COMMETHOD( - [], HRESULT, 'get_ConnectionInfo', - (['in'], DWORD, 'dwIndex'), - (['out'], POINTER(KSTOPOLOGY_CONNECTION), 'pConnectionInfo')), + [], + HRESULT, + "get_ConnectionInfo", + (["in"], DWORD, "dwIndex"), + (["out"], POINTER(KSTOPOLOGY_CONNECTION), "pConnectionInfo"), + ), COMMETHOD( - [], HRESULT, 'get_NodeName', - (['in'], DWORD, 'dwNodeId'), + [], + HRESULT, + "get_NodeName", + (["in"], DWORD, "dwNodeId"), # pwchNodeName is actually 'out' but not possible to # be declared like this in comtypes - (['in'], c_wchar_p, 'pwchNodeName'), - (['in'], DWORD, 'dwBufSize'), - (['out'], POINTER(DWORD), 'pdwNameLen')), + (["in"], c_wchar_p, "pwchNodeName"), + (["in"], DWORD, "dwBufSize"), + (["out"], POINTER(DWORD), "pdwNameLen"), + ), COMMETHOD( - [], HRESULT, 'get_NumNodes', - (['out'], POINTER(DWORD), 'pdwNumNodes')), + [], HRESULT, "get_NumNodes", (["out"], POINTER(DWORD), "pdwNumNodes") + ), COMMETHOD( - [], HRESULT, 'get_NodeType', - (['in'], DWORD, 'dwNodeId'), - (['out'], POINTER(GUID), 'pNodeType')), + [], + HRESULT, + "get_NodeType", + (["in"], DWORD, "dwNodeId"), + (["out"], POINTER(GUID), "pNodeType"), + ), COMMETHOD( - [], HRESULT, 'CreateNodeInstance', - (['in'], DWORD, 'dwNodeId'), - (['in'], REFIID, 'iid'), - (['out'], POINTER(POINTER(IUnknown)), 'ppvObject'))] + [], + HRESULT, + "CreateNodeInstance", + (["in"], DWORD, "dwNodeId"), + (["in"], REFIID, "iid"), + (["out"], POINTER(POINTER(IUnknown)), "ppvObject"), + ), + ] class KSPROPERTY(Structure): - _fields_ = [ - ('Set', GUID), - ('Id', c_ulong), - ('Flags', c_ulong) - ] + _fields_ = [("Set", 
GUID), ("Id", c_ulong), ("Flags", c_ulong)] class KSMETHOD(Structure): - _fields_ = [ - ('Set', GUID), - ('Id', c_ulong), - ('Flags', c_ulong) - ] + _fields_ = [("Set", GUID), ("Id", c_ulong), ("Flags", c_ulong)] class KSEVENT(Structure): - _fields_ = [ - ('Set', GUID), - ('Id', c_ulong), - ('Flags', c_ulong) - ] + _fields_ = [("Set", GUID), ("Id", c_ulong), ("Flags", c_ulong)] class KSP_NODE(Structure): _fields_ = [ - ('Property', KSPROPERTY), - ('NodeId', c_ulong), - ('Reserved', c_ulong) + ("Property", KSPROPERTY), + ("NodeId", c_ulong), + ("Reserved", c_ulong), ] class IKsControl(IUnknown): _case_insensitive_ = True - 'IKsControl Interface' - _iid_ = GUID('{28F54685-06FD-11D2-B27A-00A0C9223196}') + "IKsControl Interface" + _iid_ = GUID("{28F54685-06FD-11D2-B27A-00A0C9223196}") _idlflags_ = [] _methods_ = [ COMMETHOD( - [], HRESULT, 'KsProperty', + [], + HRESULT, + "KsProperty", # (['in'], POINTER(KSPROPERTY), 'Property'), - (['in'], POINTER(KSP_NODE), 'Property'), - (['in'], c_ulong, 'PropertyLength'), - (['in'], c_void_p, 'PropertyData'), - (['in'], c_ulong, 'DataLength'), - (['in'], POINTER(c_ulong), 'BytesReturned')), + (["in"], POINTER(KSP_NODE), "Property"), + (["in"], c_ulong, "PropertyLength"), + (["in"], c_void_p, "PropertyData"), + (["in"], c_ulong, "DataLength"), + (["in"], POINTER(c_ulong), "BytesReturned"), + ), COMMETHOD( - [], HRESULT, 'KsMethod', - (['in'], POINTER(KSMETHOD), 'Method'), - (['in'], c_ulong, 'MethodLength'), - (['in', 'out'], c_void_p, 'MethodData'), - (['in'], c_ulong, 'DataLength'), - (['out'], POINTER(c_ulong), 'BytesReturned')), + [], + HRESULT, + "KsMethod", + (["in"], POINTER(KSMETHOD), "Method"), + (["in"], c_ulong, "MethodLength"), + (["in", "out"], c_void_p, "MethodData"), + (["in"], c_ulong, "DataLength"), + (["out"], POINTER(c_ulong), "BytesReturned"), + ), COMMETHOD( - [], HRESULT, 'KsEvent', - (['in'], POINTER(KSEVENT), 'Event'), - (['in'], c_ulong, 'EventLength'), - (['in', 'out'], c_void_p, 'EventData'), - (['in'], c_ulong, 'DataLength'), - (['out'], POINTER(c_ulong), 'BytesReturned')) - ] + [], + HRESULT, + "KsEvent", + (["in"], POINTER(KSEVENT), "Event"), + (["in"], c_ulong, "EventLength"), + (["in", "out"], c_void_p, "EventData"), + (["in"], c_ulong, "DataLength"), + (["out"], POINTER(c_ulong), "BytesReturned"), + ), + ] def _find_extension_node(topo: IKsTopologyInfo, guid: GUID) -> int | None: count = topo.get_NumNodes() @@ -264,12 +294,18 @@ def _list_all_nodes(topo: IKsTopologyInfo) -> None: KSPROPERTY_TYPE_SET = 0x2 KSPROPERTY_TYPE_TOPOLOGY = 0x10000000 - def _control_propery_request(control: IKsControl, index: int, - node: int, data: list[c_uint8]) -> int: + def _control_propery_request( + control: IKsControl, index: int, node: int, data: list[c_uint8] + ) -> int: extprop = KSP_NODE( - KSPROPERTY(GUID_EXT_CTRL_UNIT, index, - KSPROPERTY_TYPE_GET | KSPROPERTY_TYPE_TOPOLOGY), - node, 0) + KSPROPERTY( + GUID_EXT_CTRL_UNIT, + index, + KSPROPERTY_TYPE_GET | KSPROPERTY_TYPE_TOPOLOGY, + ), + node, + 0, + ) bytes_returned = ctypes.c_ulong(0) """ print(["{}={}".format(n, getattr(extprop, n)) @@ -281,21 +317,25 @@ def _control_propery_request(control: IKsControl, index: int, index, node, len(data), _control_propery_request_len( control, index, node))) """ - control.KsProperty(extprop, ctypes.sizeof(extprop), - data, len(data), bytes_returned) + control.KsProperty( + extprop, ctypes.sizeof(extprop), data, len(data), bytes_returned + ) # print("GetControlProperty: rec {}".format(bytes_returned.value)) return bytes_returned.value - def 
_control_propery_request_len(control: IKsControl, index: int, - node: int) -> int: + def _control_propery_request_len(control: IKsControl, index: int, node: int) -> int: extprop = KSP_NODE( - KSPROPERTY(GUID_EXT_CTRL_UNIT, index, - KSPROPERTY_TYPE_GET | KSPROPERTY_TYPE_TOPOLOGY), - node, 0) + KSPROPERTY( + GUID_EXT_CTRL_UNIT, + index, + KSPROPERTY_TYPE_GET | KSPROPERTY_TYPE_TOPOLOGY, + ), + node, + 0, + ) bytes_returned = ctypes.c_ulong(0) try: - control.KsProperty(extprop, ctypes.sizeof(extprop), - None, 0, bytes_returned) + control.KsProperty(extprop, ctypes.sizeof(extprop), None, 0, bytes_returned) except comt.COMError as e: if e.hresult == -2147024662: # more data available return bytes_returned.value @@ -303,13 +343,13 @@ def _control_propery_request_len(control: IKsControl, index: int, class KSPROPXUINFO(Structure): _fields_ = [ - ('Length', c_uint8), - ('DescriptorType', c_uint8), - ('DescriptorSubtype', c_uint8), - ('bUnitID', c_uint8), - ('guidExtensionCode', GUID), - ('bNumControls', c_uint8), - ('bNrInPins', c_uint8) + ("Length", c_uint8), + ("DescriptorType", c_uint8), + ("DescriptorSubtype", c_uint8), + ("bUnitID", c_uint8), + ("guidExtensionCode", GUID), + ("bNumControls", c_uint8), + ("bNrInPins", c_uint8), # ('baSourceID', c_uint8 * 64) ] @@ -320,11 +360,12 @@ class KSPROPXUINFO(Structure): class ViveTracker: """Provides support to activate data steam on VIVE Facial Tracker camera.""" + _XU_TASK_SET = 0x50 _XU_TASK_GET = 0x51 - _XU_REG_SENSOR = 0xab + _XU_REG_SENSOR = 0xAB - if os_type == 'Linux': + if os_type == "Linux": _UVC_SET_CUR = 0x01 _UVC_GET_CUR = 0x81 _UVC_GET_MIN = 0x82 @@ -336,19 +377,20 @@ class ViveTracker: class _uvc_xu_control_query(ctypes.Structure): _fields_ = [ - ('unit', ctypes.c_uint8), - ('selector', ctypes.c_uint8), - ('query', ctypes.c_uint8), - ('size', ctypes.c_uint16), - ('data', ctypes.POINTER(ctypes.c_uint8)), + ("unit", ctypes.c_uint8), + ("selector", ctypes.c_uint8), + ("query", ctypes.c_uint8), + ("size", ctypes.c_uint16), + ("data", ctypes.POINTER(ctypes.c_uint8)), ] - _UVCIOC_CTRL_QUERY = _IOWR('u', 0x21, _uvc_xu_control_query) + _UVCIOC_CTRL_QUERY = _IOWR("u", 0x21, _uvc_xu_control_query) _logger = logging.getLogger("evcta.ViveTracker") - if os_type == 'Linux': - def __init__(self: 'ViveTracker', fd: int) -> None: + if os_type == "Linux": + + def __init__(self: "ViveTracker", fd: int) -> None: """Create VIVE Face Tracker instance. Constructor tries first to detect if this is a VIVE Face Tracker. @@ -368,9 +410,10 @@ def __init__(self: 'ViveTracker', fd: int) -> None: raise Exception("Missing camera file descriptor") self._fd: int = fd self._init_common() - elif os_type == 'Windows': - def __init__(self: 'ViveTracker', device: pgdsg.VideoInput, - index: int) -> None: + + elif os_type == "Windows": + + def __init__(self: "ViveTracker", device: pgdsg.VideoInput, index: int) -> None: """Create VIVE Face Tracker instance. Constructor tries first to detect if this is a VIVE Face Tracker. 
@@ -395,7 +438,7 @@ def __init__(self: 'ViveTracker', device: pgdsg.VideoInput, except Exception: self.dispose() - def _init_common(self: 'ViveTracker') -> None: + def _init_common(self: "ViveTracker") -> None: self._dataBufLen = 384 self._resize_data_buf() self._bufferRegister: list[ctypes.c_uint8] = (ctypes.c_uint8 * 17)() @@ -405,9 +448,11 @@ def _init_common(self: 'ViveTracker') -> None: self._detect_vive_tracker() self._activate_tracker() - def _resize_data_buf(self: 'ViveTracker') -> None: + def _resize_data_buf(self: "ViveTracker") -> None: self._bufferSend: list[ctypes.c_uint8] = (ctypes.c_uint8 * self._dataBufLen)() - self._bufferReceive: list[ctypes.c_uint8] = (ctypes.c_uint8 * self._dataBufLen)() + self._bufferReceive: list[ctypes.c_uint8] = ( + ctypes.c_uint8 * self._dataBufLen + )() self._dataTest: list[ctypes.c_uint8] = (ctypes.c_uint8 * self._dataBufLen)() self._dataTest[0] = 0x51 @@ -416,9 +461,10 @@ def _resize_data_buf(self: 'ViveTracker') -> None: self._dataTest[254] = 0x53 self._dataTest[255] = 0x54 - if os_type == 'Linux': + if os_type == "Linux": + @staticmethod - def is_camera_vive_tracker(device: 'v4l.Device') -> bool: + def is_camera_vive_tracker(device: "v4l.Device") -> bool: """Detect if this is a VIVE Face Tracker. This is done right now by looking at the human readable device @@ -428,15 +474,19 @@ def is_camera_vive_tracker(device: 'v4l.Device') -> bool: the reader as excercise. """ check = "HTC Multimedia Camera" in device.info.card - ViveTracker._logger.info("is_camera_vive_tracker: '{}' -> {}". - format(device.info.card, check)) + ViveTracker._logger.info( + "is_camera_vive_tracker: '{}' -> {}".format(device.info.card, check) + ) return check - elif os_type == 'Windows': + + elif os_type == "Windows": + @staticmethod - def is_camera_vive_tracker(device: 'pgdsg.VideoInput') -> bool: + def is_camera_vive_tracker(device: "pgdsg.VideoInput") -> bool: check = "HTC Multimedia Camera" in device.Name - ViveTracker._logger.info("is_camera_vive_tracker: '{}' -> {}". - format(device.Name, check)) + ViveTracker._logger.info( + "is_camera_vive_tracker: '{}' -> {}".format(device.Name, check) + ) return check @staticmethod @@ -451,7 +501,7 @@ def is_device_vive_tracker(device_name: str) -> bool: bool: True if the device is identified as a Vive Face Tracker, False otherwise. """ - if os_type == 'Linux': + if os_type == "Linux": try: # Ensure the provided device name exists if not os.path.exists(device_name): @@ -473,20 +523,20 @@ def is_device_vive_tracker(device_name: str) -> bool: return False else: return "HTC Multimedia Camera" in str(device_name) - - def dispose(self: 'ViveTracker') -> None: + + def dispose(self: "ViveTracker") -> None: """Dispose of tracker. Deactivates data stream.""" ViveTracker._logger.info("dispose vive tracker") - if os_type == 'Linux': + if os_type == "Linux": self._deactivate_tracker() - elif os_type == 'Windows': + elif os_type == "Windows": self._deactivate_tracker() self._close_controller() - def process_frame(self: 'ViveTracker', data: np.ndarray) -> np.ndarray: + def process_frame(self: "ViveTracker", data: np.ndarray) -> np.ndarray: """Process a captured frame. 
Right now this applies a median blur but other manipulations @@ -515,8 +565,9 @@ def process_frame(self: 'ViveTracker', data: np.ndarray) -> np.ndarray: return cv.merge((lum, lum, lum)) - if os_type == 'Windows': - def _open_controller(self: 'ViveTracker') -> None: + if os_type == "Windows": + + def _open_controller(self: "ViveTracker") -> None: if self._xu_control: return @@ -524,7 +575,8 @@ def _open_controller(self: 'ViveTracker') -> None: sdenum = pgdsg.SystemDeviceEnum().system_device_enum filenum = sdenum.CreateClassEnumerator( - comt.GUID(pgdsg.DeviceCategories.VideoInputDevice), dwFlags=0) + comt.GUID(pgdsg.DeviceCategories.VideoInputDevice), dwFlags=0 + ) moniker, count = filenum.Next(1) i = 0 while i != self._device_index and count > 0: @@ -549,11 +601,11 @@ def _open_controller(self: 'ViveTracker') -> None: topo = self._device.instance.QueryInterface(IKsTopologyInfo) # _list_all_nodes(topo) - self._xu_node_index = _find_extension_node( - topo, KSNODETYPE_DEV_SPECIFIC) + self._xu_node_index = _find_extension_node(topo, KSNODETYPE_DEV_SPECIFIC) xu_node: IUnknown = topo.CreateNodeInstance( - self._xu_node_index, IUnknown._iid_) + self._xu_node_index, IUnknown._iid_ + ) self._xu_control = xu_node.QueryInterface(IKsControl) @@ -568,42 +620,51 @@ def _open_controller(self: 'ViveTracker') -> None: for n, t in xupi._fields_]) """ - def _close_controller(self: 'ViveTracker') -> None: + def _close_controller(self: "ViveTracker") -> None: self._xu_control = None - def _xu_get_len(self: 'ViveTracker', selector: int) -> int: + def _xu_get_len(self: "ViveTracker", selector: int) -> int: """Send GET_LEN command to device extension unit. Keyword arguments: selector --- Selector """ - if os_type == 'Linux': + if os_type == "Linux": length = (ctypes.c_uint8 * 2)(0, 0) c = ViveTracker._uvc_xu_control_query( - 4, selector, ViveTracker._UVC_GET_LEN, 2, length) + 4, selector, ViveTracker._UVC_GET_LEN, 2, length + ) fcntl.ioctl(self._fd, ViveTracker._UVCIOC_CTRL_QUERY, c) return (length[1] << 8) + length[0] - elif os_type == 'Windows': + elif os_type == "Windows": return _control_propery_request_len( - self._xu_control, 2, self._xu_node_index) + self._xu_control, 2, self._xu_node_index + ) - def _xu_get_cur(self: 'ViveTracker', selector: int, - data: list[ctypes.c_uint8]) -> None: + def _xu_get_cur( + self: "ViveTracker", selector: int, data: list[ctypes.c_uint8] + ) -> None: """Send GET_CUR command to device extension unit. Keyword arguments: selector --- Selector data -- Buffer to store response to. Has to be 384 bytes long. 
""" - if os_type == 'Linux': + if os_type == "Linux": c = ViveTracker._uvc_xu_control_query( - 4, selector, ViveTracker._UVC_GET_CUR, len(data), data) + 4, selector, ViveTracker._UVC_GET_CUR, len(data), data + ) fcntl.ioctl(self._fd, ViveTracker._UVCIOC_CTRL_QUERY, c) - elif os_type == 'Windows': + elif os_type == "Windows": xuprop = KSP_NODE( - KSPROPERTY(GUID_EXT_CTRL_UNIT, selector, - KSPROPERTY_TYPE_GET | KSPROPERTY_TYPE_TOPOLOGY), - self._xu_node_index, 0) + KSPROPERTY( + GUID_EXT_CTRL_UNIT, + selector, + KSPROPERTY_TYPE_GET | KSPROPERTY_TYPE_TOPOLOGY, + ), + self._xu_node_index, + 0, + ) received = ctypes.c_ulong(0) """ print(["{}={}".format(n, getattr(xuprop, n)) @@ -615,27 +676,35 @@ def _xu_get_cur(self: 'ViveTracker', selector: int, _control_propery_request_len( self._xu_control, selector, self._xu_node_index))) """ - self._xu_control.KsProperty(xuprop, ctypes.sizeof(xuprop), - data, len(data), received) + self._xu_control.KsProperty( + xuprop, ctypes.sizeof(xuprop), data, len(data), received + ) # print("received: {}".format(received.value)) - def _xu_set_cur(self: 'ViveTracker', selector: int, - data: list[ctypes.c_uint8]) -> None: + def _xu_set_cur( + self: "ViveTracker", selector: int, data: list[ctypes.c_uint8] + ) -> None: """Send SET_CUR command to device extension unit. Keyword arguments: selector --- Selector data -- Data to send. Has to be 384 bytes long. """ - if os_type == 'Linux': + if os_type == "Linux": c = ViveTracker._uvc_xu_control_query( - 4, selector, ViveTracker._UVC_SET_CUR, len(data), data) + 4, selector, ViveTracker._UVC_SET_CUR, len(data), data + ) fcntl.ioctl(self._fd, ViveTracker._UVCIOC_CTRL_QUERY, c) - elif os_type == 'Windows': + elif os_type == "Windows": xuprop = KSP_NODE( - KSPROPERTY(GUID_EXT_CTRL_UNIT, selector, - KSPROPERTY_TYPE_SET | KSPROPERTY_TYPE_TOPOLOGY), - self._xu_node_index, 0) + KSPROPERTY( + GUID_EXT_CTRL_UNIT, + selector, + KSPROPERTY_TYPE_SET | KSPROPERTY_TYPE_TOPOLOGY, + ), + self._xu_node_index, + 0, + ) received = ctypes.c_ulong(0) """ print(["{}={}".format(n, getattr(xuprop, n)) @@ -647,15 +716,17 @@ def _xu_set_cur(self: 'ViveTracker', selector: int, _control_propery_request_len( self._xu_control, selector, self._xu_node_index))) """ - self._xu_control.KsProperty(xuprop, ctypes.sizeof(xuprop), - data, len(data), received) + self._xu_control.KsProperty( + xuprop, ctypes.sizeof(xuprop), data, len(data), received + ) - def _get_len(self: 'ViveTracker') -> int: + def _get_len(self: "ViveTracker") -> int: """Get buffer length of device.""" return self._xu_get_len(2) - def _set_cur(self: 'ViveTracker', command: list[ctypes.c_uint8], - timeout: float = 0.5) -> None: + def _set_cur( + self: "ViveTracker", command: list[ctypes.c_uint8], timeout: float = 0.5 + ) -> None: """Send SET_CUR command to device extension unit with proper handling. Sends SET_CUR command to the device. Then sends GET_CUR commands to @@ -669,8 +740,9 @@ def _set_cur(self: 'ViveTracker', command: list[ctypes.c_uint8], self._bufferSend[:length] = command self._xu_set_cur(2, self._bufferSend) if self._debug: - ViveTracker._logger.debug("set_cur({})".format( - [hex(x) for x in command[:16]])) + ViveTracker._logger.debug( + "set_cur({})".format([hex(x) for x in command[:16]]) + ) lenbuf = len(self._bufferReceive) stime = timer() while True: @@ -689,37 +761,50 @@ def _set_cur(self: 'ViveTracker', command: list[ctypes.c_uint8], return # command finished else: raise Exception( - "set_cur({}): response not matching command". 
-                        format([hex(x) for x in command[:16]]))
+                        "set_cur({}): response not matching command".format(
+                            [hex(x) for x in command[:16]]
+                        )
+                    )
             else:
-                raise Exception("set_cur({}): invalid response: {}".format(
-                    [hex(x) for x in command[:16]],
-                    [hex(x) for x in self._bufferReceive[:16]]))
+                raise Exception(
+                    "set_cur({}): invalid response: {}".format(
+                        [hex(x) for x in command[:16]],
+                        [hex(x) for x in self._bufferReceive[:16]],
+                    )
+                )

-            elapsed = (timer() - stime)
+            elapsed = timer() - stime
             if self._debug:
-                ViveTracker._logger.debug("-> elasped {:d}ms".format(
-                    int(elapsed * 1000)))
+                ViveTracker._logger.debug(
+                    "-> elapsed {:d}ms".format(int(elapsed * 1000))
+                )
             if elapsed > timeout:
-                raise Exception("set_cur({}): timeout".format(
-                    [hex(x) for x in command[:16]]))
+                raise Exception(
+                    "set_cur({}): timeout".format([hex(x) for x in command[:16]])
+                )

-    def _set_cur_no_resp(self: 'ViveTracker',
-                         command: list[ctypes.c_uint8]) -> None:
+    def _set_cur_no_resp(self: "ViveTracker", command: list[ctypes.c_uint8]) -> None:
         """Send SET_CUR command to device without proper handling.

         Keyword arguments:
         command --- Command to send.
         """
-        self._bufferSend[:len(command)] = command
+        self._bufferSend[: len(command)] = command
         self._xu_set_cur(2, self._bufferSend)
         if self._debug:
-            ViveTracker._logger.debug("set_cur_no_resp({})".format(
-                [hex(x) for x in command[:16]]))
-
-    def _init_register(self: 'ViveTracker', command: int, reg: int,
-                       address: int, address_len: int,
-                       value: int, value_len: int) -> None:
+            ViveTracker._logger.debug(
+                "set_cur_no_resp({})".format([hex(x) for x in command[:16]])
+            )
+
+    def _init_register(
+        self: "ViveTracker",
+        command: int,
+        reg: int,
+        address: int,
+        address_len: int,
+        value: int,
+        value_len: int,
+    ) -> None:
         """Init buffer for manipulating a register.

         Keyword arguments:
@@ -738,10 +823,10 @@ def _init_register(self: 'ViveTracker', command: int, reg: int,
         br[4] = ctypes.c_uint8(value_len) # data width in bytes

         # address
-        br[5] = ctypes.c_uint8((address >> 24) & 0xff)
-        br[6] = ctypes.c_uint8((address >> 16) & 0xff)
-        br[7] = ctypes.c_uint8((address >> 8) & 0xff)
-        br[8] = ctypes.c_uint8(address & 0xff)
+        br[5] = ctypes.c_uint8((address >> 24) & 0xFF)
+        br[6] = ctypes.c_uint8((address >> 16) & 0xFF)
+        br[7] = ctypes.c_uint8((address >> 8) & 0xFF)
+        br[8] = ctypes.c_uint8(address & 0xFF)

         # page address
         br[9] = ctypes.c_uint8(0x90)
@@ -750,13 +835,14 @@ def _init_register(self: 'ViveTracker', command: int, reg: int,
         br[12] = ctypes.c_uint8(0x01)

         # value
-        br[13] = ctypes.c_uint8((value >> 24) & 0xff)
-        br[14] = ctypes.c_uint8((value >> 16) & 0xff)
-        br[15] = ctypes.c_uint8((value >> 8) & 0xff)
-        br[16] = ctypes.c_uint8(value & 0xff)
-
-    def _set_register(self: 'ViveTracker', reg: int, address: int,
-                      value: int, timeout: float = 0.5) -> None:
+        br[13] = ctypes.c_uint8((value >> 24) & 0xFF)
+        br[14] = ctypes.c_uint8((value >> 16) & 0xFF)
+        br[15] = ctypes.c_uint8((value >> 8) & 0xFF)
+        br[16] = ctypes.c_uint8(value & 0xFF)
+
+    def _set_register(
+        self: "ViveTracker", reg: int, address: int, value: int, timeout: float = 0.5
+    ) -> None:
         """Set device register.

         Keyword arguments:
@@ -772,8 +858,9 @@ def _set_register(self: 'ViveTracker', reg: int, address: int,
         else:
             self._set_cur_no_resp(self._bufferRegister)

-    def _get_register(self: 'ViveTracker', reg: int, address: int,
-                      timeout: float = 0.5) -> int:
+    def _get_register(
+        self: "ViveTracker", reg: int, address: int, timeout: float = 0.5
+    ) -> int:
         """Get device register.
Keyword arguments: @@ -785,8 +872,9 @@ def _get_register(self: 'ViveTracker', reg: int, address: int, self._set_cur(self._bufferRegister, timeout) return int(self._bufferReceive[17]) - def _set_register_sensor(self: 'ViveTracker', address: int, value: int, - timeout: float = 0.5) -> None: + def _set_register_sensor( + self: "ViveTracker", address: int, value: int, timeout: float = 0.5 + ) -> None: """Set device sensor register. Keyword arguments: @@ -797,8 +885,9 @@ def _set_register_sensor(self: 'ViveTracker', address: int, value: int, """ self._set_register(ViveTracker._XU_REG_SENSOR, address, value, timeout) - def _get_register_sensor(self: 'ViveTracker', address: int, - timeout: float = 0.5) -> int: + def _get_register_sensor( + self: "ViveTracker", address: int, timeout: float = 0.5 + ) -> int: """Get device sensor register. Keyword arguments: @@ -807,39 +896,40 @@ def _get_register_sensor(self: 'ViveTracker', address: int, """ return self._get_register(ViveTracker._XU_REG_SENSOR, address, timeout) - def _set_enable_stream(self: 'ViveTracker', enable: bool) -> None: + def _set_enable_stream(self: "ViveTracker", enable: bool) -> None: """Enable or disable data stream. Keyword arguments: enable --- Enable or disable data stream. """ - buf = (ctypes.c_uint8 * 4)(ViveTracker._XU_TASK_SET, 0x14, 0x00, - 0x01 if enable else 0x00) + buf = (ctypes.c_uint8 * 4)( + ViveTracker._XU_TASK_SET, 0x14, 0x00, 0x01 if enable else 0x00 + ) self._set_cur_no_resp(buf) - def _detect_vive_tracker(self: 'ViveTracker') -> None: + def _detect_vive_tracker(self: "ViveTracker") -> None: """Try to detect if this is a VIVE Face Tracker device. uses GET_LEN to get the data buffer length. VIVE Face Tracker uses 384. If this is not the case then this is most probebly something else but not a VIVE Face Tracker. """ - if os_type == 'Linux': + if os_type == "Linux": length = self._get_len() - elif os_type == 'Windows': + elif os_type == "Windows": length = _control_propery_request_len( - self._xu_control, 2, self._xu_node_index) + self._xu_control, 2, self._xu_node_index + ) if length == 384: pass elif length == 64: self._dataBufLen = 64 self._resize_data_buf() else: - raise Exception("length check failed: {} instead of 384/64". - format(length)) + raise Exception("length check failed: {} instead of 384/64".format(length)) ViveTracker._logger.info("vive tracker detected") - def _activate_tracker(self: 'ViveTracker') -> None: + def _activate_tracker(self: "ViveTracker") -> None: """Activate tracker. 
Sets parameters and enables data stream.""" @@ -855,21 +945,21 @@ def _activate_tracker(self: 'ViveTracker') -> None: self._set_register_sensor(0x00, 0x40) self._set_register_sensor(0x08, 0x01) self._set_register_sensor(0x70, 0x00) - self._set_register_sensor(0x02, 0xff) - self._set_register_sensor(0x03, 0xff) - self._set_register_sensor(0x04, 0xff) - self._set_register_sensor(0x0e, 0x00) - self._set_register_sensor(0x05, 0xb2) - self._set_register_sensor(0x06, 0xb2) - self._set_register_sensor(0x07, 0xb2) - self._set_register_sensor(0x0f, 0x03) + self._set_register_sensor(0x02, 0xFF) + self._set_register_sensor(0x03, 0xFF) + self._set_register_sensor(0x04, 0xFF) + self._set_register_sensor(0x0E, 0x00) + self._set_register_sensor(0x05, 0xB2) + self._set_register_sensor(0x06, 0xB2) + self._set_register_sensor(0x07, 0xB2) + self._set_register_sensor(0x0F, 0x03) ViveTracker._logger.info("-> enable stream") self._set_cur(self._dataTest) self._set_enable_stream(True) time.sleep(0.25) - def _deactivate_tracker(self: 'ViveTracker') -> None: + def _deactivate_tracker(self: "ViveTracker") -> None: """Deactivate tracker. Disables data stream.