From a3b9f77e8f94d5d42f6876a4e0c16298939fd648 Mon Sep 17 00:00:00 2001 From: Kristoffer Richardsson Date: Fri, 2 Dec 2022 10:11:41 +0100 Subject: [PATCH 1/3] Refactoring of mem sub system --- cflib/crazyflie/mem/__init__.py | 394 +++++++++++++++----------------- 1 file changed, 189 insertions(+), 205 deletions(-) diff --git a/cflib/crazyflie/mem/__init__.py b/cflib/crazyflie/mem/__init__.py index 9dcd65753..d038a6b29 100644 --- a/cflib/crazyflie/mem/__init__.py +++ b/cflib/crazyflie/mem/__init__.py @@ -333,12 +333,10 @@ def write(self, memory, addr, data, flush_queue=False, progress_cb=None): def read(self, memory, addr, length): """ - Read the specified amount of bytes from the given memory at the given - address + Read the specified amount of bytes from the given memory at the given address """ if memory.id in self._read_requests: - logger.warning('There is already a read operation ongoing for ' - 'memory id {}'.format(memory.id)) + logger.warning('There is already a read operation ongoing for memory id {}'.format(memory.id)) return False rreq = _ReadRequest(memory, addr, length, self.cf) @@ -357,8 +355,7 @@ def refresh(self, refresh_done_callback): self.mem_read_cb.remove_callback(m.new_data) m.disconnect() except Exception as e: - logger.info( - 'Error when removing memory after update: {}'.format(e)) + logger.info('Error when removing memory after update: {}'.format(e)) self.mems = [] self.nbr_of_mems = 0 @@ -381,209 +378,196 @@ def _new_packet_cb(self, packet): payload = packet.data[1:] if chan == CHAN_INFO: - if cmd == CMD_INFO_NBR: - self.nbr_of_mems = payload[0] - logger.info('{} memories found'.format(self.nbr_of_mems)) - - # Start requesting information about the memories, - # if there are any... - if self.nbr_of_mems > 0: - if not self._getting_count: - self._getting_count = True - logger.debug('Requesting first id') - pk = CRTPPacket() - pk.set_header(CRTPPort.MEM, CHAN_INFO) - pk.data = (CMD_INFO_DETAILS, 0) - self.cf.send_packet(pk, expected_reply=( - CMD_INFO_DETAILS, 0)) - else: - self._refresh_callback() - - if cmd == CMD_INFO_DETAILS: - - # Did we get a good reply, otherwise try again: - if len(payload) < 5: - # Workaround for 1-wire bug when memory is detected - # but updating the info crashes the communication with - # the 1-wire. Fail by saying we only found 1 memory - # (the I2C). - logger.error( - '-------->Got good count, but no info on mem!') - self.nbr_of_mems = 1 - if self._refresh_callback: - self._refresh_callback() - self._refresh_callback = None - return - - # Create information about a new memory - # Id - 1 byte - mem_id = payload[0] - # Type - 1 byte - mem_type = payload[1] - # Size 4 bytes (as addr) - mem_size = struct.unpack('I', payload[2:6])[0] - # Addr (only valid for 1-wire?) - mem_addr_raw = struct.unpack('B' * 8, payload[6:14]) - mem_addr = '' - for m in mem_addr_raw: - mem_addr += '{:02X}'.format(m) - - if (not self.get_mem(mem_id)): - if mem_type == MemoryElement.TYPE_1W: - mem = OWElement(id=mem_id, type=mem_type, - size=mem_size, - addr=mem_addr, mem_handler=self) - self.mem_read_cb.add_callback(mem.new_data) - self.mem_write_cb.add_callback(mem.write_done) - self._ow_mems_left_to_update.append(mem.id) - elif mem_type == MemoryElement.TYPE_I2C: - mem = I2CElement(id=mem_id, type=mem_type, - size=mem_size, - mem_handler=self) - self.mem_read_cb.add_callback(mem.new_data) - self.mem_write_cb.add_callback(mem.write_done) - elif mem_type == MemoryElement.TYPE_DRIVER_LED: - mem = LEDDriverMemory(id=mem_id, type=mem_type, - size=mem_size, mem_handler=self) - logger.debug(mem) - self.mem_read_cb.add_callback(mem.new_data) - self.mem_write_cb.add_callback(mem.write_done) - elif mem_type == MemoryElement.TYPE_LOCO: - mem = LocoMemory(id=mem_id, type=mem_type, - size=mem_size, mem_handler=self) - logger.debug(mem) - self.mem_read_cb.add_callback(mem.new_data) - elif mem_type == MemoryElement.TYPE_TRAJ: - mem = TrajectoryMemory(id=mem_id, type=mem_type, - size=mem_size, mem_handler=self) - logger.debug(mem) - self.mem_write_cb.add_callback(mem.write_done) - self.mem_write_failed_cb.add_callback(mem.write_failed) - elif mem_type == MemoryElement.TYPE_LOCO2: - mem = LocoMemory2(id=mem_id, type=mem_type, - size=mem_size, mem_handler=self) - logger.debug(mem) - self.mem_read_cb.add_callback(mem.new_data) - elif mem_type == MemoryElement.TYPE_LH: - mem = LighthouseMemory(id=mem_id, type=mem_type, - size=mem_size, mem_handler=self) - logger.debug(mem) - self.mem_read_cb.add_callback(mem.new_data) - self.mem_read_failed_cb.add_callback( - mem.new_data_failed) - self.mem_write_cb.add_callback(mem.write_done) - self.mem_write_failed_cb.add_callback(mem.write_failed) - elif mem_type == MemoryElement.TYPE_MEMORY_TESTER: - mem = MemoryTester(id=mem_id, type=mem_type, - size=mem_size, mem_handler=self) - logger.debug(mem) - self.mem_read_cb.add_callback(mem.new_data) - self.mem_write_cb.add_callback(mem.write_done) - elif mem_type == MemoryElement.TYPE_DRIVER_LEDTIMING: - mem = LEDTimingsDriverMemory(id=mem_id, type=mem_type, - size=mem_size, - mem_handler=self) - logger.debug(mem) - self.mem_read_cb.add_callback(mem.new_data) - self.mem_write_cb.add_callback(mem.write_done) - elif mem_type == MemoryElement.TYPE_DECK_MEMORY: - mem = DeckMemoryManager(id=mem_id, type=mem_type, size=mem_size, mem_handler=self) - logger.debug(mem) - self.mem_read_cb.add_callback(mem._new_data) - self.mem_read_failed_cb.add_callback(mem._new_data_failed) - self.mem_write_cb.add_callback(mem._write_done) - self.mem_write_failed_cb.add_callback(mem._write_failed) - else: - mem = MemoryElement(id=mem_id, type=mem_type, - size=mem_size, mem_handler=self) - logger.debug(mem) - self.mems.append(mem) - self.mem_added_cb.call(mem) - - self._fetch_id = mem_id + 1 - - if self.nbr_of_mems - 1 >= self._fetch_id: - logger.debug( - 'Requesting information about memory {}'.format( - self._fetch_id)) - pk = CRTPPacket() - pk.set_header(CRTPPort.MEM, CHAN_INFO) - pk.data = (CMD_INFO_DETAILS, self._fetch_id) - self.cf.send_packet(pk, expected_reply=( - CMD_INFO_DETAILS, self._fetch_id)) - else: - logger.debug( - 'Done getting all the memories, start reading the OWs') - ows = self.get_mems(MemoryElement.TYPE_1W) - # If there are any OW mems start reading them, otherwise - # we are done - for ow_mem in ows: - ow_mem.update(self._mem_update_done) - if len(ows) == 0: - if self._refresh_callback: - self._refresh_callback() - self._refresh_callback = None - + self._handle_chan_info(cmd, payload) if chan == CHAN_WRITE: - id = cmd - (addr, status) = struct.unpack(' 0: - self._write_requests[id][0].start() - else: - logger.debug( - 'Status {}: write failed.'.format(status)) - # Remove from queue + self._handle_chan_write(cmd, payload) + if chan == CHAN_READ: + self._handle_chan_read(cmd, payload) + + def _handle_chan_info(self, cmd, payload): + if cmd == CMD_INFO_NBR: + self._handle_cmd_info_nbr(payload) + if cmd == CMD_INFO_DETAILS: + self._handle_cmd_info_details(payload) + + def _handle_cmd_info_nbr(self, payload): + self.nbr_of_mems = payload[0] + logger.info('{} memories found'.format(self.nbr_of_mems)) + + # Start requesting information about the memories, + if self.nbr_of_mems > 0: + if not self._getting_count: + self._getting_count = True + logger.debug('Requesting first id') + pk = CRTPPacket() + pk.set_header(CRTPPort.MEM, CHAN_INFO) + pk.data = (CMD_INFO_DETAILS, 0) + self.cf.send_packet(pk, expected_reply=(CMD_INFO_DETAILS, 0)) + else: + self._refresh_callback() + + def _handle_cmd_info_details(self, payload): + # Did we get a good reply, otherwise try again: + if len(payload) < 5: + # Workaround for 1-wire bug when memory is detected + # but updating the info crashes the communication with + # the 1-wire. Fail by saying we only found 1 memory + # (the I2C). + logger.error('-------->Got good count, but no info on mem!') + self.nbr_of_mems = 1 + if self._refresh_callback: + self._refresh_callback() + self._refresh_callback = None + return + + # Create information about a new memory + # Id - 1 byte + mem_id = payload[0] + # Type - 1 byte + mem_type = payload[1] + # Size 4 bytes (as addr) + mem_size = struct.unpack('I', payload[2:6])[0] + # Addr (only valid for 1-wire?) + mem_addr_raw = struct.unpack('B' * 8, payload[6:14]) + mem_addr = '' + for m in mem_addr_raw: + mem_addr += '{:02X}'.format(m) + + if (not self.get_mem(mem_id)): + if mem_type == MemoryElement.TYPE_1W: + mem = OWElement(id=mem_id, type=mem_type, + size=mem_size, + addr=mem_addr, mem_handler=self) + self.mem_read_cb.add_callback(mem.new_data) + self.mem_write_cb.add_callback(mem.write_done) + self._ow_mems_left_to_update.append(mem.id) + elif mem_type == MemoryElement.TYPE_I2C: + mem = I2CElement(id=mem_id, type=mem_type, size=mem_size, mem_handler=self) + self.mem_read_cb.add_callback(mem.new_data) + self.mem_write_cb.add_callback(mem.write_done) + elif mem_type == MemoryElement.TYPE_DRIVER_LED: + mem = LEDDriverMemory(id=mem_id, type=mem_type, size=mem_size, mem_handler=self) + logger.debug(mem) + self.mem_read_cb.add_callback(mem.new_data) + self.mem_write_cb.add_callback(mem.write_done) + elif mem_type == MemoryElement.TYPE_LOCO: + mem = LocoMemory(id=mem_id, type=mem_type, size=mem_size, mem_handler=self) + logger.debug(mem) + self.mem_read_cb.add_callback(mem.new_data) + elif mem_type == MemoryElement.TYPE_TRAJ: + mem = TrajectoryMemory(id=mem_id, type=mem_type, size=mem_size, mem_handler=self) + logger.debug(mem) + self.mem_write_cb.add_callback(mem.write_done) + self.mem_write_failed_cb.add_callback(mem.write_failed) + elif mem_type == MemoryElement.TYPE_LOCO2: + mem = LocoMemory2(id=mem_id, type=mem_type, size=mem_size, mem_handler=self) + logger.debug(mem) + self.mem_read_cb.add_callback(mem.new_data) + elif mem_type == MemoryElement.TYPE_LH: + mem = LighthouseMemory(id=mem_id, type=mem_type, size=mem_size, mem_handler=self) + logger.debug(mem) + self.mem_read_cb.add_callback(mem.new_data) + self.mem_read_failed_cb.add_callback(mem.new_data_failed) + self.mem_write_cb.add_callback(mem.write_done) + self.mem_write_failed_cb.add_callback(mem.write_failed) + elif mem_type == MemoryElement.TYPE_MEMORY_TESTER: + mem = MemoryTester(id=mem_id, type=mem_type, size=mem_size, mem_handler=self) + logger.debug(mem) + self.mem_read_cb.add_callback(mem.new_data) + self.mem_write_cb.add_callback(mem.write_done) + elif mem_type == MemoryElement.TYPE_DRIVER_LEDTIMING: + mem = LEDTimingsDriverMemory(id=mem_id, type=mem_type, size=mem_size, mem_handler=self) + logger.debug(mem) + self.mem_read_cb.add_callback(mem.new_data) + self.mem_write_cb.add_callback(mem.write_done) + elif mem_type == MemoryElement.TYPE_DECK_MEMORY: + mem = DeckMemoryManager(id=mem_id, type=mem_type, size=mem_size, mem_handler=self) + logger.debug(mem) + self.mem_read_cb.add_callback(mem._new_data) + self.mem_read_failed_cb.add_callback(mem._new_data_failed) + self.mem_write_cb.add_callback(mem._write_done) + self.mem_write_failed_cb.add_callback(mem._write_failed) + else: + mem = MemoryElement(id=mem_id, type=mem_type, + size=mem_size, mem_handler=self) + logger.debug(mem) + self.mems.append(mem) + self.mem_added_cb.call(mem) + + self._fetch_id = mem_id + 1 + + if self.nbr_of_mems - 1 >= self._fetch_id: + logger.debug('Requesting information about memory {}'.format(self._fetch_id)) + pk = CRTPPacket() + pk.set_header(CRTPPort.MEM, CHAN_INFO) + pk.data = (CMD_INFO_DETAILS, self._fetch_id) + self.cf.send_packet(pk, expected_reply=(CMD_INFO_DETAILS, self._fetch_id)) + else: + logger.debug('Done getting all the memories, start reading the OWs') + ows = self.get_mems(MemoryElement.TYPE_1W) + # If there are any OW mems start reading them, otherwise + # we are done + for ow_mem in ows: + ow_mem.update(self._mem_update_done) + if len(ows) == 0: + if self._refresh_callback: + self._refresh_callback() + self._refresh_callback = None + + def _handle_chan_write(self, cmd, payload): + id = cmd + (addr, status) = struct.unpack(' 0: self._write_requests[id][0].start() - - self._write_requests_lock.release() - - # Call callbacks after the lock has been released to alow for new writes - # to be initiated from the callback. - if do_call_sucess_cb: - self.mem_write_cb.call(wreq.mem, wreq.addr) - if do_call_fail_cb: - self.mem_write_failed_cb.call(wreq.mem, wreq.addr) - - if chan == CHAN_READ: - id = cmd - (addr, status) = struct.unpack(' 0: + self._write_requests[id][0].start() + + self._write_requests_lock.release() + + # Call callbacks after the lock has been released to alow for new writes + # to be initiated from the callback. + if do_call_sucess_cb: + self.mem_write_cb.call(wreq.mem, wreq.addr) + if do_call_fail_cb: + self.mem_write_failed_cb.call(wreq.mem, wreq.addr) + + def _handle_chan_read(self, cmd, payload): + id = cmd + (addr, status) = struct.unpack(' Date: Fri, 2 Dec 2022 10:12:22 +0100 Subject: [PATCH 2/3] Fixed flake8 problem --- cflib/crtp/radiodriver.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/cflib/crtp/radiodriver.py b/cflib/crtp/radiodriver.py index db9ed9088..f0bf50eb7 100644 --- a/cflib/crtp/radiodriver.py +++ b/cflib/crtp/radiodriver.py @@ -42,12 +42,10 @@ from threading import Semaphore from threading import Thread from typing import Any -from typing import Dict from typing import Iterable from typing import List from typing import Optional from typing import Tuple -from typing import Union from urllib.parse import parse_qs from urllib.parse import urlparse From 273099f5a1da447c68f8235b975da04ed7ccc996 Mon Sep 17 00:00:00 2001 From: Kristoffer Richardsson Date: Fri, 2 Dec 2022 13:08:04 +0100 Subject: [PATCH 3/3] Handle disconnects when reading/writing memory mappings --- cflib/crazyflie/mem/__init__.py | 50 +++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/cflib/crazyflie/mem/__init__.py b/cflib/crazyflie/mem/__init__.py index d038a6b29..b3767676b 100644 --- a/cflib/crazyflie/mem/__init__.py +++ b/cflib/crazyflie/mem/__init__.py @@ -256,6 +256,8 @@ def _clear_state(self): # Called when new memories have been added self.mem_added_cb = Caller() + self._clear_refresh_callbacks() + # Called to signal completion of read or write self.mem_read_cb = Caller() self.mem_read_failed_cb = Caller() @@ -263,6 +265,7 @@ def _clear_state(self): self.mem_write_failed_cb = Caller() self._refresh_callback = None + self._refresh_failed_callback = None self._fetch_id = 0 self.nbr_of_mems = 0 self._ow_mem_fetch_index = 0 @@ -272,6 +275,10 @@ def _clear_state(self): self._ow_mems_left_to_update = [] self._getting_count = False + def _clear_refresh_callbacks(self): + self._refresh_callback = None + self._refresh_failed_callback = None + def _mem_update_done(self, mem): """ Callback from each individual memory (only 1-wire) when reading of @@ -285,7 +292,7 @@ def _mem_update_done(self, mem): if len(self._ow_mems_left_to_update) == 0: if self._refresh_callback: self._refresh_callback() - self._refresh_callback = None + self._clear_refresh_callbacks() def get_mem(self, id): """Fetch the memory with the supplied id""" @@ -346,9 +353,10 @@ def read(self, memory, addr, length): return True - def refresh(self, refresh_done_callback): + def refresh(self, refresh_done_callback, refresh_failed_cb=None): """Start fetching all the detected memories""" self._refresh_callback = refresh_done_callback + self._refresh_failed_callback = refresh_failed_cb self._fetch_id = 0 for m in self.mems: try: @@ -369,8 +377,32 @@ def refresh(self, refresh_done_callback): def _disconnected(self, uri): """The link to the Crazyflie has been broken. Reset state""" + self._call_all_failed_callbacks() self._clear_state() + def _call_all_failed_callbacks(self): + # Read requests + read_requests = list(self._read_requests.values()) + self._read_requests.clear() + for rreq in read_requests: + self.mem_read_failed_cb.call(rreq.mem, rreq.addr, rreq.data) + + # Write requests + write_requests = [] + self._write_requests_lock.acquire() + for requests in self._write_requests.values(): + write_requests += requests + self._write_requests.clear() + self._write_requests_lock.release() + + for wreq in write_requests: + self.mem_write_failed_cb.call(wreq.mem, wreq.addr) + + # Info + if self._refresh_failed_callback: + self._refresh_failed_callback() + self._clear_refresh_callbacks() + def _new_packet_cb(self, packet): """Callback for newly arrived packets for the memory port""" chan = packet.channel @@ -404,7 +436,9 @@ def _handle_cmd_info_nbr(self, payload): pk.data = (CMD_INFO_DETAILS, 0) self.cf.send_packet(pk, expected_reply=(CMD_INFO_DETAILS, 0)) else: - self._refresh_callback() + if self._refresh_callback: + self._refresh_callback() + self._clear_refresh_callbacks() def _handle_cmd_info_details(self, payload): # Did we get a good reply, otherwise try again: @@ -417,7 +451,7 @@ def _handle_cmd_info_details(self, payload): self.nbr_of_mems = 1 if self._refresh_callback: self._refresh_callback() - self._refresh_callback = None + self._clear_refresh_callbacks() return # Create information about a new memory @@ -488,8 +522,7 @@ def _handle_cmd_info_details(self, payload): self.mem_write_cb.add_callback(mem._write_done) self.mem_write_failed_cb.add_callback(mem._write_failed) else: - mem = MemoryElement(id=mem_id, type=mem_type, - size=mem_size, mem_handler=self) + mem = MemoryElement(id=mem_id, type=mem_type, size=mem_size, mem_handler=self) logger.debug(mem) self.mems.append(mem) self.mem_added_cb.call(mem) @@ -512,7 +545,7 @@ def _handle_cmd_info_details(self, payload): if len(ows) == 0: if self._refresh_callback: self._refresh_callback() - self._refresh_callback = None + self._clear_refresh_callbacks() def _handle_chan_write(self, cmd, payload): id = cmd @@ -569,5 +602,4 @@ def _handle_chan_read(self, cmd, payload): else: logger.debug('Status {}: read failed.'.format(status)) self._read_requests.pop(id, None) - self.mem_read_failed_cb.call( - rreq.mem, rreq.addr, rreq.data) + self.mem_read_failed_cb.call(rreq.mem, rreq.addr, rreq.data)