diff --git a/inputremapper/injection/macros/argument.py b/inputremapper/injection/macros/argument.py index e16c2a95..d4cd7ae6 100644 --- a/inputremapper/injection/macros/argument.py +++ b/inputremapper/injection/macros/argument.py @@ -304,9 +304,17 @@ def _is_numeric_string(self, value: str) -> bool: return False def _type_error_factory(self, value: Any) -> MacroError: + formatted_types: List[str] = [] + + for type_ in self.types: + if type_ is None: + formatted_types.append("None") + else: + formatted_types.append(type_.__name__) + return MacroError( msg=( - f'Expected "{self.name}" to be one of {self.types}, but got ' - f"{type(value)} {value}" + f'Expected "{self.name}" to be one of {formatted_types}, but got ' + f'{type(value).__name__} "{value}"' ) ) diff --git a/inputremapper/injection/macros/parse.py b/inputremapper/injection/macros/parse.py index 0e5e5d9c..06835ea9 100644 --- a/inputremapper/injection/macros/parse.py +++ b/inputremapper/injection/macros/parse.py @@ -44,6 +44,7 @@ from inputremapper.injection.macros.tasks.mod_tap import ModTapTask from inputremapper.injection.macros.tasks.modify import ModifyTask from inputremapper.injection.macros.tasks.mouse import MouseTask +from inputremapper.injection.macros.tasks.parallel import ParallelTask from inputremapper.injection.macros.tasks.mouse_xy import MouseXYTask from inputremapper.injection.macros.tasks.repeat import RepeatTask from inputremapper.injection.macros.tasks.set import SetTask @@ -78,6 +79,7 @@ class Parser: "if_single": IfSingleTask, "add": AddTask, "mod_tap": ModTapTask, + "parallel": ParallelTask, # Those are only kept for backwards compatibility with old macros. The space for # writing macro was very constrained in the past, so shorthands were introduced: "m": ModifyTask, @@ -183,7 +185,7 @@ def _split_keyword_arg(param): return None, param @staticmethod - def check_for_unknown_keyword_arguments( + def _validate_keyword_argument_names( keyword_args: Dict[str, Any], task_class: Type[Task], ) -> None: @@ -212,8 +214,14 @@ def _parse_recurse( code Just like parse. A single parameter or the complete macro as string. Comments and redundant whitespace characters are expected to be removed already. - TODO add some examples. - Are all of "foo(1);bar(2)" "foo(1)" and "1" valid inputs? + Example: + - "parallel(key(a),key(b).key($foo))" + - "key(a)" + - "a" + - "key(b).key($foo)" + - "b" + - "key($foo)" + - "$foo" context : Context macro_instance A macro instance to add tasks to. This is the output of the parser, and is @@ -233,121 +241,138 @@ def debug(*args, **kwargs): code = code.strip() # is it another macro? - call_match = re.match(r"^(\w+)\(", code) - call = call_match[1] if call_match else None - if call is not None: - if macro_instance is None: - # start a new chain - macro_instance = Macro(code, context, mapping) + task_call_match = re.match(r"^(\w+)\(", code) + task_name = task_call_match[1] if task_call_match else None + + if task_name is None: + # It is probably either a key name like KEY_A or a variable name as in `set(var,1)`, + # both won't contain special characters that can break macro syntax so they don't + # have to be wrapped in quotes. The argument configuration of the tasks will + # detemrine how to parse it. + debug("%svalue %s", space, code) + return RawValue(value=code) + + if macro_instance is None: + # start a new chain + macro_instance = Macro(code, context, mapping) + else: + # chain this call to the existing instance + assert isinstance(macro_instance, Macro) + + task_class = Parser.TASK_CLASSES.get(task_name) + if task_class is None: + raise MacroError(code, f"Unknown function {task_name}") + + # get all the stuff inbetween + closing_bracket_position = Parser._count_brackets(code) - 1 + inner = code[code.index("(") + 1 : closing_bracket_position] + debug("%scalls %s with %s", space, task_name, inner) + + # split "3, foo=a(2, k(a).w(10))" into arguments + raw_string_args = Parser._extract_args(inner) + + # parse and sort the params + positional_args: List[RawValue] = [] + keyword_args: Dict[str, RawValue] = {} + for param in raw_string_args: + key, value = Parser._split_keyword_arg(param) + parsed = Parser._parse_recurse( + value.strip(), + context, + mapping, + verbose, + None, + depth + 1, + ) + if key is None: + if len(keyword_args) > 0: + msg = f'Positional argument "{key}" follows keyword argument' + raise MacroError(code, msg) + positional_args.append(parsed) else: - # chain this call to the existing instance - assert isinstance(macro_instance, Macro) - - task_class = Parser.TASK_CLASSES.get(call) - if task_class is None: - raise MacroError(code, f"Unknown function {call}") - - # get all the stuff inbetween - closing_bracket_position = Parser._count_brackets(code) - 1 - inner = code[code.index("(") + 1 : closing_bracket_position] - debug("%scalls %s with %s", space, call, inner) - - # split "3, foo=a(2, k(a).w(10))" into arguments - raw_string_args = Parser._extract_args(inner) - - # parse and sort the params - positional_args: List[RawValue] = [] - keyword_args: Dict[str, RawValue] = {} - for param in raw_string_args: - key, value = Parser._split_keyword_arg(param) - parsed = Parser._parse_recurse( - value.strip(), + if key in keyword_args: + raise MacroError(code, f'The "{key}" argument was specified twice') + keyword_args[key] = parsed + + debug( + "%sadd call to %s with %s, %s", + space, + task_name, + positional_args, + keyword_args, + ) + + Parser._validate_keyword_argument_names( + keyword_args, + task_class, + ) + Parser._validate_num_args( + code, + task_name, + task_class, + raw_string_args, + ) + + try: + task = task_class( + positional_args, + keyword_args, + context, + mapping, + ) + macro_instance.add_task(task) + except TypeError as exception: + raise MacroError(msg=str(exception)) from exception + + # is after this another call? Chain it to the macro_instance + more_code_exists = len(code) > closing_bracket_position + 1 + if more_code_exists: + next_char = code[closing_bracket_position + 1] + statement_closed = next_char == "." + + if statement_closed: + # skip over the ")." + chain = code[closing_bracket_position + 2 :] + debug("%sfollowed by %s", space, chain) + Parser._parse_recurse( + chain, context, mapping, verbose, - None, - depth + 1, + macro_instance, + depth, + ) + elif re.match(r"[a-zA-Z_]", next_char): + # something like foo()bar + raise MacroError( + code, + f'Expected a "." to follow after ' + f"{code[:closing_bracket_position + 1]}", ) - if key is None: - if len(keyword_args) > 0: - msg = f'Positional argument "{key}" follows keyword argument' - raise MacroError(code, msg) - positional_args.append(parsed) - else: - if key in keyword_args: - raise MacroError( - code, f'The "{key}" argument was specified twice' - ) - keyword_args[key] = parsed - - Parser.check_for_unknown_keyword_arguments(keyword_args, task_class) - - debug( - "%sadd call to %s with %s, %s", - space, - call, - positional_args, - keyword_args, - ) - - min_args, max_args = task_class.get_num_parameters() - num_provided_args = len(raw_string_args) - if num_provided_args < min_args or num_provided_args > max_args: - if min_args != max_args: - msg = ( - f"{call} takes between {min_args} and {max_args}, " - f"not {num_provided_args} parameters" - ) - else: - msg = f"{call} takes {min_args}, not {num_provided_args} parameters" - raise MacroError(code, msg) + return RawValue(value=macro_instance) - try: - task = task_class( - positional_args, - keyword_args, - context, - mapping, + @staticmethod + def _validate_num_args( + code: str, + task_name: str, + task_class: Type[Task], + raw_string_args: List[str], + ) -> None: + min_args, max_args = task_class.get_num_parameters() + num_provided_args = len(raw_string_args) + if num_provided_args < min_args or num_provided_args > max_args: + if min_args != max_args: + msg = ( + f"{task_name} takes between {min_args} and {max_args}, " + f"not {num_provided_args} parameters" ) - macro_instance.add_task(task) - except TypeError as exception: - raise MacroError(msg=str(exception)) from exception - - # is after this another call? Chain it to the macro_instance - more_code_exists = len(code) > closing_bracket_position + 1 - if more_code_exists: - next_char = code[closing_bracket_position + 1] - statement_closed = next_char == "." - - if statement_closed: - # skip over the ")." - chain = code[closing_bracket_position + 2 :] - debug("%sfollowed by %s", space, chain) - Parser._parse_recurse( - chain, - context, - mapping, - verbose, - macro_instance, - depth, - ) - elif re.match(r"[a-zA-Z_]", next_char): - # something like foo()bar - raise MacroError( - code, - f'Expected a "." to follow after ' - f"{code[:closing_bracket_position + 1]}", - ) - - return RawValue(value=macro_instance) - - # It is probably either a key name like KEY_A or a variable name as in `set(var,1)`, - # both won't contain special characters that can break macro syntax so they don't - # have to be wrapped in quotes. The argument configuration of the tasks will - # detemrine how to parse it. - debug("%svalue %s", space, code) - return RawValue(value=code) + else: + msg = ( + f"{task_name} takes {min_args}, not {num_provided_args} parameters" + ) + + raise MacroError(code, msg) @staticmethod def handle_plus_syntax(macro): diff --git a/inputremapper/injection/macros/tasks/parallel.py b/inputremapper/injection/macros/tasks/parallel.py new file mode 100644 index 00000000..cd20fb2a --- /dev/null +++ b/inputremapper/injection/macros/tasks/parallel.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# input-remapper - GUI for device specific keyboard mappings +# Copyright (C) 2024 sezanzeb +# +# This file is part of input-remapper. +# +# input-remapper is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# input-remapper is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with input-remapper. If not, see . + +from __future__ import annotations + +import asyncio +from typing import List + +from inputremapper.injection.macros.argument import ArgumentConfig, ArgumentFlags +from inputremapper.injection.macros.macro import Macro, InjectEventCallback +from inputremapper.injection.macros.task import Task + + +class ParallelTask(Task): + """Run all provided macros in parallel.""" + + argument_configs = [ + ArgumentConfig( + name="*macros", + position=ArgumentFlags.spread, + types=[Macro], + ), + ] + + async def run(self, callback: InjectEventCallback) -> None: + macros: List[Macro] = self.get_argument("*macros").get_values() + coroutines = [macro.run(callback) for macro in macros] + await asyncio.gather(*coroutines) diff --git a/readme/macros.md b/readme/macros.md index 7039d458..c4ec216f 100644 --- a/readme/macros.md +++ b/readme/macros.md @@ -379,6 +379,24 @@ Bear in mind that anti-cheat software might detect macros in games. > if_single(key(KEY_A), key(KEY_B), timeout=1000) > ``` +### parallel + +> Run all provided macros in parallel. +> +> ```ts +> parallel(*macros: Macro) +> ``` +> +> Examples: +> +> ```ts +> parallel( +> mouse(up, 10), +> hold_keys(a), +> wheel(down, 10) +> ) +> ``` + ## Syntax Multiple functions are chained using `.`. diff --git a/tests/unit/test_macros/test_parallel.py b/tests/unit/test_macros/test_parallel.py new file mode 100644 index 00000000..b4e9ec66 --- /dev/null +++ b/tests/unit/test_macros/test_parallel.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# input-remapper - GUI for device specific keyboard mappings +# Copyright (C) 2024 sezanzeb +# +# This file is part of input-remapper. +# +# input-remapper is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# input-remapper is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with input-remapper. If not, see . + +import asyncio +import unittest + +from evdev.ecodes import ( + EV_KEY, + KEY_A, + KEY_B, + KEY_C, + KEY_D, +) + +from inputremapper.injection.macros.parse import Parser +from tests.lib.test_setup import test_setup +from tests.unit.test_macros.macro_test_base import DummyMapping, MacroTestBase + + +@test_setup +class TestParallel(MacroTestBase): + async def test_1_child_macro(self): + macro = Parser.parse( + "parallel(key(a))", + self.context, + DummyMapping(), + True, + ) + self.assertEqual(len(macro.tasks[0].child_macros), 1) + await macro.run(self.handler) + self.assertEqual(self.result, [(EV_KEY, KEY_A, 1), (EV_KEY, KEY_A, 0)]) + + async def test_4_child_macros(self): + macro = Parser.parse( + "parallel(key(a), key(b), key(c), key(d))", + self.context, + DummyMapping(), + True, + ) + self.assertEqual(len(macro.tasks[0].child_macros), 4) + await macro.run(self.handler) + self.assertIn((EV_KEY, KEY_A, 0), self.result) + self.assertIn((EV_KEY, KEY_B, 0), self.result) + self.assertIn((EV_KEY, KEY_C, 0), self.result) + self.assertIn((EV_KEY, KEY_D, 0), self.result) + + async def test_one_wait_takes_longer(self): + mapping = DummyMapping() + mapping.macro_key_sleep_ms = 0 + macro = Parser.parse( + "parallel(wait(100), wait(10).key(b)).key(c)", + self.context, + mapping, + True, + ) + + asyncio.ensure_future(macro.run(self.handler)) + await asyncio.sleep(0.06) + # The wait(10).key(b) macro is already done, but KEY_C is not yet injected + self.assertEqual(len(self.result), 2) + self.assertIn((EV_KEY, KEY_B, 1), self.result) + self.assertIn((EV_KEY, KEY_B, 0), self.result) + + # Both need to complete for it to continue to key(c) + await asyncio.sleep(0.06) + self.assertEqual(len(self.result), 4) + self.assertIn((EV_KEY, KEY_C, 1), self.result) + self.assertIn((EV_KEY, KEY_C, 0), self.result) + + async def test_parallel_hold(self): + mapping = DummyMapping() + mapping.macro_key_sleep_ms = 0 + macro = Parser.parse( + "parallel(hold_keys(a), hold_keys(b)).key(c)", + self.context, + mapping, + True, + ) + + macro.press_trigger() + asyncio.ensure_future(macro.run(self.handler)) + await asyncio.sleep(0.05) + self.assertIn((EV_KEY, KEY_A, 1), self.result) + self.assertIn((EV_KEY, KEY_B, 1), self.result) + self.assertEqual(len(self.result), 2) + + macro.release_trigger() + await asyncio.sleep(0.05) + self.assertIn((EV_KEY, KEY_A, 0), self.result) + self.assertIn((EV_KEY, KEY_B, 0), self.result) + self.assertIn((EV_KEY, KEY_C, 1), self.result) + self.assertIn((EV_KEY, KEY_C, 0), self.result) + self.assertEqual(len(self.result), 6) + + +if __name__ == "__main__": + unittest.main()