diff --git a/setup.py b/setup.py index 1d60811..4321678 100644 --- a/setup.py +++ b/setup.py @@ -28,8 +28,11 @@ ], install_requires=[ "Click>=7.0", + "packaging", "prompt-toolkit>=2.0.10,<3.0", "pyserial>=3.4", + "pyusb", + "requests", "setuptools", "setuptools_scm", "setuptools_scm_git_archive", diff --git a/src/druid/cli.py b/src/druid/cli.py index 74bb6e2..b620a6d 100644 --- a/src/druid/cli.py +++ b/src/druid/cli.py @@ -5,9 +5,14 @@ import click +import requests +import os +from packaging import version + from druid import __version__ from druid.crow import Crow from druid import repl as druid_repl +from druid import pydfu @click.group(invoke_without_command=True) @click.pass_context @@ -44,6 +49,86 @@ def upload(filename): time.sleep(0.3) click.echo(crow.read(1000000)) +@cli.command(short_help="Update crow firmware") +def firmware(): + """ Update crow firmware + """ + print("Checking for updates...") + git_query = requests.get('https://raw.githubusercontent.com/monome/crow/main/version.txt') + git_data = git_query.text.split() + print(">> git version", git_data[0]) + + with Crow() as crow: + local_version = "none" + try: + crow.connect() + except: + print("No crow found, or might be in bootloader mode already...") + local_version = "0" + + # crow found: clear script and read version + if local_version != "0": + crow.write("crow.reset()") + time.sleep(0.1) + c = crow.read(1000000) + crow.write("^^v") + tmp = (crow.read(100)).split("'") + local_version = tmp[1][1:] + + print(">> local version: ", local_version) + + if version.parse(local_version) >= version.parse(git_data[0]): + print("Up to date.") + exit() + + # delete old crow.dfu if exists + if os.path.exists("crow.dfu"): + os.remove("crow.dfu") + + print("Downloading new version:", git_data[1]) + res = requests.get(git_data[1]) + with open('crow.dfu', 'wb') as fwfile: + fwfile.write(res.content) + + if local_version != "0": + crow.write('^^b') + time.sleep(1.0) + print("Crow bootloader enabled.") + + try: + pydfu.init() + except ValueError: + print("Error: pydfu didn't find crow!") + exit() + + elements = pydfu.read_dfu_file("crow.dfu") + if not elements: + return + print("Writing memory...") + pydfu.write_elements(elements, False, progress=pydfu.cli_progress) + + print("Exiting DFU...") + pydfu.exit_dfu() + + os.remove("crow.dfu") + print("Update complete.") + + +@cli.command(short_help="Clear userscript") +def clearscript(): + """ Clear userscript from crow' flash memory """ + try: + pydfu.init() + except ValueError: + print("Error: pydfu didn't find crow! Check you've forced the bootloader.") + exit() + print("Clearing userscript...") + # note we must write a single byte of 0x00 but are primarily just triggering erase of the flash page + pydfu.write_elements([{"addr":0x08010000, "size": 1, "data": [0]}], False, progress=pydfu.cli_progress) + print("Complete. Exiting DFU...") + pydfu.exit_dfu() + + @cli.command() @click.argument("filename", type=click.Path(exists=True), required=False) def repl(filename): diff --git a/src/druid/crow.py b/src/druid/crow.py index b4ad000..10d09db 100644 --- a/src/druid/crow.py +++ b/src/druid/crow.py @@ -10,11 +10,11 @@ logger = logging.getLogger(__name__) -def find_serial_port(hwid): +def find_serial_port(): for portinfo in serial.tools.list_ports.comports(): - if hwid in portinfo.hwid: + if "crow" in portinfo.product: return portinfo - raise DeviceNotFoundError(f"can't find device {hwid}") + raise DeviceNotFoundError(f"can't find crow device") class Crow: def __init__(self, serial=None): @@ -23,7 +23,7 @@ def __init__(self, serial=None): self.event_handlers = {} def find_device(self): - portinfo = find_serial_port('USB VID:PID=0483:5740') + portinfo = find_serial_port() try: return serial.Serial( portinfo.device, @@ -62,7 +62,7 @@ def raise_event(self, event, *args, **kwargs): def replace_handlers(self, handlers): self.event_handlers = handlers - + def reconnect(self, err_event=False): try: self.connect() diff --git a/src/druid/pydfu.py b/src/druid/pydfu.py new file mode 100644 index 0000000..559cd6e --- /dev/null +++ b/src/druid/pydfu.py @@ -0,0 +1,587 @@ +#!/usr/bin/env python2 +# This file is part of the OpenMV project. +# +# https://github.com/openmv/openmv +# /tools/pydfu.py +# +# Copyright (c) 2013-2021 Ibrahim Abdelkader +# Copyright (c) 2013-2021 Kwabena W. Agyeman +# +# This work is licensed under the MIT license, see the file LICENSE for details. +# +# This module implements the DFU protocol for STM32 chips. +# See app note AN3156 for a description of the DFU protocol. +# See document UM0391 for a dscription of the DFuse file. + +from __future__ import print_function + +import argparse +import re +import struct +import sys +import usb.core +import usb.util +import zlib +import os +import time + +# VID/PID +__VID = 0x0483 +__PID = 0xdf11 + +# USB request __TIMEOUT +__TIMEOUT = 5000 + +# DFU commands +__DFU_DETACH = 0 +__DFU_DNLOAD = 1 +__DFU_UPLOAD = 2 +__DFU_GETSTATUS = 3 +__DFU_CLRSTATUS = 4 +__DFU_GETSTATE = 5 +__DFU_ABORT = 6 + +# DFU status +__DFU_STATE_APP_IDLE = 0x00 +__DFU_STATE_APP_DETACH = 0x01 +__DFU_STATE_DFU_IDLE = 0x02 +__DFU_STATE_DFU_DOWNLOAD_SYNC = 0x03 +__DFU_STATE_DFU_DOWNLOAD_BUSY = 0x04 +__DFU_STATE_DFU_DOWNLOAD_IDLE = 0x05 +__DFU_STATE_DFU_MANIFEST_SYNC = 0x06 +__DFU_STATE_DFU_MANIFEST = 0x07 +__DFU_STATE_DFU_MANIFEST_WAIT_RESET = 0x08 +__DFU_STATE_DFU_UPLOAD_IDLE = 0x09 +__DFU_STATE_DFU_ERROR = 0x0a + +__DFU_STATUS = [ + "DFU_STATE_APP_IDLE", + "DFU_STATE_APP_DETACH", + "DFU_STATE_DFU_IDLE", + "DFU_STATE_DFU_DOWNLOAD_SYNC", + "DFU_STATE_DFU_DOWNLOAD_BUSY", + "DFU_STATE_DFU_DOWNLOAD_IDLE", + "DFU_STATE_DFU_MANIFEST_SYNC", + "DFU_STATE_DFU_MANIFEST", + "DFU_STATE_DFU_MANIFEST_WAIT_RESET", + "DFU_STATE_DFU_UPLOAD_IDLE", + "DFU_STATE_DFU_ERROR" +] + +_DFU_DESCRIPTOR_TYPE = 0x21 + +# USB device handle +__dev = None + +__verbose = None + +# USB DFU interface +__DFU_INTERFACE = 0 + +import inspect +if 'length' in inspect.getargspec(usb.util.get_string).args: + # PyUSB 1.0.0.b1 has the length argument + def get_string(dev, index): + return usb.util.get_string(dev, 255, index) +else: + # PyUSB 1.0.0.b2 dropped the length argument + def get_string(dev, index): + return usb.util.get_string(dev, index) + + +def init(): + """Initializes the found DFU device so that we can program it.""" + global __dev + devices = get_dfu_devices(idVendor=__VID, idProduct=__PID) + if not devices: + raise ValueError('No DFU device found') + if len(devices) > 1: + raise ValueError("Multiple DFU devices found") + __dev = devices[0] + + # Claim DFU interface + usb.util.claim_interface(__dev, __DFU_INTERFACE) + + # Clear status + clr_status() + +def clr_status(): + """Clears any error status (perhaps left over from a previous session).""" + while (get_status() != __DFU_STATE_DFU_IDLE): + __dev.ctrl_transfer(0x21, __DFU_CLRSTATUS, 0, __DFU_INTERFACE, None, __TIMEOUT) + time.sleep(0.100) + + +def get_status(): + """Get the status of the last operation.""" + stat = __dev.ctrl_transfer(0xA1, __DFU_GETSTATUS, 0, __DFU_INTERFACE, 6, 20000) + #print ("DFU Status: ", __DFU_STATUS[stat[4]]) + return stat[4] + + +def mass_erase(): + """Performs a MASS erase (i.e. erases the entire device.""" + # Send DNLOAD with first byte=0x41 + __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, + "\x41", __TIMEOUT) + + # Execute last command + if get_status() != __DFU_STATE_DFU_DOWNLOAD_BUSY: + raise Exception("DFU: erase failed") + + # Check command state + if get_status() != __DFU_STATE_DFU_DOWNLOAD_IDLE: + raise Exception("DFU: erase failed") + + +def page_erase(addr): + """Erases a single page.""" + if __verbose: + print("Erasing page: 0x%x..." % (addr)) + + # Send DNLOAD with first byte=0x41 and page address + buf = struct.pack(" 0: + write_size = size + if not mass_erase_used: + for segment in mem_layout: + if addr >= segment['addr'] and \ + addr <= segment['last_addr']: + # We found the page containing the address we want to + # write, erase it + page_size = segment['page_size'] + page_addr = addr & ~(page_size - 1) + if addr + write_size > page_addr + page_size: + write_size = page_addr + page_size - addr + page_erase(page_addr) + break + write_memory(addr, data[:write_size], progress, + elem_addr, elem_size) + data = data[write_size:] + addr += write_size + size -= write_size + if progress: + progress(elem_addr, addr - elem_addr, elem_size) + +def write_bin(path, progress=None): + try: + with open(path, 'rb') as f: + buf = f.read() + except Exception as e: + print(e) + return + + print("file opened") + + xfer_bytes = 0 + xfer_total = len(buf) + + while xfer_bytes < xfer_total: + # Send chunk + chunk = min (64, xfer_total-xfer_bytes) + write_page(buf[xfer_bytes:xfer_bytes+chunk], xfer_bytes) + xfer_bytes += chunk + if (progress): + progress(0x08000000+xfer_bytes, xfer_bytes, xfer_total) + +def cli_progress(addr, offset, size): + """Prints a progress report suitable for use on the command line.""" + width = 25 + done = offset * width // size + print("\r0x{:08x} {:7d} [{}{}] {:3d}% " + .format(addr, size, '=' * done, ' ' * (width - done), + offset * 100 // size), end="") + sys.stdout.flush() + if offset == size: + print("") + + +def main(): + """Test program for verifying this files functionality.""" + global __verbose + # Parse CMD args + parser = argparse.ArgumentParser(description='DFU Python Util') + #parser.add_argument("path", help="file path") + parser.add_argument( + "-l", "--list", + help="list available DFU devices", + action="store_true", + default=False + ) + parser.add_argument( + "-m", "--mass-erase", + help="mass erase device", + action="store_true", + default=False + ) + parser.add_argument( + "-u", "--upload", + help="read file from DFU device", + dest="path", + default=False + ) + parser.add_argument( + "-v", "--verbose", + help="increase output verbosity", + action="store_true", + default=False + ) + args = parser.parse_args() + + __verbose = args.verbose + + if args.list: + list_dfu_devices(idVendor=__VID, idProduct=__PID) + return + + init() + + if args.mass_erase: + print ("Mass erase...") + mass_erase() + + if args.path: + ext = os.path.splitext(args.path)[1] + if ext == ".bin": + print("Writing binary...") + write_bin(args.path, progress=cli_progress) + + print("Exiting DFU...") + exit_dfu() + elif (ext == '.dfu'): + elements = read_dfu_file(args.path) + if not elements: + return + print("Writing memory...") + write_elements(elements, args.mass_erase, progress=cli_progress) + + print("Exiting DFU...") + exit_dfu() + else: + print("File format not supported!") + + return + + print("No command specified") + +if __name__ == '__main__': + main() diff --git a/src/druid/repl.py b/src/druid/repl.py index aa8824e..40db5fa 100644 --- a/src/druid/repl.py +++ b/src/druid/repl.py @@ -214,6 +214,10 @@ def build_ui(self): self.captures = [ TextArea(style='class:capture-field', height=2), TextArea(style='class:capture-field', height=2), + TextArea(style='class:capture-field', height=2), + TextArea(style='class:capture-field', height=2), + TextArea(style='class:capture-field', height=2), + TextArea(style='class:capture-field', height=2), ] self.output_field = TextArea( style='class:output-field', @@ -297,8 +301,23 @@ def crow_event(self, line, event, args): ch = int(ch_str) if ch >= 1 and ch <= 2: self.output_to_field(self.captures[ch - 1], f'\ninput[{ch}] = {val}\n') + elif event == 'pubview': + io, ch_str, val = args + ch = int(ch_str) + if io == "'input'": + if ch >= 1 and ch <= 2: + self.output_to_field(self.captures[ch - 1], f'\nin[{ch}] = {val}\n') + elif io == "'output'": + if ch >= 1 and ch <= 4: + self.output_to_field(self.captures[ch + 1], f'\nout[{ch}] = {val}\n') + else: + self.output_to_field(self.captures[0], f'\nERR:{io}[{ch}] = {val}\n') + elif event == 'ready' or event == 'pupdate' or event == 'pub': + args = args # NOP + # ignore for now + # TODO ^^ready triggers public.discover() and capture public vars into a modifiable form else: - self.output(f'^^{event}({", ".join(args)})') + self.output(f'^^{event}({", ".join(args)})\n') # these come from: # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/5c3d13eb849885bc4c1a2553ea6f81e6272f84c9/prompt_toolkit/key_binding/bindings/scroll.py#L147