Skip to content

Commit

Permalink
subcmds: reduce multiprocessing serialization overhead
Browse files Browse the repository at this point in the history
Follow the same approach as 39ffd99 to reduce serialization overhead.

The benchmarks below were run with 2.7k projects on my workstation
(warm cache). Git tracing was disabled for the benchmarks.

(seconds)              | v2.48 | v2.48 | this CL | this CL
                       |       |  -j32 |         |    -j32
-----------------------------------------------------------
with clean tree state:
branches (none)        |   5.6 |   5.9 |    1.0  |    0.9
status (clean)         |  21.3 |   9.4 |   19.4  |    4.7
diff (none)            |   7.6 |   7.2 |    5.7  |    2.2
prune (none)           |   5.7 |   6.1 |    1.3  |    1.2
abandon (none)         |  19.4 |  18.6 |    0.9  |    0.8
upload (none)          |  19.7 |  18.7 |    0.9  |    0.8
forall -c true         |   7.5 |   7.6 |    0.6  |    0.6
forall -c "git log -1" |  11.3 |  11.1 |    0.6  |    0.6

with branches:
start BRANCH --all     |  21.9 |  20.3 |   13.6  |    2.6
checkout BRANCH        |  29.1 |  27.8 |    1.1  |    1.0
branches (2)           |  28.0 |  28.6 |    1.5  |    1.3
abandon BRANCH         |  29.2 |  27.5 |    9.7  |    2.2

Bug: b/371638995
Change-Id: I53989a3d1e43063587b3f52f852b1c2c56b49412
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/440221
Reviewed-by: Josip Sokcevic <[email protected]>
Tested-by: Kuang-che Wu <[email protected]>
Commit-Queue: Kuang-che Wu <[email protected]>
  • Loading branch information
kcwu authored and LUCI committed Oct 23, 2024
1 parent 39ffd99 commit 8da4861
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 172 deletions.
10 changes: 7 additions & 3 deletions command.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,10 @@ def ParallelContext(cls):
cls._parallel_context = None

@classmethod
def _SetParallelContext(cls, context):
def _InitParallelWorker(cls, context, initializer):
cls._parallel_context = context
if initializer:
initializer()

@classmethod
def ExecuteInParallel(
Expand All @@ -281,6 +283,7 @@ def ExecuteInParallel(
output=None,
ordered=False,
chunksize=WORKER_BATCH_SIZE,
initializer=None,
):
"""Helper for managing parallel execution boiler plate.
Expand All @@ -307,6 +310,7 @@ def ExecuteInParallel(
ordered: Whether the jobs should be processed in order.
chunksize: The number of jobs processed in batch by parallel
workers.
initializer: Worker initializer.
Returns:
The |callback| function's results are returned.
Expand All @@ -318,8 +322,8 @@ def ExecuteInParallel(
else:
with multiprocessing.Pool(
jobs,
initializer=cls._SetParallelContext,
initargs=(cls._parallel_context,),
initializer=cls._InitParallelWorker,
initargs=(cls._parallel_context, initializer),
) as pool:
submit = pool.imap if ordered else pool.imap_unordered
return callback(
Expand Down
30 changes: 18 additions & 12 deletions subcmds/abandon.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ def ValidateOptions(self, opt, args):
else:
args.insert(0, "'All local branches'")

def _ExecuteOne(self, all_branches, nb, project):
@classmethod
def _ExecuteOne(cls, all_branches, nb, project_idx):
"""Abandon one project."""
project = cls.get_parallel_context()["projects"][project_idx]
if all_branches:
branches = project.GetBranches()
else:
Expand All @@ -89,7 +91,7 @@ def _ExecuteOne(self, all_branches, nb, project):
if status is not None:
ret[name] = status

return (ret, project, errors)
return (ret, project_idx, errors)

def Execute(self, opt, args):
nb = args[0].split()
Expand All @@ -102,7 +104,8 @@ def Execute(self, opt, args):
_RelPath = lambda p: p.RelPath(local=opt.this_manifest_only)

def _ProcessResults(_pool, pm, states):
for results, project, errors in states:
for results, project_idx, errors in states:
project = all_projects[project_idx]
for branch, status in results.items():
if status:
success[branch].append(project)
Expand All @@ -111,15 +114,18 @@ def _ProcessResults(_pool, pm, states):
aggregate_errors.extend(errors)
pm.update(msg="")

self.ExecuteInParallel(
opt.jobs,
functools.partial(self._ExecuteOne, opt.all, nb),
all_projects,
callback=_ProcessResults,
output=Progress(
f"Abandon {nb}", len(all_projects), quiet=opt.quiet
),
)
with self.ParallelContext():
self.get_parallel_context()["projects"] = all_projects
self.ExecuteInParallel(
opt.jobs,
functools.partial(self._ExecuteOne, opt.all, nb),
range(len(all_projects)),
callback=_ProcessResults,
output=Progress(
f"Abandon {nb}", len(all_projects), quiet=opt.quiet
),
chunksize=1,
)

width = max(
itertools.chain(
Expand Down
49 changes: 26 additions & 23 deletions subcmds/branches.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ class Branches(Command):
"""
PARALLEL_JOBS = DEFAULT_LOCAL_JOBS

@classmethod
def _ExpandProjectToBranches(cls, project_idx):
"""Expands a project into a list of branch names & associated info.
Args:
project_idx: project.Project index
Returns:
List[Tuple[str, git_config.Branch, int]]
"""
branches = []
project = cls.get_parallel_context()["projects"][project_idx]
for name, b in project.GetBranches().items():
branches.append((name, b, project_idx))
return branches

def Execute(self, opt, args):
projects = self.GetProjects(
args, all_manifests=not opt.this_manifest_only
Expand All @@ -107,17 +123,20 @@ def Execute(self, opt, args):
project_cnt = len(projects)

def _ProcessResults(_pool, _output, results):
for name, b in itertools.chain.from_iterable(results):
for name, b, project_idx in itertools.chain.from_iterable(results):
b.project = projects[project_idx]
if name not in all_branches:
all_branches[name] = BranchInfo(name)
all_branches[name].add(b)

self.ExecuteInParallel(
opt.jobs,
expand_project_to_branches,
projects,
callback=_ProcessResults,
)
with self.ParallelContext():
self.get_parallel_context()["projects"] = projects
self.ExecuteInParallel(
opt.jobs,
self._ExpandProjectToBranches,
range(len(projects)),
callback=_ProcessResults,
)

names = sorted(all_branches)

Expand Down Expand Up @@ -191,19 +210,3 @@ def _ProcessResults(_pool, _output, results):
else:
out.write(" in all projects")
out.nl()


def expand_project_to_branches(project):
"""Expands a project into a list of branch names & associated information.
Args:
project: project.Project
Returns:
List[Tuple[str, git_config.Branch]]
"""
branches = []
for name, b in project.GetBranches().items():
b.project = project
branches.append((name, b))
return branches
34 changes: 19 additions & 15 deletions subcmds/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from error import GitError
from error import RepoExitError
from progress import Progress
from project import Project
from repo_logging import RepoLogger


Expand All @@ -30,7 +29,7 @@
class CheckoutBranchResult(NamedTuple):
# Whether the Project is on the branch (i.e. branch exists and no errors)
result: bool
project: Project
project_idx: int
error: Exception


Expand Down Expand Up @@ -62,15 +61,17 @@ def ValidateOptions(self, opt, args):
if not args:
self.Usage()

def _ExecuteOne(self, nb, project):
@classmethod
def _ExecuteOne(cls, nb, project_idx):
"""Checkout one project."""
error = None
result = None
project = cls.get_parallel_context()["projects"][project_idx]
try:
result = project.CheckoutBranch(nb)
except GitError as e:
error = e
return CheckoutBranchResult(result, project, error)
return CheckoutBranchResult(result, project_idx, error)

def Execute(self, opt, args):
nb = args[0]
Expand All @@ -83,22 +84,25 @@ def Execute(self, opt, args):

def _ProcessResults(_pool, pm, results):
for result in results:
project = all_projects[result.project_idx]
if result.error is not None:
err.append(result.error)
err_projects.append(result.project)
err_projects.append(project)
elif result.result:
success.append(result.project)
success.append(project)
pm.update(msg="")

self.ExecuteInParallel(
opt.jobs,
functools.partial(self._ExecuteOne, nb),
all_projects,
callback=_ProcessResults,
output=Progress(
f"Checkout {nb}", len(all_projects), quiet=opt.quiet
),
)
with self.ParallelContext():
self.get_parallel_context()["projects"] = all_projects
self.ExecuteInParallel(
opt.jobs,
functools.partial(self._ExecuteOne, nb),
range(len(all_projects)),
callback=_ProcessResults,
output=Progress(
f"Checkout {nb}", len(all_projects), quiet=opt.quiet
),
)

if err_projects:
for p in err_projects:
Expand Down
27 changes: 16 additions & 11 deletions subcmds/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,22 @@ def _Options(self, p):
help="paths are relative to the repository root",
)

def _ExecuteOne(self, absolute, local, project):
@classmethod
def _ExecuteOne(cls, absolute, local, project_idx):
"""Obtains the diff for a specific project.
Args:
absolute: Paths are relative to the root.
local: a boolean, if True, the path is relative to the local
(sub)manifest. If false, the path is relative to the outermost
manifest.
project: Project to get status of.
project_idx: Project index to get status of.
Returns:
The status of the project.
"""
buf = io.StringIO()
project = cls.get_parallel_context()["projects"][project_idx]
ret = project.PrintWorkTreeDiff(absolute, output_redir=buf, local=local)
return (ret, buf.getvalue())

Expand All @@ -71,12 +73,15 @@ def _ProcessResults(_pool, _output, results):
ret = 1
return ret

return self.ExecuteInParallel(
opt.jobs,
functools.partial(
self._ExecuteOne, opt.absolute, opt.this_manifest_only
),
all_projects,
callback=_ProcessResults,
ordered=True,
)
with self.ParallelContext():
self.get_parallel_context()["projects"] = all_projects
return self.ExecuteInParallel(
opt.jobs,
functools.partial(
self._ExecuteOne, opt.absolute, opt.this_manifest_only
),
range(len(all_projects)),
callback=_ProcessResults,
ordered=True,
chunksize=1,
)
Loading

0 comments on commit 8da4861

Please sign in to comment.