Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Phantom Multiple Port Selection #255

Merged
merged 3 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pros/cli/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ def resolve_v5_port(port: Optional[str], type: str, quiet: bool = False) -> Tupl
is_joystick = False
if not port:
ports = find_v5_ports(type)
logger(__name__).debug('Ports: {}'.format(';'.join([str(p.__dict__) for p in ports])))
if len(ports) == 0:
if not quiet:
logger(__name__).error('No {0} ports were found! If you think you have a {0} plugged in, '
Expand All @@ -252,8 +253,10 @@ def resolve_v5_port(port: Optional[str], type: str, quiet: bool = False) -> Tupl
return None, False
if len(ports) > 1:
if not quiet:
port = click.prompt('Multiple {} ports were found. Please choose one: '.format('v5'),
port = click.prompt('Multiple {} ports were found. Please choose one: [{}]'
.format('v5', '|'.join([p.device for p in ports])),
default=ports[0].device,
show_default=False,
type=click.Choice([p.device for p in ports]))
assert port in [p.device for p in ports]
else:
Expand Down
38 changes: 26 additions & 12 deletions pros/serial/devices/vex/v5_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import struct
import time
import typing
import platform
from collections import defaultdict
from configparser import ConfigParser
from datetime import datetime, timedelta
Expand Down Expand Up @@ -40,20 +41,28 @@ def filter_v5_ports(p, locations, names):
(p.name is not None and any([n in p.name for n in names])) or \
(p.description is not None and any([n in p.description for n in names]))

def filter_v5_ports_mac(p, device):
return (p.device is not None and p.device.endswith(device))

ports = [p for p in list_all_comports() if filter_vex_ports(p)]

# Initially try filtering based off of location or the name of the device.
# Doesn't work on macOS or Jonathan's Dell, so we have a fallback (below)
user_ports = [p for p in ports if filter_v5_ports(p, ['2'], ['User'])]
system_ports = [p for p in ports if filter_v5_ports(p, ['0'], ['System', 'Communications'])]
joystick_ports = [p for p in ports if filter_v5_ports(p, ['1'], ['Controller'])]

# Testing this code path is hard!
# Special logic for macOS
if platform.system() == 'Darwin':
user_ports = [p for p in ports if filter_v5_ports_mac(p, '3')]
system_ports = [p for p in ports if filter_v5_ports_mac(p, '1')]
joystick_ports = [p for p in ports if filter_v5_ports_mac(p, '2')]
else:
user_ports = [p for p in ports if filter_v5_ports(p, ['2'], ['User'])]
system_ports = [p for p in ports if filter_v5_ports(p, ['0'], ['System', 'Communications'])]
joystick_ports = [p for p in ports if filter_v5_ports(p, ['1'], ['Controller'])]

# Fallback for when a brain port's location is not detected properly
if len(user_ports) != len(system_ports):
if len(user_ports) > len(system_ports):
user_ports = [p for p in user_ports if p not in system_ports]
system_ports = [p for p in ports if p not in user_ports and p not in joystick_ports]
else:
system_ports = [p for p in system_ports if p not in user_ports]
user_ports = [p for p in ports if p not in system_ports and p not in joystick_ports]

if len(user_ports) == len(system_ports) and len(user_ports) > 0:
if p_type.lower() == 'user':
Expand All @@ -74,6 +83,10 @@ def natural_key(chunk: str):
if p_type.lower() == 'user':
return [ports[1]]
elif p_type.lower() == 'system':
# check if ports contain the word Brain in the description and return that port
for port in ports:
if "Brain" in port.description:
return [port]
return [ports[0], *joystick_ports]
else:
raise ValueError(f'Invalid port type specified: {p_type}')
Expand Down Expand Up @@ -215,7 +228,7 @@ def can_compress(self):
def is_wireless(self):
version = self.query_system_version()
return version.product == V5Device.SystemVersion.Product.CONTROLLER and \
V5Device.SystemVersion.ControllerFlags.CONNECTED in version.product_flags
V5Device.SystemVersion.ControllerFlags.CONNECTED in version.product_flags

def generate_cold_hash(self, project: Project, extra: dict):
keys = {k: t.version for k, t in project.templates.items()}
Expand Down Expand Up @@ -835,7 +848,7 @@ def get_system_status(self) -> SystemStatus:
logger(__name__).debug('Sending ext 0x22 command')
version = self.query_system_version()
if (version.product == V5Device.SystemVersion.Product.BRAIN and version.system_version in Spec('<1.0.13')) or \
(version.product == V5Device.SystemVersion.Product.CONTROLLER and version.system_version in Spec('<1.0.0-0.70')):
(version.product == V5Device.SystemVersion.Product.CONTROLLER and version.system_version in Spec('<1.0.0-0.70')):
schema = '<x12B3xBI12x'
else:
schema = '<x12B3xBI12xB3x'
Expand Down Expand Up @@ -916,7 +929,7 @@ def kv_write(self, kv: str, payload: Union[Iterable, bytes, bytearray, str]):
payload = payload[:kv_to_max_bytes.get(kv, 254)] + "\0"
if isinstance(payload, str):
payload = payload.encode(encoding='ascii')
tx_fmt =f'<{len(encoded_kv)}s{len(payload)}s'
tx_fmt = f'<{len(encoded_kv)}s{len(payload)}s'
tx_payload = struct.pack(tx_fmt, encoded_kv, payload)
ret = self._txrx_ext_packet(0x2f, tx_payload, 1, check_length=False, check_ack=True)
logger(__name__).debug('Completed ext 0x2f command')
Expand Down Expand Up @@ -985,7 +998,8 @@ def _rx_ext_packet(cls, msg: Message, command: int, rx_length: int, check_ack: b
if len(msg) < rx_length and check_length:
raise VEXCommError(f'Received length is less than {rx_length} (got {len(msg)}).', msg)
elif len(msg) > rx_length and check_length:
ui.echo(f'WARNING: Recieved length is more than {rx_length} (got {len(msg)}). Consider upgrading the PROS (CLI Version: {get_version()}).')
ui.echo(
f'WARNING: Recieved length is more than {rx_length} (got {len(msg)}). Consider upgrading the PROS (CLI Version: {get_version()}).')
return msg

def _txrx_ext_packet(self, command: int, tx_data: Union[Iterable, bytes, bytearray],
Expand Down