diff --git a/.ruff.toml b/.ruff.toml new file mode 100644 index 0000000..3456a3b --- /dev/null +++ b/.ruff.toml @@ -0,0 +1,5 @@ +line-length = 120 +target-version = "py312" + +[format] +quote-style = "single" \ No newline at end of file diff --git a/.sourcery.yaml b/.sourcery.yaml index 24c239b..c98755f 100644 --- a/.sourcery.yaml +++ b/.sourcery.yaml @@ -9,6 +9,7 @@ rule_settings: disable: - list-comprehension - dict-comprehension + - sum-comprehension - for-append-to-extend - use-next rule_types: diff --git a/CB/Compat.py b/CB/Compat.py index 2407dfb..70714f4 100644 --- a/CB/Compat.py +++ b/CB/Compat.py @@ -26,18 +26,14 @@ def set_normal_term(self): if system != 'Windows': termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term) - def getch(self): # sourcery skip: assign-if-exp - if system == 'Windows': - return msvcrt.getch() - else: - return sys.stdin.read(1) + def getch(self): + return msvcrt.getch() if system == 'Windows' else sys.stdin.read(1) - def kbhit(self): # sourcery skip: remove-unnecessary-else + def kbhit(self): if system == 'Windows': return msvcrt.kbhit() - else: - dr, dw, de = select([sys.stdin], [], [], 0) - return dr != [] + dr, dw, de = select([sys.stdin], [], [], 0) + return dr != [] def pause(headless): diff --git a/CB/Core.py b/CB/Core.py index 629d9fd..7c90ade 100644 --- a/CB/Core.py +++ b/CB/Core.py @@ -45,9 +45,9 @@ def init_master_config(self): try: self.masterConfig = json.load(gzip.open(io.BytesIO( self.http.get('https://cursebreaker.acidweb.dev/config-v2.json.gz').content))) - except Exception: + except (StopIteration, UnicodeDecodeError, json.JSONDecodeError, httpx.RequestError) as e: raise RuntimeError('Failed to fetch the master config file. ' - 'Check your connectivity to Google Cloud.') from None + 'Check your connectivity to Google Cloud.') from e def init_config(self): if os.path.isfile('CurseBreaker.json'): @@ -193,14 +193,14 @@ def check_if_dev_global(self): return addon['Development'] return 0 - def check_if_from_gh(self): # sourcery skip: sum-comprehension + def check_if_from_gh(self): if self.config['GHAPIKey'] != '': return False count = 0 for addon in self.config['Addons']: if addon['URL'].startswith('https://github.com/'): count += 1 - return count > 3 + return count > 4 def cleanup(self, directories): if len(directories) > 0: @@ -301,7 +301,7 @@ def del_addon(self, url, keep): return old['Name'], old['Version'] return False, False - def update_addon(self, url, update, force): # sourcery skip: extract-method + def update_addon(self, url, update, force): if not (old := self.check_if_installed(url)): return url, [], False, False, None, False, False, '?', None, None, None dev = self.check_if_dev(old['URL']) @@ -395,16 +395,6 @@ def block_toggle(self, url): return not state return None - def generic_toggle(self, option, inside=None): - if inside: - self.config[option][inside] = not self.config[option][inside] - self.save_config() - return self.config[option][inside] - else: - self.config[option] = not self.config[option] - self.save_config() - return self.config[option] - def backup_check(self): if not self.config['Backup']['Enabled']: return False diff --git a/CB/GitHub.py b/CB/GitHub.py index bbc00dc..33c4e96 100644 --- a/CB/GitHub.py +++ b/CB/GitHub.py @@ -6,7 +6,6 @@ from . import retry, APIAuth -# noinspection PyTypeChecker class GitHubAddon: @retry() def __init__(self, url, checkcache, packagercache, clienttype, apikey, http): @@ -33,8 +32,7 @@ def __init__(self, url, checkcache, packagercache, clienttype, apikey, http): else: self.payload = self.payload.json() for release in self.payload: - if release['assets'] and len(release['assets']) > 0 \ - and not release['draft'] and not release['prerelease']: + if release['assets'] and len(release['assets']) > 0 and not release['draft'] and not release['prerelease']: self.payloads.append(release) if len(self.payloads) > 14: break @@ -149,11 +147,11 @@ def install(self, path): class GitHubAddonRaw: @retry() def __init__(self, addon, apikey, http): - repository = addon["Repository"] + repository = addon['Repository'] self.http = http self.apiKey = apikey - self.branch = addon["Branch"] - self.name = addon["Name"] + self.branch = addon['Branch'] + self.name = addon['Name'] try: self.payload = self.http.get(f'https://api.github.com/repos/{repository}/branches/{self.branch}', auth=APIAuth('token', self.apiKey)) @@ -178,8 +176,8 @@ def __init__(self, addon, apikey, http): self.currentVersion = self.payload['commit']['sha'][:7] self.uiVersion = None self.archive = None - self.directories = addon["Directories"] - self.author = addon["Authors"] + self.directories = addon['Directories'] + self.author = addon['Authors'] @retry() def get_addon(self): diff --git a/CB/Tukui.py b/CB/Tukui.py index acf98d7..e300c72 100644 --- a/CB/Tukui.py +++ b/CB/Tukui.py @@ -1,6 +1,5 @@ import os import io -import httpx import zipfile from . import retry diff --git a/CurseBreaker.py b/CurseBreaker.py index f39b14c..d04e094 100644 --- a/CurseBreaker.py +++ b/CurseBreaker.py @@ -52,7 +52,7 @@ def __init__(self): self.os = platform.system() install() - def start(self): + def start(self): # sourcery skip: low-code-quality # Check if headless mode was requested if len(sys.argv) == 2 and sys.argv[1].lower() == 'headless': self.headless = True @@ -88,6 +88,7 @@ def start(self): except IOError: self.handle_shutdown('[bold red]CurseBreaker doesn\'t have write rights for the current directory.\nTry sta' 'rting it with administrative privileges.[/bold red]\n') + # Application auto update and initialization self.auto_update() try: self.core.init_config() @@ -167,6 +168,7 @@ def start(self): if self.headless: self.core.http.close() sys.exit(1) + # Interactive mode initialization self.setup_completer() self.print_header() self.console.print('Use command [green]help[/green] or press [green]TAB[/green] to see a list of available comm' @@ -204,7 +206,32 @@ def start(self): else: self.console.print('Command not found.') - def auto_update(self): # sourcery skip: extract-duplicate-method, extract-method + def _auto_update_cleanup(self): + self.print_log() + self.core.http.close() + pause(self.headless) + + def _auto_update_install(self, url, changelog): + self.console.print('[green]Updating CurseBreaker...[/green]') + shutil.move(sys.executable, f'{sys.executable}.old') + payload = self.core.http.get(url) + if self.os == 'Darwin': + zipfile.ZipFile(io.BytesIO(payload.content)).extractall(path=os.path.dirname( + os.path.abspath(sys.executable))) + else: + with open(sys.executable, 'wb') as f: + if self.os == 'Windows': + f.write(payload.content) + elif self.os == 'Linux': + f.write(gzip.decompress(payload.content)) + os.chmod(sys.executable, 0o775) + self.console.print(f'[bold green]Update complete! The application will be restarted now.[/bold green]\n\n' + f'[green]Changelog:[/green]\n{changelog}\n') + self._auto_update_cleanup() + subprocess.call([sys.executable] + sys.argv[1:]) + sys.exit(0) + + def auto_update(self): if not getattr(sys, 'frozen', False) or 'CURSEBREAKER_VARDEXMODE' in os.environ: return try: @@ -226,33 +253,12 @@ def auto_update(self): # sourcery skip: extract-duplicate-method, extract-metho url = binary['browser_download_url'] break if url and Version(remoteversion[1:]) > Version(__version__): - self.console.print('[green]Updating CurseBreaker...[/green]') - shutil.move(sys.executable, f'{sys.executable}.old') - payload = self.core.http.get(url) - if self.os == 'Darwin': - zipfile.ZipFile(io.BytesIO(payload.content)).extractall(path=os.path.dirname( - os.path.abspath(sys.executable))) - else: - with open(sys.executable, 'wb') as f: - if self.os == 'Windows': - f.write(payload.content) - elif self.os == 'Linux': - f.write(gzip.decompress(payload.content)) - os.chmod(sys.executable, 0o775) - self.console.print(f'[bold green]Update complete! The application will be restarted now.' - f'[/bold green]\n\n[green]Changelog:[/green]\n{changelog}\n') - self.print_log() - self.core.http.close() - pause(self.headless) - subprocess.call([sys.executable] + sys.argv[1:]) - sys.exit(0) + self._auto_update_install(url, changelog) except Exception as e: if os.path.isfile(f'{sys.executable}.old'): shutil.move(f'{sys.executable}.old', sys.executable) self.console.print(f'[bold red]Update failed!\n\nReason: {str(e)}[/bold red]\n') - self.print_log() - self.core.http.close() - pause(self.headless) + self._auto_update_cleanup() sys.exit(1) def motd_parser(self): @@ -278,25 +284,20 @@ def handle_exception(self, e, table=True): self.console.print(Traceback.from_exception(exc_type=e.__class__, exc_value=e, traceback=e.__traceback__, width=width)) - # noinspection PyUnboundLocalVariable def handle_keypress(self, wait): - if wait == -1: + if wait == -1 or self.headless: return False - if not self.headless: - kb = KBHit() + kb = KBHit() starttime = time.time() keypress = None while True: - if self.headless: - break - elif kb.kbhit(): + if kb.kbhit(): keypress = kb.getch() break elif wait and time.time() - starttime > wait: break time.sleep(0.01) - if not self.headless: - kb.set_normal_term() + kb.set_normal_term() return keypress def handle_shutdown(self, message=''): @@ -356,11 +357,10 @@ def setup_console(self): def setup_completer(self): if not self.slugs: - # noinspection PyBroadException try: self.slugs = json.load(gzip.open(io.BytesIO( self.core.http.get('https://cursebreaker.acidweb.dev/slugs-v2.json.gz').content))) - except Exception: + except (StopIteration, UnicodeDecodeError, json.JSONDecodeError, httpx.RequestError): self.slugs = {'wa': [], 'wowi': [], 'gh': []} addons = [] for addon in sorted(self.core.config['Addons'], key=lambda k: k['Name'].lower()): @@ -453,43 +453,7 @@ def parse_custom_addons(self): return ' [bold white]|[/bold white] '.join(payload) def c_install(self, args): - if args: - optignore = False - pargs = split(args.replace("'", "\\'")) - if '-i' in pargs: - optignore = True - args = args.replace('-i', '', 1) - args = re.sub(r'([a-zA-Z0-9_:])( +)([a-zA-Z0-9_:])', r'\1,\3', args) - addons = [re.sub(r'[\[\]]', '', addon).strip() for addon in list(reader([args], skipinitialspace=True))[0]] - exceptions = [] - if addons: - if self.core.clientType != 'retail': - for addon in addons: - if addon.startswith('https://www.wowinterface.com/downloads/') or addon.startswith('wowi:'): - self.console.print('[yellow][WARNING][/yellow] WoWInterface support for non-retail clients ' - 'is limited. If the selected project offers multiple downloads this appl' - 'ication will always install the retail version of the addon.') - break - with Progress('{task.completed}/{task.total}', '|', BarColumn(bar_width=None), '|', - auto_refresh=False, console=self.console) as progress: - task = progress.add_task('', total=len(addons)) - while not progress.finished: - for addon in addons: - try: - installed, name, version = self.core.add_addon(addon, optignore) - if installed: - self.table.add_row('[green]Installed[/green]', Text(name, no_wrap=True), - Text(version, no_wrap=True)) - else: - self.table.add_row('[bold black]Already installed[/bold black]', - Text(name, no_wrap=True), Text(version, no_wrap=True)) - except Exception as e: - exceptions.append(e) - progress.update(task, advance=1, refresh=True) - self.console.print(self.table) - if exceptions: - self.handle_exception(exceptions, False) - else: + if not args: self.console.print('[green]Usage:[/green]\n\tThis command accepts a space-separated list of links as an arg' 'ument.[bold white]\n\tFlags:[/bold white]\n\t\t[bold white]-i[/bold white] - Disable th' 'e client version check.\n[bold green]Supported URL:[/bold green]\n\thttps://addons.wago' @@ -498,34 +462,107 @@ def c_install(self, args): '\thttps://github.com/\\[username]/\\[repository_name] [bold white]|[/bold white] gh:\\[' 'username]/\\[repository_name]\n\tElvUI [bold white]|[/bold white] Tukui\n\t' + self.parse_custom_addons(), highlight=False) + return + optignore = False + pargs = split(args.replace("'", "\\'")) + if '-i' in pargs: + optignore = True + args = args.replace('-i', '', 1) + args = re.sub(r'([a-zA-Z0-9_:])( +)([a-zA-Z0-9_:])', r'\1,\3', args) + addons = [re.sub(r'[\[\]]', '', addon).strip() for addon in list(reader([args], skipinitialspace=True))[0]] + exceptions = [] + if addons: + if self.core.clientType != 'retail': + for addon in addons: + if addon.startswith('https://www.wowinterface.com/downloads/') or addon.startswith('wowi:'): + self.console.print('[yellow][WARNING][/yellow] WoWInterface support for non-retail clients is l' + 'imited. If the selected project offers multiple downloads this application ' + 'will always install the retail version of the addon.') + break + with Progress('{task.completed}/{task.total}', '|', BarColumn(bar_width=None), '|', auto_refresh=False, + console=self.console) as progress: + task = progress.add_task('', total=len(addons)) + while not progress.finished: + for addon in addons: + try: + installed, name, version = self.core.add_addon(addon, optignore) + if installed: + self.table.add_row('[green]Installed[/green]', Text(name, no_wrap=True), + Text(version, no_wrap=True)) + else: + self.table.add_row('[bold black]Already installed[/bold black]', + Text(name, no_wrap=True), Text(version, no_wrap=True)) + except Exception as e: + exceptions.append(e) + progress.update(task, advance=1, refresh=True) + self.console.print(self.table) + if exceptions: + self.handle_exception(exceptions, False) def c_uninstall(self, args): - if args: - optkeep = False - pargs = split(args.replace("'", "\\'")) - if '-k' in pargs: - optkeep = True - args = args.replace('-k', '', 1) - addons = self.parse_args(args) - if len(addons) > 0: - with Progress('{task.completed}/{task.total}', '|', BarColumn(bar_width=None), '|', - auto_refresh=False, console=self.console) as progress: - task = progress.add_task('', total=len(addons)) - while not progress.finished: - for addon in addons: - name, version = self.core.del_addon(addon, optkeep) - if name: - self.table.add_row('[bold red]Uninstalled[/bold red]', - Text(name, no_wrap=True), Text(version, no_wrap=True)) - else: - self.table.add_row('[bold black]Not installed[/bold black]', - Text(addon, no_wrap=True), Text('', no_wrap=True)) - progress.update(task, advance=1, refresh=True) - self.console.print(self.table) - else: + if not args: self.console.print('[green]Usage:[/green]\n\tThis command accepts a space-separated list of addon names or ' 'full links as an argument.\n\t[bold white]Flags:[/bold white]\n\t\t[bold white]-k[/bold' ' white] - Keep the addon files after uninstalling.', highlight=False) + return + optkeep = False + pargs = split(args.replace("'", "\\'")) + if '-k' in pargs: + optkeep = True + args = args.replace('-k', '', 1) + addons = self.parse_args(args) + if len(addons) > 0: + with Progress('{task.completed}/{task.total}', '|', BarColumn(bar_width=None), '|', auto_refresh=False, + console=self.console) as progress: + task = progress.add_task('', total=len(addons)) + while not progress.finished: + for addon in addons: + name, version = self.core.del_addon(addon, optkeep) + if name: + self.table.add_row('[bold red]Uninstalled[/bold red]', Text(name, no_wrap=True), + Text(version, no_wrap=True)) + else: + self.table.add_row('[bold black]Not installed[/bold black]', Text(addon, no_wrap=True), + Text('', no_wrap=True)) + progress.update(task, advance=1, refresh=True) + self.console.print(self.table) + + def _c_update_process(self, addon, update, force, compact, compacted, provider): # sourcery skip: low-code-quality + name, authors, versionnew, versionold, uiversion, modified, blocked, source, sourceurl, changelog, dstate \ + = self.core.update_addon(addon if isinstance(addon, str) else addon['URL'], update, force) + if source == 'Unsupported' and not provider: + additionalstatus = f' [bold red]{source.upper()}[/bold red]' + else: + additionalstatus = '' + if versionold: + payload = [self.parse_link(name, sourceurl, authors=authors), + self.parse_link(versionold, changelog, dstate, uiversion=uiversion)] + if versionold == versionnew: + if modified: + payload.insert(0, f'[bold red]Modified[/bold red]{additionalstatus}') + elif compact and compacted > -1 and source != 'Unsupported': + payload = None + compacted += 1 + else: + payload.insert(0, f'[green]Up-to-date[/green]{additionalstatus}') + elif modified or blocked: + payload.insert(0, f'[bold red]Update suppressed[/bold red]{additionalstatus}') + else: + version = self.parse_link(versionnew, changelog, dstate, uiversion=uiversion) + version.stylize('yellow') + payload = [f'[yellow]{"Updated" if update else "Update available"}[/yellow]{additionalstatus}', + payload[0], version] + else: + payload = [f'[bold black]Not installed[/bold black]{additionalstatus}', Text(addon, no_wrap=True), + Text('', no_wrap=True)] + if payload: + if provider: + if source == 'Unsupported': + payload.insert(1, f'[bold red]{source.upper()}[/bold red]') + else: + payload.insert(1, source) + self.table.add_row(*payload) + return compacted def c_update(self, args, addline=False, update=True, force=False, reverseprovider=False, reversecompact=False): compact = not self.core.config['CompactMode'] if reversecompact else self.core.config['CompactMode'] @@ -537,77 +574,38 @@ def c_update(self, args, addline=False, update=True, force=False, reverseprovide else: addons = sorted(self.core.config['Addons'], key=lambda k: k['Name'].lower()) compacted = 0 - exceptions = [] - if len(addons) > 0: - with Progress('{task.completed:.0f}/{task.total}', '|', BarColumn(bar_width=None), '|', - console=None if self.headless else self.console) as progress: - task = progress.add_task('', total=len(addons), start=bool(args)) - if not args: - with suppress(RuntimeError, httpx.RequestError): - self.core.bulk_check(addons) - progress.start_task(task) - self.core.bulk_check_checksum(addons, progress) - while not progress.finished: - for addon in addons: - try: - name, authors, versionnew, versionold, uiversion, modified, blocked, source, sourceurl, \ - changelog, dstate = self.core.update_addon( - addon if isinstance(addon, str) else addon['URL'], update, force) - payload = None - if source == 'Unsupported' and not provider: - additionalstatus = f' [bold red]{source.upper()}[/bold red]' - else: - additionalstatus = '' - if versionold: - if versionold == versionnew: - if modified: - payload = [f'[bold red]Modified[/bold red]{additionalstatus}', - self.parse_link(name, sourceurl, authors=authors), - self.parse_link(versionold, changelog, dstate, uiversion=uiversion)] - elif compact and compacted > -1 and source != 'Unsupported': - compacted += 1 - else: - payload = [f'[green]Up-to-date[/green]{additionalstatus}', - self.parse_link(name, sourceurl, authors=authors), - self.parse_link(versionold, changelog, dstate, uiversion=uiversion)] - elif modified or blocked: - payload = [f'[bold red]Update suppressed[/bold red]{additionalstatus}', - self.parse_link(name, sourceurl, authors=authors), - self.parse_link(versionold, changelog, dstate, uiversion=uiversion)] - else: - version = self.parse_link(versionnew, changelog, dstate, uiversion=uiversion) - version.stylize('yellow') - payload = [f'[yellow]{"Updated" if update else "Update available"}[/yellow]' - f'{additionalstatus}', self.parse_link(name, sourceurl, - authors=authors), version] - else: - payload = [f'[bold black]Not installed[/bold black]{additionalstatus}', - Text(addon, no_wrap=True), Text('', no_wrap=True)] - if payload: - if provider: - if source == 'Unsupported': - payload.insert(1, f'[bold red]{source.upper()}[/bold red]') - else: - payload.insert(1, source) - self.table.add_row(*payload) - except Exception as e: - exceptions.append(e) - progress.update(task, advance=1 if args else 0.5, refresh=True) - if addline: - self.console.print('') - self.console.print(self.table) - if compacted > 0: - self.console.print(f'Additionally [green]{compacted}[/green] addons are up-to-date.') - if overlap := self.core.check_if_overlap(): - self.console.print(f'\n[bold red]Detected addon directory overlap. This will cause issues. Affected add' - f'ons:[/bold red]\n{overlap}') - if self.core.check_if_from_gh(): - self.console.print('\n[bold red]Multiple addons acquired from GitHub have been detected. Providing a p' - 'ersonal GitHub token is highly recommended.[/bold red]') - else: + if len(addons) == 0: self.console.print('Apparently there are no addons installed by CurseBreaker (or you provided incorrect add' 'on name).\nCommand [green]import[/green] might be used to detect already installed addo' 'ns.', highlight=False) + return + exceptions = [] + with Progress('{task.completed:.0f}/{task.total}', '|', BarColumn(bar_width=None), '|', + console=None if self.headless else self.console) as progress: + task = progress.add_task('', total=len(addons), start=bool(args)) + if not args: + with suppress(RuntimeError, httpx.RequestError): + self.core.bulk_check(addons) + progress.start_task(task) + self.core.bulk_check_checksum(addons, progress) + while not progress.finished: + for addon in addons: + try: + compacted = self._c_update_process(addon, update, force, compact, compacted, provider) + except Exception as e: + exceptions.append(e) + progress.update(task, advance=1 if args else 0.5, refresh=True) + if addline: + self.console.print('') + self.console.print(self.table) + if compacted > 0: + self.console.print(f'Additionally [green]{compacted}[/green] addons are up-to-date.') + if overlap := self.core.check_if_overlap(): + self.console.print(f'\n[bold red]Detected addon directory overlap. This will cause issues. Affected add' + f'ons:[/bold red]\n{overlap}') + if self.core.check_if_from_gh(): + self.console.print('\n[bold red]Multiple addons acquired from GitHub have been detected. Providing a p' + 'ersonal GitHub token is highly recommended.[/bold red]') if exceptions: self.handle_exception(exceptions, False) @@ -655,90 +653,73 @@ def c_uri_integration(self, _): else: self.console.print('This feature is available only on Windows.') - def c_toggle(self, args): - if args: - args = args.strip() - if args.startswith('channel'): - if args := args[8:]: - status = self.core.dev_toggle(args) - if status is None: - self.console.print('[bold red]This addon doesn\'t exist or it is not installed yet.[/bold red]') - elif status == -1: - self.console.print('[bold red]This feature can be only used with addons provided by Wago Addons' - '.[/bold red]') - elif status == 0: - self.console.print( - 'All Wago addons are now switched' if args == 'global' else 'Addon switched', - 'to the [yellow]beta[/yellow] channel.') - elif status == 1: - self.console.print( - 'All Wago addons are now switched' if args == 'global' else 'Addon switched', - 'to the [red]alpha[/red] channel.') - elif status == 2: - self.console.print( - 'All Wago addons are now switched' if args == 'global' else 'Addon switched', - 'to the [green]stable[/green] channel.') - else: - self.console.print('[green]Usage:[/green]\n\tThis command accepts an addon name (or "global") as an' - ' argument.', highlight=False) - elif args.startswith('pinning'): - if args := args[8:]: - status = self.core.block_toggle(args) - if status is None: - self.console.print('[bold red]This addon does not exist or it is not installed yet.[/bold red]') - elif status: - self.console.print('Updates for this addon are now [red]suppressed[/red].') - else: - self.console.print('Updates for this addon are [green]no longer suppressed[/green].') - else: - self.console.print('[green]Usage:[/green]\n\tThis command accepts an addon name as an argument.') - elif args.startswith('wago'): - if args := args[5:]: - if args == self.core.config['WAUsername']: - self.console.print(f'Wago version check is now: [green]ENABLED[/green]\nEntries created by ' - f'[bold white]{self.core.config["WAUsername"]}[/bold white] are now ' - f'included.') - self.core.config['WAUsername'] = '' - else: - self.core.config['WAUsername'] = args.strip() - self.console.print(f'Wago version check is now: [green]ENABLED[/green]\nEntries created by ' - f'[bold white]{self.core.config["WAUsername"]}[/bold white] are now ' - f'ignored.') - elif self.core.config['WAUsername'] == 'DISABLED': - self.core.config['WAUsername'] = '' - self.console.print('Wago version check is now: [green]ENABLED[/green]') - else: - self.core.config['WAUsername'] = 'DISABLED' - shutil.rmtree(Path('Interface/AddOns/WeakAurasCompanion'), ignore_errors=True) - self.console.print('Wago version check is now: [red]DISABLED[/red]') - self.core.save_config() - elif args.startswith('authors'): - status = self.core.generic_toggle('ShowAuthors') - self.console.print('The authors listing is on now:', - '[green]ENABLED[/green]' if status else '[red]DISABLED[/red]') - elif args.startswith('autoupdate_delay'): - status = self.core.generic_toggle('AutoUpdateDelay') - self.console.print('The timeout before the automatic addon update on startup is now:', - '[green]ENABLED[/green]' if status else '[red]DISABLED[/red]') - elif args.startswith('autoupdate'): - status = self.core.generic_toggle('AutoUpdate') - self.console.print('The automatic addon update on startup is now:', - '[green]ENABLED[/green]' if status else '[red]DISABLED[/red]') - elif args.startswith('backup'): - status = self.core.generic_toggle('Backup', 'Enabled') - self.console.print('Backup of WTF directory is now:', - '[green]ENABLED[/green]' if status else '[red]DISABLED[/red]') - elif args.startswith('compact_mode'): - status = self.core.generic_toggle('CompactMode') - self.console.print('Table compact mode is now:', - '[green]ENABLED[/green]' if status else '[red]DISABLED[/red]') - elif args.startswith('sources'): - status = self.core.generic_toggle('ShowSources') - self.console.print('The source column is now:', - '[green]ENABLED[/green]' if status else '[red]DISABLED[/red]') + def _c_toggle_channel(self, args): + if args := args[8:]: + status = self.core.dev_toggle(args) + if status is None: + self.console.print('[bold red]This addon doesn\'t exist or it is not installed yet.[/bold red]') + elif status == -1: + self.console.print('[bold red]This feature can be only used with addons provided by Wago Addons.[/bold ' + 'red]') + elif status == 0: + self.console.print( + 'All Wago addons are now switched' if args == 'global' else 'Addon switched', + 'to the [yellow]beta[/yellow] channel.') + elif status == 1: + self.console.print( + 'All Wago addons are now switched' if args == 'global' else 'Addon switched', + 'to the [red]alpha[/red] channel.') + elif status == 2: + self.console.print( + 'All Wago addons are now switched' if args == 'global' else 'Addon switched', + 'to the [green]stable[/green] channel.') + else: + self.console.print('[green]Usage:[/green]\n\tThis command accepts an addon name (or "global") as an argumen' + 't.', highlight=False) + + def _c_toggle_pinning(self, args): + if args := args[8:]: + status = self.core.block_toggle(args) + if status is None: + self.console.print('[bold red]This addon does not exist or it is not installed yet.[/bold red]') + elif status: + self.console.print('Updates for this addon are now [red]suppressed[/red].') + else: + self.console.print('Updates for this addon are [green]no longer suppressed[/green].') + else: + self.console.print('[green]Usage:[/green]\n\tThis command accepts an addon name as an argument.') + + def _c_toggle_wago(self, args): + if args := args[5:]: + if args == self.core.config['WAUsername']: + self.console.print(f'Wago version check is now: [green]ENABLED[/green]\nEntries created by [bold white]' + f'{self.core.config["WAUsername"]}[/bold white] are now included.') + self.core.config['WAUsername'] = '' else: - self.console.print('Unknown option.') + self.core.config['WAUsername'] = args.strip() + self.console.print(f'Wago version check is now: [green]ENABLED[/green]\nEntries created by [bold white]' + f'{self.core.config["WAUsername"]}[/bold white] are now ignored.') + elif self.core.config['WAUsername'] == 'DISABLED': + self.core.config['WAUsername'] = '' + self.console.print('Wago version check is now: [green]ENABLED[/green]') + else: + self.core.config['WAUsername'] = 'DISABLED' + shutil.rmtree(Path('Interface/AddOns/WeakAurasCompanion'), ignore_errors=True) + self.console.print('Wago version check is now: [red]DISABLED[/red]') + self.core.save_config() + + def _c_toggle_parse(self, option, inside=None): + if inside: + self.core.config[option][inside] = not self.core.config[option][inside] + self.core.save_config() + return self.core.config[option][inside] else: + self.core.config[option] = not self.core.config[option] + self.core.save_config() + return self.core.config[option] + + def c_toggle(self, args): + if not args: self.console.print('[green]Usage:[/green]\n\t[green]toggle authors[/green]\n\t\tEnables/disables the displa' 'y of addon author names in the table.\n\t[green]toggle autoupdate[/green]\n\t\tEnables/' 'disables the automatic addon update on startup.\n\t[green]toggle autoupdate_delay[/gree' @@ -752,6 +733,40 @@ def c_toggle(self, args): 'sources[/green]\n\t\tEnables/disables the source column in the status table.\n\t[green]' 'toggle wago [Username][/green]\n\t\tEnables/disables automatic Wago updates.\n\t\tIf a ' 'username is provided check will start to ignore the specified author.', highlight=False) + return + args = args.strip() + if args.startswith('channel'): + self._c_toggle_channel(args) + elif args.startswith('pinning'): + self._c_toggle_pinning(args) + elif args.startswith('wago'): + self._c_toggle_wago(args) + elif args == 'authors': + status = self._c_toggle_parse('ShowAuthors') + self.console.print('The authors listing is on now:', + '[green]ENABLED[/green]' if status else '[red]DISABLED[/red]') + elif args == 'autoupdate_delay': + status = self._c_toggle_parse('AutoUpdateDelay') + self.console.print('The timeout before the automatic addon update on startup is now:', + '[green]ENABLED[/green]' if status else '[red]DISABLED[/red]') + elif args == 'autoupdate': + status = self._c_toggle_parse('AutoUpdate') + self.console.print('The automatic addon update on startup is now:', + '[green]ENABLED[/green]' if status else '[red]DISABLED[/red]') + elif args == 'backup': + status = self._c_toggle_parse('Backup', 'Enabled') + self.console.print('Backup of WTF directory is now:', + '[green]ENABLED[/green]' if status else '[red]DISABLED[/red]') + elif args == 'compact_mode': + status = self._c_toggle_parse('CompactMode') + self.console.print('Table compact mode is now:', + '[green]ENABLED[/green]' if status else '[red]DISABLED[/red]') + elif args == 'sources': + status = self._c_toggle_parse('ShowSources') + self.console.print('The source column is now:', + '[green]ENABLED[/green]' if status else '[red]DISABLED[/red]') + else: + self.console.print('Unknown option.') def _c_set_parse(self, msg, key, value): self.console.print(msg) @@ -759,45 +774,7 @@ def _c_set_parse(self, msg, key, value): self.core.save_config() def c_set(self, args): - if args: - args = args.strip() - if args.startswith('wago_addons_api'): - if args := args[16:]: - self._c_set_parse('Wago Addons API key is now set.', 'WAAAPIKey', args) - elif self.core.config['WAAAPIKey'] != '': - self._c_set_parse('Wago Addons API key is now removed.', 'WAAAPIKey', '') - else: - self.console.print('[green]Usage:[/green]\n\tThis command accepts API key as an argument.') - elif args.startswith('wago_api'): - if args := args[9:]: - self._c_set_parse('Wago API key is now set.', 'WAAPIKey', args) - elif self.core.config['WAAPIKey'] != '': - self._c_set_parse('Wago API key is now removed.', 'WAAPIKey', '') - else: - self.console.print('[green]Usage:[/green]\n\tThis command accepts API key as an argument.') - elif args.startswith('gh_api'): - if args := args[7:]: - self._c_set_parse('GitHub API key is now set.', 'GHAPIKey', args) - elif self.core.config['GHAPIKey'] != '': - self._c_set_parse('GitHub API key is now removed.', 'GHAPIKey', '') - else: - self.console.print('[green]Usage:[/green]\n\tThis command accepts API key as an argument.') - elif args.startswith('wago_wow_account'): - if args := args[17:]: - args = args.strip() - if os.path.isfile(Path(f'WTF/Account/{args}/SavedVariables/WeakAuras.lua')) or \ - os.path.isfile(Path(f'WTF/Account/{args}/SavedVariables/Plater.lua')): - self.console.print(f'WoW account name set to: [bold white]{args}[/bold white]') - self.core.config['WAAccountName'] = args - self.core.save_config() - else: - self.console.print('Incorrect WoW account name.') - else: - self.console.print('[green]Usage:[/green]\n\tThis command accepts the WoW account name as an' - ' argument.') - else: - self.console.print('Unknown option.') - else: + if not args: self.console.print('[green]Usage:[/green]\n\t[green]set wago_addons_api [API key][/green]\n\t\tSets Wago Ad' 'dons API key required to use Wago Addons as addon source.\n\t\tIt can be obtained here:' ' [link=https://addons.wago.io/patreon]https://addons.wago.io/patreon[/link]\n\t[green]s' @@ -807,83 +784,123 @@ def c_set(self, args): 'by Wago updater.\n\t\tNeeded only if compatible addons are used on more than one WoW ac' 'count.\n\t[green]set gh_api [API key][/green]\n\t\tSets GitHub API key. Might be needed' ' to get around API rate limits.', highlight=False) - - def c_wago_update(self, _, verbose=True, flush=True): # sourcery skip: extract-duplicate-method - if os.path.isdir(Path('Interface/AddOns/WeakAuras')) or os.path.isdir(Path('Interface/AddOns/Plater')): - accounts = self.core.detect_accounts() - if self.core.config['WAAccountName'] != '' and self.core.config['WAAccountName'] not in accounts: - self.core.config['WAAccountName'] = '' - if len(accounts) == 0: - return - elif len(accounts) > 1 and self.core.config['WAAccountName'] == '': - if verbose: - self.console.print('More than one WoW account detected.\nPlease use [bold white]set wago_wow_accoun' - 't[''/bold white] command to set the correct account name.') + return + args = args.strip() + if args.startswith('wago_addons_api'): + if args := args[16:]: + self._c_set_parse('Wago Addons API key is now set.', 'WAAAPIKey', args) + elif self.core.config['WAAAPIKey'] != '': + self._c_set_parse('Wago Addons API key is now removed.', 'WAAAPIKey', '') + else: + self.console.print('[green]Usage:[/green]\n\tThis command accepts API key as an argument.') + elif args.startswith('wago_api'): + if args := args[9:]: + self._c_set_parse('Wago API key is now set.', 'WAAPIKey', args) + elif self.core.config['WAAPIKey'] != '': + self._c_set_parse('Wago API key is now removed.', 'WAAPIKey', '') + else: + self.console.print('[green]Usage:[/green]\n\tThis command accepts API key as an argument.') + elif args.startswith('gh_api'): + if args := args[7:]: + self._c_set_parse('GitHub API key is now set.', 'GHAPIKey', args) + elif self.core.config['GHAPIKey'] != '': + self._c_set_parse('GitHub API key is now removed.', 'GHAPIKey', '') + else: + self.console.print('[green]Usage:[/green]\n\tThis command accepts API key as an argument.') + elif args.startswith('wago_wow_account'): + if args := args[17:]: + args = args.strip() + if os.path.isfile(Path(f'WTF/Account/{args}/SavedVariables/WeakAuras.lua')) or \ + os.path.isfile(Path(f'WTF/Account/{args}/SavedVariables/Plater.lua')): + self.console.print(f'WoW account name set to: [bold white]{args}[/bold white]') + self.core.config['WAAccountName'] = args + self.core.save_config() else: - self.console.print('\n[green]More than one WoW account detected.[/green]\nPlease use [bold white]se' - 't wago_wow_account[/bold white] command to set the correct account name.') - return - elif len(accounts) == 1 and self.core.config['WAAccountName'] == '': - self.core.config['WAAccountName'] = accounts[0] - self.core.save_config() - if flush and len(self.core.config['WAStash']) > 0: - self.core.config['WAStash'] = [] - self.core.save_config() - wago = WagoUpdater(self.core.config, self.core.http) - if Version(__version__) >= Version(self.core.masterConfig['ConfigVersion']) and \ - self.core.masterConfig['CBCompanionVersion'] > self.core.config['CBCompanionVersion']: - self.core.config['CBCompanionVersion'] = self.core.masterConfig['CBCompanionVersion'] - self.core.save_config() - force = True + self.console.print('Incorrect WoW account name.') else: - force = False - wago.install_companion(force) - statuswa, statusplater, statusstash = wago.update() + self.console.print('[green]Usage:[/green]\n\tThis command accepts the WoW account name as an' + ' argument.') + else: + self.console.print('Unknown option.') + + def _c_wago_update_init(self, flush, verbose): + accounts = self.core.detect_accounts() + if self.core.config['WAAccountName'] != '' and self.core.config['WAAccountName'] not in accounts: + self.core.config['WAAccountName'] = '' + if len(accounts) == 0: + return + elif len(accounts) > 1 and self.core.config['WAAccountName'] == '': if verbose: - if len(statusstash) > 0: - self.console.print('[green]WeakAuras ready to install:[/green]') - for aura in statusstash: - self.console.print(aura) - self.console.print('\nReload the interface in the WoW client to access them.') - elif len(statuswa[0]) > 0 or len(statuswa[1]) > 0: - self.console.print('[green]Outdated WeakAuras:[/green]') - for aura in statuswa[0]: - self.console.print(f'[link={aura[1]}]{aura[0]}[/link]', highlight=False) - self.console.print('\n[green]Detected WeakAuras:[/green]') - for aura in statuswa[1]: - self.console.print(f'[link={aura[1]}]{aura[0]}[/link]', highlight=False) - if len(statusplater[0]) > 0 or len(statusplater[1]) > 0: - if len(statuswa[0]) != 0 or len(statuswa[1]) != 0: - self.console.print('') - self.console.print('[green]Outdated Plater profiles/scripts:[/green]') - for aura in statusplater[0]: - self.console.print(f'[link={aura[1]}]{aura[0]}[/link]', highlight=False) - self.console.print('\n[green]Detected Plater profiles/scripts:[/green]') - for aura in statusplater[1]: - self.console.print(f'[link={aura[1]}]{aura[0]}[/link]', highlight=False) + self.console.print('More than one WoW account detected.\nPlease use [bold white]set wago_wow_accoun' + 't[''/bold white] command to set the correct account name.') else: - if not self.headless: - self.console.control(Control.move(x=0, y=-1)) - if len(statuswa[0]) > 0: - self.console.print(f'\n[green]The number of outdated WeakAuras:[/green] ' - f'{len(statuswa[0])}', highlight=False) - if len(statusplater[0]) > 0: - self.console.print(f'\n[green]The number of outdated Plater profiles/scripts:[/green] ' - f'{len(statusplater[0])}', highlight=False) - elif verbose: - self.console.print('No compatible addon is installed.') + self.console.print('\n[green]More than one WoW account detected.[/green]\nPlease use [bold white]se' + 't wago_wow_account[/bold white] command to set the correct account name.') + return + elif len(accounts) == 1 and self.core.config['WAAccountName'] == '': + self.core.config['WAAccountName'] = accounts[0] + self.core.save_config() + if flush and len(self.core.config['WAStash']) > 0: + self.core.config['WAStash'] = [] + self.core.save_config() + + def _c_wago_update_status(self, addon, status): + self.console.print(f'[green]Outdated {addon}:[/green]') + for aura in status[0]: + self.console.print(f'[link={aura[1]}]{aura[0]}[/link]', highlight=False) + self.console.print(f'\n[green]Detected {addon}:[/green]') + for aura in status[1]: + self.console.print(f'[link={aura[1]}]{aura[0]}[/link]', highlight=False) + + def c_wago_update(self, _, verbose=True, flush=True): + if not os.path.isdir(Path('Interface/AddOns/WeakAuras')) and not os.path.isdir(Path('Interface/AddOns/Plater')): + if verbose: + self.console.print('No compatible addon is installed.') + return + self._c_wago_update_init(flush, verbose) + wago = WagoUpdater(self.core.config, self.core.http) + if Version(__version__) >= Version(self.core.masterConfig['ConfigVersion']) and \ + self.core.masterConfig['CBCompanionVersion'] > self.core.config['CBCompanionVersion']: + self.core.config['CBCompanionVersion'] = self.core.masterConfig['CBCompanionVersion'] + self.core.save_config() + force = True + else: + force = False + wago.install_companion(force) + statuswa, statusplater, statusstash = wago.update() + if verbose: + if len(statusstash) > 0: + self.console.print('[green]WeakAuras ready to install:[/green]') + for aura in statusstash: + self.console.print(aura) + self.console.print('\nReload the interface in the WoW client to access them.') + elif len(statuswa[0]) > 0 or len(statuswa[1]) > 0: + self._c_wago_update_status('WeakAuras', statuswa) + if len(statusplater[0]) > 0 or len(statusplater[1]) > 0: + if len(statuswa[0]) != 0 or len(statuswa[1]) != 0: + self.console.print('') + self._c_wago_update_status('Plater', statusplater) + else: + if not self.headless: + self.console.control(Control.move(x=0, y=-1)) + if len(statuswa[0]) > 0: + self.console.print(f'\n[green]The number of outdated WeakAuras:[/green] ' + f'{len(statuswa[0])}', highlight=False) + if len(statusplater[0]) > 0: + self.console.print(f'\n[green]The number of outdated Plater profiles/scripts:[/green] ' + f'{len(statusplater[0])}', highlight=False) def c_search(self, args): - if args: - results = self.core.search(args) - self.console.print('[green]Top results of your search:[/green]') - for url in results: - if self.core.check_if_installed(url): - self.console.print(f'[link={url}]{url}[/link] [yellow][Installed][/yellow]', highlight=False) - else: - self.console.print(f'[link={url}]{url}[/link]', highlight=False) - else: + if not args: self.console.print('[green]Usage:[/green]\n\tThis command accepts a search query as an argument.') + return + results = self.core.search(args) + self.console.print('[green]Top results of your search:[/green]') + for url in results: + if self.core.check_if_installed(url): + self.console.print(f'[link={url}]{url}[/link] [yellow][Installed][/yellow]', highlight=False) + else: + self.console.print(f'[link={url}]{url}[/link]', highlight=False) def c_backup(self, _): self.core.backup_wtf(None if self.headless else self.console) @@ -896,13 +913,13 @@ def c_import(self, args): self.console.print('[green]New addons found:[/green]') for addon in names: self.console.print(addon, highlight=False) - self.console.print(f'\n[yellow]Already installed addons:[/yellow]') + self.console.print('\n[yellow]Already installed addons:[/yellow]') for addon in installed: self.console.print(addon, highlight=False) - self.console.print(f'\n[bold]This process detects only addons available on Wago Addons and ElvUI/Tukui.[/bo' - f'ld]\nExecute [bold white]import install[/bold white] command to install all new detect' - f'ed addons.\nAfter installation run the [bold white]orphans[/bold white] command and [b' - f'old white]install[/bold white] missing addons.') + self.console.print('\n[bold]This process detects only addons available on Wago Addons and ElvUI/Tukui.[/bol' + 'd]\nExecute [bold white]import install[/bold white] command to install all new detected' + ' addons.\nAfter installation run the [bold white]orphans[/bold white] command and [bold' + ' white]install[/bold white] missing addons.') def c_export(self, _): payload = self.core.export_addons() diff --git a/Pipfile b/Pipfile index 5f1b6b9..e939129 100644 --- a/Pipfile +++ b/Pipfile @@ -16,6 +16,7 @@ rich = "*" [dev-packages] sourcery = "*" +ruff = "*" [requires] python_version = "3.12" diff --git a/Pipfile.lock b/Pipfile.lock index b2d95a5..5b5f64b 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "28dc497f947572d0666305896143e403fb5d298f51fe7692f3429b1be3895733" + "sha256": "ff2d5759ec027f3bd763a51dd4f68c744a5c4c94f2def41f1fbc30ff4378050c" }, "pipfile-spec": 6, "requires": { @@ -186,11 +186,11 @@ }, "idna": { "hashes": [ - "sha256:9ecdbbd083b06798ae1e86adcbfe8ab1479cf864e4ee30fe4e46a003d12491ca", - "sha256:c05567e9c24a6b9faaa835c4821bad0590fbb9d5779e7caa6e1cc4978e7eb24f" + "sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc", + "sha256:82fee1fc78add43492d3a1898bfa6d8a904cc97d8427f683ed8e798d07761aa0" ], "markers": "python_version >= '3.5'", - "version": "==3.6" + "version": "==3.7" }, "markdown": { "hashes": [ @@ -256,7 +256,7 @@ "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427" ], "index": "pypi", - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2'", "version": "==2.9.0.post0" }, "rich": { @@ -273,7 +273,7 @@ "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926", "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2'", "version": "==1.16.0" }, "sniffio": { @@ -293,6 +293,30 @@ } }, "develop": { + "ruff": { + "hashes": [ + "sha256:122de171a147c76ada00f76df533b54676f6e321e61bd8656ae54be326c10296", + "sha256:3a05f3793ba25f194f395578579c546ca5d83e0195f992edc32e5907d142bfa3", + "sha256:5e55771559c89272c3ebab23326dc23e7f813e492052391fe7950c1a5a139d89", + "sha256:712e71283fc7d9f95047ed5f793bc019b0b0a29849b14664a60fd66c23b96da1", + "sha256:87258e0d4b04046cf1d6cc1c56fadbf7a880cc3de1f7294938e923234cf9e498", + "sha256:89b1e92b3bd9fca249153a97d23f29bed3992cff414b222fcd361d763fc53f12", + "sha256:9d8605aa990045517c911726d21293ef4baa64f87265896e491a05461cae078d", + "sha256:a067daaeb1dc2baf9b82a32dae67d154d95212080c80435eb052d95da647763d", + "sha256:a532a90b4a18d3f722c124c513ffb5e5eaff0cc4f6d3aa4bda38e691b8600c9f", + "sha256:a759d33a20c72f2dfa54dae6e85e1225b8e302e8ac655773aff22e542a300985", + "sha256:a7b6e63194c68bca8e71f81de30cfa6f58ff70393cf45aab4c20f158227d5936", + "sha256:aef5bd3b89e657007e1be6b16553c8813b221ff6d92c7526b7e0227450981eac", + "sha256:d80a6b18a6c3b6ed25b71b05eba183f37d9bc8b16ace9e3d700997f00b74660b", + "sha256:dabc62195bf54b8a7876add6e789caae0268f34582333cda340497c886111c39", + "sha256:dc56bb16a63c1303bd47563c60482a1512721053d93231cf7e9e1c6954395a0e", + "sha256:dfd3504e881082959b4160ab02f7a205f0fadc0a9619cc481982b6837b2fd4c0", + "sha256:faeeae9905446b975dcf6d4499dc93439b131f1443ee264055c5716dd947af55" + ], + "index": "pypi", + "markers": "python_version >= '3.7'", + "version": "==0.3.5" + }, "sourcery": { "hashes": [ "sha256:3c2d9dcff285a46151365fa1a1121ac9e2e90a9e162a5c41153a977ee71ca12d",