From 6890924faf002c01b46db4556a0e137c076ca688 Mon Sep 17 00:00:00 2001 From: RepoDynamicsBot <80158628+AAriam@users.noreply.github.com> Date: Thu, 20 Jun 2024 19:39:11 +0200 Subject: [PATCH] new version --- pyproject.toml | 8 +- src/gittidy/__init__.py | 1 + src/gittidy/exception.py | 16 ++ src/gittidy/git.py | 449 +++++++++++++++++++++++---------------- 4 files changed, 286 insertions(+), 188 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b05ed88..0a50073 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,10 @@ namespaces = true # ----------------------------------------- Project Metadata ------------------------------------- # [project] -version = "0.0.0.dev1" +version = "0.0.0.dev2" name = "GitTidy" -requires-python = ">=3.9" +requires-python = ">=3.10" +dependencies = [ + "LoggerMan", + "PyShellMan" +] diff --git a/src/gittidy/__init__.py b/src/gittidy/__init__.py index e69de29..5f2488b 100644 --- a/src/gittidy/__init__.py +++ b/src/gittidy/__init__.py @@ -0,0 +1 @@ +from gittidy.git import Git diff --git a/src/gittidy/exception.py b/src/gittidy/exception.py index f11f681..c1a0e7c 100644 --- a/src/gittidy/exception.py +++ b/src/gittidy/exception.py @@ -28,3 +28,19 @@ def __init__(self, path: str | _Path): self.path = _Path(path).resolve() super().__init__(f"No Git repository found in '{self.path}' or any parent directory.") return + + +class GitTidyInputError(GitTidyError): + """Error in the input arguments provided to Git methods.""" + + def __init__(self, message: str): + super().__init__(message) + return + + +class GitTidyOperationError(GitTidyError): + """Error in the execution of an operation.""" + + def __init__(self, message: str): + super().__init__(message) + return diff --git a/src/gittidy/git.py b/src/gittidy/git.py index 8656633..a4b9118 100644 --- a/src/gittidy/git.py +++ b/src/gittidy/git.py @@ -1,25 +1,31 @@ -import json -from typing import Literal, Optional -from pathlib import Path -import re -from contextlib import contextmanager +"""Git API.""" + + +from typing import Literal as _Literal +from pathlib import Path as _Path +import re as _re +from contextlib import contextmanager as _contextmanager + from loggerman import logger import pyshellman as _pyshellman -from repodynamics.version import PEP440SemVer from gittidy import exception as _exception class Git: - _COMMITTER_USERNAME = "RepoDynamicsBot" - _COMMITTER_EMAIL = "146771514+RepoDynamicsBot@users.noreply.github.com" @logger.sectioner("Initialize Git API") def __init__( self, - path: str | Path = ".", + path: str | _Path, user: tuple[str, str] | None = None, - user_scope: Literal["system", "global", "local", "worktree"] = "global", + user_scope: _Literal["system", "global", "local", "worktree"] = "global", + author: tuple[str, str] | None = None, + author_scope: _Literal["system", "global", "local", "worktree"] = "local", + author_persistent: bool = False, + committer: tuple[str, str] | None = None, + committer_scope: _Literal["system", "global", "local", "worktree"] = "local", + committer_persistent: bool = False, ): try: git_version = _pyshellman.run(["git", "version", "--build-options"]) @@ -29,39 +35,95 @@ def __init__( try: repo_root_path = _pyshellman.run( - ["git", "-C", str(Path(path).resolve()), "rev-parse", "--show-toplevel"] + ["git", "-C", str(_Path(path).resolve()), "rev-parse", "--show-toplevel"] ) except _pyshellman.exception.PyShellManError: raise _exception.GitTidyNoGitRepositoryError(path) - self._path = Path(repo_root_path.output) + self._path = _Path(repo_root_path.output) logger.info(code_title="Git repository path", code=self._path) if user: self.set_user(username=user[0], email=user[1], scope=user_scope) + + if author: + self._author_username, self._author_email = author + self._author_scope = author_scope + self._author_persistent = author_persistent + if author_persistent: + self.set_user( + username=self._author_username, + email=self._author_email, + user_type="author", + scope=self._author_scope + ) + else: + self._original_author_username, self._original_author_email = self.get_user( + user_type="author", scope=author_scope + ) + else: + self._author_persistent = True + + if committer: + self._committer_username, self._committer_email = committer + self._committer_scope = committer_scope + self._committer_persistent = committer_persistent + if committer_persistent: + self.set_user( + username=committer[0], email=committer[1], user_type="committer", scope=committer_scope + ) + else: + self._original_committer_username, self._original_committer_email = self.get_user( + user_type="committer", scope=committer_scope + ) + else: + self._committer_persistent = True return + @property + def repo_path(self) -> _Path: + return self._path + + def run_command( + self, + command: list[str], + needs_credentials: bool = False, + raise_exit_code: bool = True, + raise_stderr: bool = False, + text_output: bool = True, + ) -> _pyshellman.ShellOutput: + if not needs_credentials: + return self._run_command(command, raise_exit_code, raise_stderr, text_output) + with self._temporary_credentials(): + return self._run_command(command, raise_exit_code, raise_stderr, text_output) + + @logger.sectioner("Git: Push") def push( - self, target: str = None, ref: str = None, set_upstream: bool = False, force_with_lease: bool = False - ) -> str | None: - command = ["git", "push"] + self, + target: str = "", + ref: str = "", + set_upstream: bool = False, + upstream_branch_name: str = "", + force_with_lease: bool = False + ) -> None: + command = ["push"] if set_upstream: if not target: - self._logger.error("No target provided while setting upstream.") - command.extend(["--set-upstream", target, self.current_branch_name()]) + raise _exception.GitTidyInputError("No 'target' provided while 'set_upstream' is set.") + command.extend(["--set-upstream", target, upstream_branch_name or self.current_branch_name()]) elif target: command.append(target) if ref: command.append(ref) if force_with_lease: command.append("--force-with-lease") - with self._temp_committer(): - self.run_command(command) - return self.commit_hash_normal() + self.run_command(command=command, needs_credentials=True) + return + @logger.sectioner("Git: Commit") def commit( self, message: str = "", - stage: Literal["all", "tracked", "none"] = "all", + stage: _Literal["all", "tracked", "none"] = "all", amend: bool = False, allow_empty: bool = False, ) -> str | None: @@ -75,8 +137,10 @@ def commit( - add (bool): Whether to add all changes before committing. """ if not amend and not message: - self._logger.error("No commit message provided.") - commit_cmd = ["git", "commit"] + raise _exception.GitTidyInputError("No 'message' provided for new commit.") + if stage != "none": + self.run_command(["add", "-A" if stage == "all" else "-u"], needs_credentials=True) + commit_cmd = ["commit"] if amend: commit_cmd.append("--amend") if not message: @@ -86,43 +150,34 @@ def commit( for msg_line in message.splitlines(): if msg_line: commit_cmd.extend(["-m", msg_line]) - - if stage != "none": - flag = "-A" if stage == "all" else "-u" - with self._temp_committer(): - self.run_command(["git", "add", flag]) commit_hash = None if allow_empty or self.has_changes(check_type="staged"): - with self._temp_committer(): - out, err, code = self.run_command(commit_cmd, raise_exit_code=False) - if code != 0: - with self._temp_committer(): - self.run_command(commit_cmd) + self.run_command(commit_cmd, needs_credentials=True) commit_hash = self.commit_hash_normal() - self._logger.success(f"Committed changes. Commit hash: {commit_hash}") else: - self._logger.attention(f"No changes to commit.") + logger.info(f"No changes to commit.") return commit_hash + @logger.sectioner("Git: Create Tag") def create_tag( self, tag: str, - message: str = None, + message: str = "", push_target: str = "origin", ): - cmd = ["git", "tag"] + cmd = ["tag"] if not message: cmd.append(tag) else: cmd.extend(["-a", tag, "-m", message]) - with self._temp_committer(): - self.run_command(cmd) - out = self.run_command(["git", "show", tag]) + self.run_command(cmd, needs_credentials=True) + out = self.run_command(["show", tag]) if push_target: self.push(target=push_target, ref=tag) return out - def has_changes(self, check_type: Literal["staged", "unstaged", "all"] = "all") -> bool: + @logger.sectioner("Git: Check Changes") + def has_changes(self, check_type: _Literal["staged", "unstaged", "all"] = "all") -> bool: """Checks for git changes. Parameters: @@ -131,11 +186,12 @@ def has_changes(self, check_type: Literal["staged", "unstaged", "all"] = "all") Returns: - bool: True if changes are detected, False otherwise. """ - commands = {"staged": ["git", "diff", "--quiet", "--cached"], "unstaged": ["git", "diff", "--quiet"]} + commands = {"staged": ["diff", "--quiet", "--cached"], "unstaged": ["diff", "--quiet"]} if check_type == "all": - return any(self.run_command(cmd, raise_exit_code=False)[2] != 0 for cmd in commands.values()) - return self.run_command(commands[check_type], raise_exit_code=False)[2] != 0 + return any(self.run_command(cmd, raise_exit_code=False).code != 0 for cmd in commands.values()) + return self.run_command(commands[check_type], raise_exit_code=False).code != 0 + @logger.sectioner("Git: Get Changed Files") def changed_files(self, ref_start: str, ref_end: str) -> dict[str, list[str]]: """ Get all files that have changed between two commits, and the type of changes. @@ -185,7 +241,7 @@ def changed_files(self, ref_start: str, ref_end: str) -> dict[str, list[str]]: "R": "renamed", } out = {} - changes = self.run_command(["git", "diff", "--name-status", ref_start, ref_end]).splitlines() + changes = self.run_command(["diff", "--name-status", ref_start, ref_end]).output.splitlines() for change in changes: key, *paths = change.split("\t") if key in key_def: @@ -193,7 +249,7 @@ def changed_files(self, ref_start: str, ref_end: str) -> dict[str, list[str]]: continue key, similarity = key[0], int(key[1:]) if key not in ["C", "R"]: - self._logger.error(f"Unknown change type '{change}'.") + logger.error("Unknown file change type", change) out_key = key_def[key] if similarity != 100: out_key += "_modified" @@ -201,6 +257,7 @@ def changed_files(self, ref_start: str, ref_end: str) -> dict[str, list[str]]: out.setdefault(f"{out_key}_to", []).append(paths[1]) return out + @logger.sectioner("Git: Get Commit Hash") def commit_hash_normal(self, parent: int = 0) -> str | None: """ Get the commit hash of the current commit. @@ -211,32 +268,34 @@ def commit_hash_normal(self, parent: int = 0) -> str | None: Returns: - str: The commit hash. """ - return self.run_command(["git", "rev-parse", f"HEAD~{parent}"]) + return self.run_command(["rev-parse", f"HEAD~{parent}"]).output + @logger.sectioner("Git: Describe") def describe( self, abbrev: int | None = None, first_parent: bool = True, match: str | None = None ) -> str | None: - cmd = ["git", "describe"] + cmd = ["describe"] if abbrev is not None: cmd.append(f"--abbrev={abbrev}") if first_parent: cmd.append("--first-parent") if match: cmd.extend(["--match", match]) - out, err, code = self.run_command(command=cmd, raise_exit_code=False) - return out if code == 0 else None + result = self.run_command(command=cmd, raise_exit_code=False) + return result.output if result.code == 0 else None + @logger.sectioner("Git: Log") def log( self, number: int | None = None, simplify_by_decoration: bool = True, tags: bool | str = True, - pretty: str | None = "format:%D", - date: str | None = None, - revision_range: str | None = None, - paths: str | list[str] | None = None, - ): - cmd = ["git", "log"] + pretty: str = "format:%D", + date: str = "", + revision_range: str = "", + paths: str | list[str] = "", + ) -> str: + cmd = ["log"] if number: cmd.append(f"-{number}") if simplify_by_decoration: @@ -251,60 +310,64 @@ def log( cmd.append(revision_range) if paths: cmd.extend(["--"] + (paths if isinstance(paths, list) else [paths])) - return self.run_command(cmd) + return self.run_command(cmd).output + @logger.sectioner("Git: Set User") def set_user( self, - username: str | None, - email: str | None, - user_type: Literal["user", "author", "committer"] = "user", - scope: Literal["system", "global", "local", "worktree"] | None = "global", - ): + username: str | None = "", + email: str | None = "", + user_type: _Literal["user", "author", "committer"] = "user", + scope: _Literal["system", "global", "local", "worktree"] | None = "global", + ) -> None: """ Set the git username and email. """ - cmd = ["git", "config"] + cmd = ["config"] if scope: cmd.append(f"--{scope}") if not ((username is None or isinstance(username, str)) and (email is None or isinstance(email, str))): - raise ValueError("username and email must be either a string or None.") + raise _exception.GitTidyInputError("'username' and 'email' must be either a string or None.") for key, val in [("name", username), ("email", email)]: if val is None: self.run_command([*cmd, "--unset", f"{user_type}.{key}"]) - else: + elif val: self.run_command([*cmd, f"{user_type}.{key}", val]) return + @logger.sectioner("Git: Get User") def get_user( self, - user_type: Literal["user", "author", "committer"] = "user", - scope: Optional[Literal["system", "global", "local", "worktree"]] = None, + user_type: _Literal["user", "author", "committer"] = "user", + scope: _Literal["system", "global", "local", "worktree"] | None = None, ) -> tuple[str | None, str | None]: """ Get the git username and email. """ - cmd = ["git", "config"] + cmd = ["config"] if scope: cmd.append(f"--{scope}") user = [] for key in ["name", "email"]: - out, err, code = self.run_command([*cmd, f"{user_type}.{key}"], raise_exit_code=False) - if code == 0: - user.append(out) - elif code == 1 and not out: + result = self.run_command([*cmd, f"{user_type}.{key}"], raise_exit_code=False) + if result.code == 0: + user.append(result.output) + elif result.code == 1 and not result.output: user.append(None) else: - self._logger.error(f"Failed to get {user_type}.{key}.", details=err, exit_code=code) + raise _exception.GitTidyOperationError( + f"Failed to get {user_type}.{key}") return tuple(user) + @logger.sectioner("Git: Fetch Remotes by Pattern") def fetch_remote_branches_by_pattern( self, - branch_pattern: re.Pattern | None = None, + branch_pattern: _re.Pattern | None = None, remote_name: str = "origin", exists_ok: bool = False, not_fast_forward_ok: bool = False, - ): - remote_branches = self.run_command(["git", "branch", "-r"]).splitlines() + ) -> None: + remote_branches = self.run_command(["branch", "-r"]).output.splitlines() branch_names = [] for remote_branch in remote_branches: remote_branch = remote_branch.strip() @@ -312,20 +375,22 @@ def fetch_remote_branches_by_pattern( remote_branch = remote_branch.removeprefix(f"{remote_name}/") if not branch_pattern or branch_pattern.match(remote_branch): branch_names.append(remote_branch) - return self.fetch_remote_branches_by_name( + self.fetch_remote_branches_by_name( branch_names=branch_names, remote_name=remote_name, exists_ok=exists_ok, not_fast_forward_ok=not_fast_forward_ok, ) + return + @logger.sectioner("Git: Fetch Remotes by Name") def fetch_remote_branches_by_name( self, branch_names: str | list[str], remote_name: str = "origin", exists_ok: bool = False, not_fast_forward_ok: bool = False, - ): + ) -> None: if isinstance(branch_names, str): branch_names = [branch_names] if not exists_ok: @@ -335,19 +400,22 @@ def fetch_remote_branches_by_name( refspecs = [ f"{'+' if not_fast_forward_ok else ''}{branch_name}:{branch_name}" for branch_name in branch_names ] - self.run_command(["git", "fetch", remote_name, *refspecs]) + self.run_command(["fetch", remote_name, *refspecs]) # for branch_name in branch_names: # self._run(["git", "branch", "--track", branch_name, f"{remote_name}/{branch_name}"]) # self._run(["git", "fetch", "--all"]) # self._run(["git", "pull", "--all"]) return - def pull(self, fast_forward_only: bool = True): - cmd = ["git", "pull"] + @logger.sectioner("Git: Pull") + def pull(self, fast_forward_only: bool = True) -> None: + cmd = ["pull"] if fast_forward_only: cmd.append("--ff-only") - return self.run_command(cmd) + self.run_command(cmd) + return + @logger.sectioner("Git: Get Commits") def get_commits(self, revision_range: str | None = None) -> list[dict[str, str | list[str]]]: """ Get a list of commits. @@ -366,19 +434,20 @@ def get_commits(self, revision_range: str | None = None) -> list[dict[str, str | marker_commit_end = "" format = f"{marker_start}%n{hash}%n{author}%n{date}%n{commit}%n{marker_commit_end}" - cmd = ["git", "log", f"--pretty=format:{format}", "--name-only"] + cmd = ["log", f"--pretty=format:{format}", "--name-only"] if revision_range: cmd.append(revision_range) - out = self.run_command(cmd) + result = self.run_command(cmd) - pattern = re.compile( - rf"{re.escape(marker_start)}\n(.*?)\n(.*?)\n(.*?)\n(.*?){re.escape(marker_commit_end)}\n(.*?)(?:\n\n|$)", - re.DOTALL, + pattern = _re.compile( + rf"{_re.escape(marker_start)}\n(.*?)\n(.*?)\n(.*?)\n(.*?){_re.escape(marker_commit_end)}\n(.*?)(?:\n\n|$)", + _re.DOTALL, ) - matches = pattern.findall(out) - self._logger.success(f"Found {len(matches)} commits.", json.dumps(matches, indent=3)) + matches = pattern.findall(result.output) + logger.info(f"Found {len(matches)} commits.") + logger.debug("Commits", matches) commits = [] for match in matches: @@ -392,26 +461,30 @@ def get_commits(self, revision_range: str | None = None) -> list[dict[str, str | commits.append(commit_info) return commits + @logger.sectioner("Git: Get Current Branch Name") def current_branch_name(self) -> str: """Get the name of the current branch.""" - return self.run_command(["git", "branch", "--show-current"]) + return self.run_command(["branch", "--show-current"]).output - def branch_delete(self, branch_name: str, force: bool = False): - cmd = ["git", "branch", "-D" if force else "-d", branch_name] + @logger.sectioner("Git: Delete Branch") + def branch_delete(self, branch_name: str, force: bool = False) -> None: + cmd = ["branch", "-D" if force else "-d", branch_name] self.run_command(cmd) return - def branch_rename(self, new_name: str, force: bool = False): - cmd = ["git", "branch", "-M" if force else "-m", new_name] + @logger.sectioner("Git: Rename Branch") + def branch_rename(self, new_name: str, force: bool = False) -> None: + cmd = ["branch", "-M" if force else "-m", new_name] self.run_command(cmd) return + @logger.sectioner("Git: Get All Branch Names") def get_all_branch_names(self) -> tuple[str, list[str]]: - """Get the name of the current branch.""" - branches_str = self.run_command(["git", "branch"]) + """Get the name of all branches.""" + result = self.run_command(["branch"]) branches_other = [] branch_current = [] - for branch in branches_str.split("\n"): + for branch in result.output.split("\n"): branch = branch.strip() if not branch: continue @@ -420,12 +493,13 @@ def get_all_branch_names(self) -> tuple[str, list[str]]: else: branches_other.append(branch) if len(branch_current) > 1: - raise RuntimeError("More than one current branch found.") + raise _exception.GitTidyOperationError("More than one current branch found.") return branch_current[0], branches_other - def checkout(self, branch: str, create: bool = False, reset: bool = False, orphan: bool = False): + @logger.sectioner("Git: Checkout Branch") + def checkout(self, branch: str, create: bool = False, reset: bool = False, orphan: bool = False) -> None: """Checkout a branch.""" - cmd = ["git", "checkout"] + cmd = ["checkout"] if reset: cmd.append("-B") elif create: @@ -433,8 +507,10 @@ def checkout(self, branch: str, create: bool = False, reset: bool = False, orpha elif orphan: cmd.append("--orphan") cmd.append(branch) - return self.run_command(cmd) + self.run_command(cmd) + return + @logger.sectioner("Git: Get Distance To Ref") def get_distance(self, ref_start: str, ref_end: str = "HEAD") -> int: """ Get the distance between two commits. @@ -446,18 +522,19 @@ def get_distance(self, ref_start: str, ref_end: str = "HEAD") -> int: Returns: - int: The distance between the two commits. """ - return int(self.run_command(["git", "rev-list", "--count", f"{ref_start}..{ref_end}"])) + return int(self.run_command(["rev-list", "--count", f"{ref_start}..{ref_end}"]).output) + @logger.sectioner("Git: Get Tags") def get_tags(self) -> list[list[str]]: """Get a list of tags reachable from the current commit This returns a list of tags ordered by the commit date (newest first). Each element is a list itself, containing all tags that point to the same commit. """ - tags_on_branch = self.run_command(["git", "tag", "--merged"]).splitlines() - output = self.log(simplify_by_decoration=True, pretty="format:%D") + logs = self.log(simplify_by_decoration=True, pretty="format:%D") + tags_on_branch = self.run_command(["tag", "--merged"]).output.splitlines() tags = [] - for line in output.splitlines(): + for line in logs.splitlines(): potential_tags = line.split(", ") sub_list_added = False for potential_tag in potential_tags: @@ -470,27 +547,8 @@ def get_tags(self) -> list[list[str]]: tags[-1].append(tag) return tags - def get_latest_version(self, tag_prefix: str, dev_only: bool = False) -> PEP440SemVer | None: - tags_lists = self.get_tags() - if not tags_lists: - return - for tags_list in tags_lists: - ver_tags = [] - for tag in tags_list: - if tag.startswith(tag_prefix): - ver_tags.append(PEP440SemVer(tag.removeprefix(tag_prefix))) - if ver_tags: - if dev_only: - ver_tags = sorted(ver_tags, reverse=True) - for ver_tag in ver_tags: - if ver_tag.release_type == "dev": - return ver_tag - else: - return max(ver_tags) - return - - @property - def remotes(self) -> dict: + @logger.sectioner("Git: Get Remotes") + def get_remotes(self) -> dict[str, dict[str, str]]: """ Remote URLs of the git repository. @@ -511,18 +569,21 @@ def remotes(self) -> dict: } } """ - out = self.run_command(["git", "remote", "-v"]) + out = self.run_command(["remote", "-v"]).output remotes = {} for remote in out.splitlines(): remote_name, url, purpose_raw = remote.split() purpose = purpose_raw.removeprefix("(").removesuffix(")") remote_dict = remotes.setdefault(remote_name, {}) if purpose in remote_dict: - self._logger.error(f"Duplicate remote purpose '{purpose}' for remote '{remote_name}'.") + raise _exception.GitTidyOperationError( + f"Duplicate remote purpose '{purpose}' for remote '{remote_name}'." + ) remote_dict[purpose] = url return remotes - def repo_name( + @logger.sectioner("Git: Get Remote Repo Name") + def get_remote_repo_name( self, remote_name: str = "origin", remote_purpose: str = "push", @@ -531,15 +592,15 @@ def repo_name( ) -> tuple[str, str] | None: def extract_repo_name_from_url(url): # Regular expression pattern for extracting repo name from GitHub URL - pattern = re.compile(r"github\.com[/:]([\w\-]+)/([\w\-.]+?)(?:\.git)?$") + pattern = _re.compile(r"github\.com[/:]([\w\-]+)/([\w\-.]+?)(?:\.git)?$") match = pattern.search(url) if not match: - self._logger.attention(f"Failed to extract repo name from URL '{url}'.") + logger.info(f"Failed to extract repo name from URL '{url}'.") return None owner, repo = match.groups()[0:2] return owner, repo - remotes = self.remotes + remotes = self.get_remotes() if not remotes: return if remote_name in remotes: @@ -565,28 +626,41 @@ def extract_repo_name_from_url(url): return repo_name return - def check_gitattributes(self): + @logger.sectioner("Git: Check .gitattributes") + def check_gitattributes(self) -> bool: command = ["sh", "-c", "git ls-files | git check-attr -a --stdin | grep 'text: auto'"] - out = self.run_command(command) - if out: - return False - return True - - def file_at_hash(self, commit_hash: str, path: str | Path, raise_missing: bool = True) -> str | None: - out, err, code = self.run_command(["git", "show", f"{commit_hash}:{path}"], raise_exit_code=raise_missing) - if err or code != 0: + result = _pyshellman.run( + command=command, + cwd=self._path, + raise_execution=True, + raise_exit_code=True, + raise_stderr=True, + text_output=True, + ) + logger.info("Run command", msg=result.summary, code_title="Result", code=result) + return not result.output + + @logger.sectioner("Git: Get File at Hash") + def file_at_hash(self, commit_hash: str, path: str | _Path, raise_missing: bool = True) -> str | None: + result = self.run_command(["show", f"{commit_hash}:{path}"], raise_exit_code=raise_missing) + if result.error or result.code != 0: if raise_missing: - self._logger.error(f"Failed to get file '{path}' at commit '{commit_hash}'.", details=err) - return None - return out + raise _exception.GitTidyOperationError( + f"Failed to get file '{path}' at commit '{commit_hash}'." + ) + return + return result.output - def discard_changes(self, path: str | Path = "."): + @logger.sectioner("Git: Discard Changes") + def discard_changes(self, path: str | _Path = ".") -> None: """Revert all uncommitted changes in the specified path, back to the state of the last commit.""" - return self.run_command(["git", "checkout", "--", str(path)]) + self.run_command(["checkout", "--", str(path)]) + return + @logger.sectioner("Git: Stash") def stash( - self, name: str = "Stashed by RepoDynamics", include: Literal["tracked", "untracked", "all"] = "all" - ): + self, include: _Literal["tracked", "untracked", "all"] = "all", name: str = "Stashed by GitTidy" + ) -> None: """Stash changes in the working directory. This takes the modified files, stages them and saves them on a stack of unfinished changes @@ -603,55 +677,25 @@ def stash( - 'untracked': Stash tracked and untracked files. - 'all': Stash all files, including ignored files. """ - command = ["git", "stash"] + command = ["stash"] if include in ["untracked", "all"]: command.extend(["save", "--include-untracked" if include == "untracked" else "--all"]) if name: command.append(str(name)) - return self.run_command(command) + self.run_command(command) + return - def stash_pop(self): + @logger.sectioner("Git: Pop Stash") + def stash_pop(self) -> None: """Reapply the most recently stashed changes and remove the stash from the stack. This will take the changes stored in the stash and apply them back to the working directory, removing the stash from the stack. """ - return self.run_command(["git", "stash", "pop"], raise_exit_code=False) - - @property - def path_root(self) -> Path: - return self._path - - @contextmanager - def _temp_committer(self): - committer_username, committer_email = self.get_user(user_type="committer", scope="local") - if committer_username != self._COMMITTER_USERNAME or committer_email != self._COMMITTER_EMAIL: - self.set_user( - username=self._COMMITTER_USERNAME, - email=self._COMMITTER_EMAIL, - user_type="committer", - scope="local", - ) - yield - if committer_username != self._COMMITTER_USERNAME or committer_email != self._COMMITTER_EMAIL: - self.set_user( - username=committer_username, email=committer_email, user_type="committer", scope="local" - ) - return - - @contextmanager - def temp_author(self, username: str = None, email: str = None): - author_username, author_email = self.get_user(user_type="author", scope="local") - username = username or self._COMMITTER_USERNAME - email = email or self._COMMITTER_EMAIL - if author_username != username or author_email != email: - self.set_user(username=username, email=email, user_type="author", scope="local") - yield - if author_username != username or author_email != email: - self.set_user(username=author_username, email=author_email, user_type="author", scope="local") + self.run_command(["stash", "pop"], raise_exit_code=False) return - def run_command( + def _run_command( self, command: list[str], raise_exit_code: bool = True, @@ -666,5 +710,38 @@ def run_command( raise_stderr=raise_stderr, text_output=text_output, ) - logger.info("Run git command", msg=result.summary, code_title="Result", code=result) + logger.info("Execute git command", msg=result.summary, code_title="Result", code=result) return result + + @_contextmanager + def _temporary_credentials(self): + if not self._author_persistent: + self.set_user( + username=self._author_username, + email=self._author_email, + user_type="author", + scope=self._author_scope + ) + if not self._committer_persistent: + self.set_user( + username=self._committer_username, + email=self._committer_email, + user_type="committer", + scope=self._committer_scope, + ) + yield + if not self._author_persistent: + self.set_user( + username=self._original_author_username, + email=self._original_author_email, + user_type="author", + scope=self._author_scope + ) + if not self._committer_persistent: + self.set_user( + username=self._original_committer_username, + email=self._original_committer_email, + user_type="committer", + scope=self._committer_scope, + ) + return