From be8dce71a0bcddd00b7b599bed3b8f495f7b87e8 Mon Sep 17 00:00:00 2001 From: spacemeowx2 Date: Thu, 23 Apr 2020 23:07:21 +0800 Subject: [PATCH] feat: send amiibo report --- joycontrol/controller_state.py | 5 ++ joycontrol/mcu.py | 154 +++++++++++++++++++++++++++++++++ joycontrol/protocol.py | 135 +++++++++++++++++++++++------ joycontrol/report.py | 14 ++- run_amiibo_cli.py | 104 ++++++++++++++++++++++ setup.py | 2 +- 6 files changed, 382 insertions(+), 32 deletions(-) create mode 100644 joycontrol/mcu.py create mode 100644 run_amiibo_cli.py diff --git a/joycontrol/controller_state.py b/joycontrol/controller_state.py index 6902a22d..aee833ea 100644 --- a/joycontrol/controller_state.py +++ b/joycontrol/controller_state.py @@ -9,6 +9,7 @@ class ControllerState: def __init__(self, protocol, controller: Controller, spi_flash: FlashMemory = None): self._protocol = protocol self._controller = controller + self._nfc_content = None self._spi_flash = spi_flash @@ -198,6 +199,10 @@ async def button_push(controller_state, *buttons, sec=0.1): await controller_state.send() +async def set_nfc(controller_state, nfc_content): + controller_state._nfc_content = nfc_content + + class _StickCalibration: def __init__(self, h_center, v_center, h_max_above_center, v_max_above_center, h_max_below_center, v_max_below_center): self.h_center = h_center diff --git a/joycontrol/mcu.py b/joycontrol/mcu.py new file mode 100644 index 00000000..0f26e93c --- /dev/null +++ b/joycontrol/mcu.py @@ -0,0 +1,154 @@ +from enum import Enum +from crc8 import crc8 + + +class Action(Enum): + NON = 0 + REQUEST_STATUS = 1 + START_TAG_POLLING = 2 + START_TAG_DISCOVERY = 3 + READ_TAG = 4 + READ_TAG_2 = 5 + READ_FINISHED = 6 + + +class McuState(Enum): + NOT_INITIALIZED = 0 + IRC = 1 + NFC = 2 + STAND_BY = 3 + BUSY = 4 + + +def copyarray(dest, offset, src): + for i in range(len(src)): + dest[offset + i] = src[i] + +class Mcu: + def __init__(self): + self._fw_major = [0, 3] + self._fw_minor = [0, 5] + + self._bytes = [0] * 313 + + self._action = Action.NON + self._state = McuState.NOT_INITIALIZED + + self._nfc_content = None + + def get_fw_major(self): + return self._fw_major + + def get_fw_minor(self): + return self._fw_minor + + def set_action(self, v): + self._action = v + + def get_action(self): + return self._action + + def set_state(self, v): + self._state = v + + def get_state(self): + return self._state + + def _get_state_byte(self): + if self.get_state() == McuState.NFC: + return 4 + elif self.get_state() == McuState.BUSY: + return 6 + elif self.get_state() == McuState.NOT_INITIALIZED: + return 1 + elif self.get_state() == McuState.STAND_BY: + return 1 + else: + return 0 + + def update_status(self): + self._bytes[0] = 1 + self._bytes[1] = 0 + self._bytes[2] = 0 + self._bytes[3] = self._fw_major[0] + self._bytes[4] = self._fw_major[1] + self._bytes[5] = self._fw_minor[0] + self._bytes[6] = self._fw_minor[1] + self._bytes[7] = self._get_state_byte() + + def update_nfc_report(self): + self._bytes = [0] * 313 + if self.get_action() == Action.REQUEST_STATUS: + self._bytes[0] = 1 + self._bytes[1] = 0 + self._bytes[2] = 0 + self._bytes[3] = self._fw_major[0] + self._bytes[4] = self._fw_major[1] + self._bytes[5] = self._fw_minor[0] + self._bytes[6] = self._fw_minor[1] + self._bytes[7] = self._get_state_byte() + elif self.get_action() == Action.NON: + self._bytes[0] = 0xff + elif self.get_action() == Action.START_TAG_DISCOVERY: + self._bytes[0] = 0x2a + self._bytes[1] = 0 + self._bytes[2] = 5 + self._bytes[3] = 0 + self._bytes[4] = 0 + self._bytes[5] = 9 + self._bytes[6] = 0x31 + self._bytes[7] = 0 + elif self.get_action() == Action.START_TAG_POLLING: + self._bytes[0] = 0x2a + self._bytes[1] = 0 + self._bytes[2] = 5 + self._bytes[3] = 0 + self._bytes[4] = 0 + if not self._nfc_content is None: + data = [0x09, 0x31, 0x09, 0x00, 0x00, 0x00, 0x01, 0x01, 0x02, 0x00, 0x07] + copyarray(self._bytes, 5, data) + copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3]) + copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8]) + else: + print('nfc content is none') + self._bytes[5] = 9 + self._bytes[6] = 0x31 + self._bytes[7] = 0 + elif self.get_action() == Action.READ_TAG or self.get_action() == Action.READ_TAG_2: + self._bytes[0] = 0x3a + self._bytes[1] = 0 + self._bytes[2] = 7 + if self.get_action() == Action.READ_TAG: + data1 = bytes.fromhex('010001310200000001020007') + copyarray(self._bytes, 3, data1) + copyarray(self._bytes, 3 + len(data1), self._nfc_content[0:3]) + copyarray(self._bytes, 3 + len(data1) + 3, self._nfc_content[4:8]) + data2 = bytes.fromhex('000000007DFDF0793651ABD7466E39C191BABEB856CEEDF1CE44CC75EAFB27094D087AE803003B3C7778860000') + copyarray(self._bytes, 3 + len(data1) + 3 + 4, data2) + copyarray(self._bytes, 3 + len(data1) + 3 + 4 + len(data2), self._nfc_content[0:245]) + self.set_action(Action.READ_TAG_2) + else: + data = bytes.fromhex('02000927') + copyarray(self._bytes, 3, data) + copyarray(self._bytes, 3 + len(data), self._nfc_content[245:]) + self.set_action(Action.READ_FINISHED) + elif self.get_action() == Action.READ_FINISHED: + self._bytes[0] = 0x2a + self._bytes[1] = 0 + self._bytes[2] = 5 + self._bytes[3] = 0 + self._bytes[4] = 0 + data = bytes.fromhex('0931040000000101020007') + copyarray(self._bytes, 5, data) + copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3]) + copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8]) + + crc = crc8() + crc.update(bytes(self._bytes[:-1])) + self._bytes[-1] = ord(crc.digest()) + + def set_nfc(self, nfc_content): + self._nfc_content = nfc_content + + def __bytes__(self): + return bytes(self._bytes) diff --git a/joycontrol/protocol.py b/joycontrol/protocol.py index 96062114..97b2537d 100644 --- a/joycontrol/protocol.py +++ b/joycontrol/protocol.py @@ -10,6 +10,8 @@ from joycontrol.memory import FlashMemory from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID from joycontrol.transport import NotConnectedError +from joycontrol.mcu import Mcu, McuState, Action +from crc8 import crc8 logger = logging.getLogger(__name__) @@ -39,6 +41,8 @@ def __init__(self, controller: Controller, spi_flash: FlashMemory = None): self._controller_state = ControllerState(self, controller, spi_flash=spi_flash) self._controller_state_sender = None + self._mcu = Mcu() + # None = Just answer to sub commands self._input_report_mode = None @@ -89,6 +93,11 @@ async def write(self, input_report: InputReport): input_report.set_timer(self._input_report_timer) self._input_report_timer = (self._input_report_timer + 1) % 0x100 + if input_report.get_input_report_id() == 0x31: + self._mcu.set_nfc(self._controller_state._nfc_content) + self._mcu.update_nfc_report() + input_report.set_mcu(self._mcu) + await self.transport.write(input_report) self._controller_state.sig_is_send.set() @@ -120,15 +129,14 @@ def error_received(self, exc: Exception) -> None: # TODO? raise NotImplementedError() - async def input_report_mode_0x30(self): + async def input_report_mode_full(self): """ - Continuously sends 0x30 input reports containing the controller state. + Continuously sends full input reports containing the controller state. """ if self.transport.is_reading(): - raise ValueError('Transport must be paused in 0x30 input report mode') + raise ValueError('Transport must be paused in full input report mode') input_report = InputReport() - input_report.set_input_report_id(0x30) input_report.set_vibrator_input() input_report.set_misc() @@ -136,6 +144,7 @@ async def input_report_mode_0x30(self): try: while True: + input_report.set_input_report_id(self._input_report_mode) # TODO: improve timing if self.controller == Controller.PRO_CONTROLLER: # send state at 120Hz @@ -159,6 +168,10 @@ async def input_report_mode_0x30(self): pass elif output_report_id == OutputReportID.SUB_COMMAND: reply_send = await self._reply_to_sub_command(report) + elif output_report_id == OutputReportID.REQUEST_MCU: + reply_send = await self._reply_to_mcu(report) + else: + logger.warning(f'Report unknown output report "{output_report_id}" - IGNORE') except ValueError as v_err: logger.warning(f'Report parsing error "{v_err}" - IGNORE') except NotImplementedError as err: @@ -207,6 +220,47 @@ async def report_received(self, data: Union[bytes, Text], addr: Tuple[str, int]) else: logger.warning(f'Output report {output_report_id} not implemented - ignoring') + async def _reply_to_mcu(self, report): + sub_command = report.data[11] + sub_command_data = report.data[12:] + + # logging.info(f'received output report - Request MCU sub command {sub_command}') + + if self._mcu.get_action() == Action.READ_TAG or self._mcu.get_action() == Action.READ_TAG_2 or self._mcu.get_action() == Action.READ_FINISHED: + return + + # Request mcu state + if sub_command == 0x01: + # input_report = InputReport() + # input_report.set_input_report_id(0x21) + # input_report.set_misc() + + # input_report.set_ack(0xA0) + # input_report.reply_to_subcommand_id(0x21) + + self._mcu.set_action(Action.REQUEST_STATUS) + # input_report.set_mcu(self._mcu) + + # await self.write(input_report) + # Send Start tag discovery + elif sub_command == 0x02: + # 0: Cancel all, 4: StartWaitingReceive + if sub_command_data[0] == 0x04: + self._mcu.set_action(Action.START_TAG_DISCOVERY) + # 1: Start polling + elif sub_command_data[0] == 0x01: + self._mcu.set_action(Action.START_TAG_POLLING) + # 2: stop polling + elif sub_command_data[0] == 0x02: + self._mcu.set_action(Action.NON) + elif sub_command_data[0] == 0x06: + self._mcu.set_action(Action.READ_TAG) + else: + logging.info(f'Unknown sub_command_data arg {sub_command_data}') + else: + logging.info(f'Unknown MCU sub command {sub_command}') + + async def _reply_to_sub_command(self, report): # classify sub command try: @@ -317,34 +371,40 @@ async def _command_spi_flash_read(self, sub_command_data): async def _command_set_input_report_mode(self, sub_command_data): if sub_command_data[0] == 0x30: - logger.info('Setting input report mode to 0x30...') + pass + elif sub_command_data[0] == 0x31: + pass + else: + logger.error(f'input report mode {sub_command_data[0]} not implemented - ignoring request') + return - input_report = InputReport() - input_report.set_input_report_id(0x21) - input_report.set_misc() + logger.info(f'Setting input report mode to {hex(sub_command_data[0])}...') - input_report.set_ack(0x80) - input_report.reply_to_subcommand_id(0x03) + input_report = InputReport() + input_report.set_input_report_id(0x21) + input_report.set_misc() + + input_report.set_ack(0x80) + input_report.reply_to_subcommand_id(0x03) - await self.write(input_report) + await self.write(input_report) - # start sending 0x30 input reports - if self._input_report_mode != 0x30: - self._input_report_mode = 0x30 + # start sending input reports + if self._input_report_mode is None: - self.transport.pause_reading() - new_reader = asyncio.ensure_future(self.input_report_mode_0x30()) + self.transport.pause_reading() + new_reader = asyncio.ensure_future(self.input_report_mode_full()) - # We need to swap the reader in the future because this function was probably called by it - async def set_reader(): - await self.transport.set_reader(new_reader) - self.transport.resume_reading() + # We need to swap the reader in the future because this function was probably called by it + async def set_reader(): + await self.transport.set_reader(new_reader) + self.transport.resume_reading() - asyncio.ensure_future(set_reader()).add_done_callback( - utils.create_error_check_callback() - ) - else: - logger.error(f'input report mode {sub_command_data[0]} not implemented - ignoring request') + asyncio.ensure_future(set_reader()).add_done_callback( + utils.create_error_check_callback() + ) + + self._input_report_mode = sub_command_data[0] async def _command_trigger_buttons_elapsed_time(self, sub_command_data): input_report = InputReport() @@ -392,10 +452,26 @@ async def _command_set_nfc_ir_mcu_config(self, sub_command_data): input_report.set_ack(0xA0) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value) - # TODO - data = [1, 0, 255, 0, 8, 0, 27, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 200] + self._mcu.update_status() + data = list(bytes(self._mcu)[0:34]) + crc = crc8() + crc.update(bytes(data[:-1])) + checksum = crc.digest() + data[-1] = ord(checksum) + for i in range(len(data)): - input_report.data[16 + i] = data[i] + input_report.data[16+i] = data[i] + + # Set MCU mode cmd + if sub_command_data[1] == 0: + if sub_command_data[2] == 0: + self._mcu.set_state(McuState.STAND_BY) + elif sub_command_data[2] == 4: + self._mcu.set_state(McuState.NFC) + else: + logger.info(f"unknown mcu state {sub_command_data[2]}") + else: + logger.info(f"unknown mcu config command {sub_command_data}") await self.write(input_report) @@ -408,10 +484,13 @@ async def _command_set_nfc_ir_mcu_state(self, sub_command_data): # 0x01 = Resume input_report.set_ack(0x80) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value) + self._mcu.set_action(Action.NON) + self._mcu.set_state(McuState.STAND_BY) elif sub_command_data[0] == 0x00: # 0x00 = Suspend input_report.set_ack(0x80) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value) + self._mcu.set_state(McuState.STAND_BY) else: raise NotImplementedError(f'Argument {sub_command_data[0]} of {SubCommand.SET_NFC_IR_MCU_STATE} ' f'not implemented.') diff --git a/joycontrol/report.py b/joycontrol/report.py index c6db5815..fed830ea 100644 --- a/joycontrol/report.py +++ b/joycontrol/report.py @@ -10,8 +10,7 @@ class InputReport: """ def __init__(self, data=None): if not data: - # TODO: not enough space for NFC/IR data input report - self.data = [0x00] * 51 + self.data = [0x00] * 364 # all input reports are prepended with 0xA1 self.data[0] = 0xA1 else: @@ -113,6 +112,12 @@ def set_6axis_data(self): for i in range(14, 50): self.data[i] = 0x00 + def set_mcu(self, data): + # write to data + data = bytes(data) + for i in range(len(data)): + self.data[50 + i] = data[i] + def reply_to_subcommand_id(self, _id): if isinstance(_id, SubCommand): self.data[15] = _id.value @@ -195,8 +200,10 @@ def __bytes__(self): return bytes(self.data[:51]) elif _id == 0x30: return bytes(self.data[:14]) + elif _id == 0x31: + return bytes(self.data[:363]) else: - return bytes(self.data) + return bytes(self.data[:51]) class SubCommand(Enum): @@ -215,6 +222,7 @@ class SubCommand(Enum): class OutputReportID(Enum): SUB_COMMAND = 0x01 RUMBLE_ONLY = 0x10 + REQUEST_MCU = 0x11 class OutputReport: diff --git a/run_amiibo_cli.py b/run_amiibo_cli.py new file mode 100644 index 00000000..57a00e08 --- /dev/null +++ b/run_amiibo_cli.py @@ -0,0 +1,104 @@ +import argparse +import asyncio +import logging +import os +from contextlib import contextmanager + +from joycontrol import logging_default as log +from joycontrol.command_line_interface import ControllerCLI +from joycontrol.controller_state import ControllerState, button_push, set_nfc +from joycontrol.protocol import controller_protocol_factory, Controller +from joycontrol.server import create_hid_server + +logger = logging.getLogger(__name__) + + +async def _main(controller, reconnect_bt_addr=None, capture_file=None, spi_flash=None, device_id=None, amiibo=None): + factory = controller_protocol_factory(controller, spi_flash=spi_flash) + ctl_psm, itr_psm = 17, 19 + transport, protocol = await create_hid_server(factory, + reconnect_bt_addr=reconnect_bt_addr, + ctl_psm=ctl_psm, itr_psm=itr_psm, capture_file=capture_file, device_id=device_id) + + controller_state = protocol.get_controller_state() + if amiibo: + await set_nfc(controller_state, amiibo.read()) + + await controller_state.connect() + + async def amiibo(filename): + with open(filename, "rb") as amiibo_file: + content = amiibo_file.read() + await set_nfc(controller_state, content) + + async def remove_amiibo(): + await controller_state.set_nfc(None) + + cli = ControllerCLI(controller_state) + cli.add_command('amiibo', amiibo) + cli.add_command('remove_amiibo', remove_amiibo) + await cli.run() + + logger.info('Stopping communication...') + await transport.close() + + +if __name__ == '__main__': + # check if root + if not os.geteuid() == 0: + raise PermissionError('Script must be run as root!') + + # setup logging + log.configure() + + parser = argparse.ArgumentParser() + #parser.add_argument('controller', help='JOYCON_R, JOYCON_L or PRO_CONTROLLER') + parser.add_argument('-l', '--log') + parser.add_argument('-d', '--device_id') + parser.add_argument('--spi_flash') + parser.add_argument('-r', '--reconnect_bt_addr', type=str, default=None, + help='The Switch console bluetooth address, for reconnecting as an already paired controller') + parser.add_argument('-a', '--amiibo', type=argparse.FileType('rb'), default=None, + help='The amiibo dump file') + args = parser.parse_args() + + """ + if args.controller == 'JOYCON_R': + controller = Controller.JOYCON_R + elif args.controller == 'JOYCON_L': + controller = Controller.JOYCON_L + elif args.controller == 'PRO_CONTROLLER': + controller = Controller.PRO_CONTROLLER + else: + raise ValueError(f'Unknown controller "{args.controller}".') + """ + controller = Controller.PRO_CONTROLLER + + spi_flash = None + if args.spi_flash: + with open(args.spi_flash, 'rb') as spi_flash_file: + spi_flash = spi_flash_file.read() + + # creates file if arg is given + @contextmanager + def get_output(path=None): + """ + Opens file if path is given + """ + if path is not None: + file = open(path, 'wb') + yield file + file.close() + else: + yield None + + with get_output(args.log) as capture_file: + loop = asyncio.get_event_loop() + loop.run_until_complete(_main( + controller, + reconnect_bt_addr=args.reconnect_bt_addr, + capture_file=capture_file, + spi_flash=spi_flash, + device_id=args.device_id, + amiibo=args.amiibo + )) diff --git a/setup.py b/setup.py index 1e0e4c98..bc1abf12 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ package_data={'joycontrol': ['profile/sdp_record_hid.xml']}, zip_safe=False, install_requires=[ - 'hid', 'aioconsole', 'dbus-python' + 'hid', 'aioconsole', 'dbus-python', 'crc8' ] )