Speed Up Publishing Times #1120

Open · wants to merge 15 commits into develop · Changes from 13 commits
76 changes: 45 additions & 31 deletions client/ayon_core/lib/file_transaction.py
@@ -2,6 +2,7 @@
import logging
import sys
import errno
from concurrent.futures import ThreadPoolExecutor

from ayon_core.lib import create_hard_link

@@ -11,6 +12,8 @@
else:
from shutil import copyfile

from .threadpool import as_completed_stop_and_raise_on_error


class DuplicateDestinationError(ValueError):
"""Error raised when transfer destination already exists in queue.
@@ -109,41 +112,52 @@ def add(self, src, dst, mode=MODE_COPY):
self._transfers[dst] = (src, opts)

    def process(self):
-        # Backup any existing files
-        for dst, (src, _) in self._transfers.items():
-            self.log.debug("Checking file ... {} -> {}".format(src, dst))
-            path_same = self._same_paths(src, dst)
-            if path_same or not os.path.exists(dst):
-                continue
-
-            # Backup original file
-            # todo: add timestamp or uuid to ensure unique
-            backup = dst + ".bak"
-            self._backup_to_original[backup] = dst
-            self.log.debug(
-                "Backup existing file: {} -> {}".format(dst, backup))
-            os.rename(dst, backup)
-
-        # Copy the files to transfer
-        for dst, (src, opts) in self._transfers.items():
-            path_same = self._same_paths(src, dst)
-            if path_same:
-                self.log.debug(
-                    "Source and destination are same files {} -> {}".format(
-                        src, dst))
-                continue
-
-            self._create_folder_for_file(dst)
-
-            if opts["mode"] == self.MODE_COPY:
-                self.log.debug("Copying file ... {} -> {}".format(src, dst))
-                copyfile(src, dst)
-            elif opts["mode"] == self.MODE_HARDLINK:
-                self.log.debug("Hardlinking file ... {} -> {}".format(
-                    src, dst))
-                create_hard_link(src, dst)
-
-            self._transferred.append(dst)
+        with ThreadPoolExecutor(max_workers=8) as executor:
+            # Submit backup tasks
+            backup_futures = [
+                executor.submit(self._backup_file, dst, src)
+                for dst, (src, _) in self._transfers.items()
+            ]
+            as_completed_stop_and_raise_on_error(
+                executor, backup_futures, logger=self.log)
+
+            # Submit transfer tasks
+            transfer_futures = [
+                executor.submit(self._transfer_file, dst, src, opts)
+                for dst, (src, opts) in self._transfers.items()
+            ]
+            as_completed_stop_and_raise_on_error(
+                executor, transfer_futures, logger=self.log)
+
+    def _backup_file(self, dst, src):
+        self.log.debug(f"Checking file ... {src} -> {dst}")
+        path_same = self._same_paths(src, dst)
+        if path_same or not os.path.exists(dst):
+            return
+
+        # Backup original file
+        backup = dst + ".bak"
+        self._backup_to_original[backup] = dst
+        self.log.debug(f"Backup existing file: {dst} -> {backup}")
+        os.rename(dst, backup)
+
+    def _transfer_file(self, dst, src, opts):
+        path_same = self._same_paths(src, dst)
+        if path_same:
+            self.log.debug(
+                f"Source and destination are same files {src} -> {dst}")
+            return
+
+        self._create_folder_for_file(dst)
+
+        if opts["mode"] == self.MODE_COPY:
+            self.log.debug(f"Copying file ... {src} -> {dst}")
+            copyfile(src, dst)
+        elif opts["mode"] == self.MODE_HARDLINK:
+            self.log.debug(f"Hardlinking file ... {src} -> {dst}")
+            create_hard_link(src, dst)
+
+        self._transferred.append(dst)
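The backup step above renames an existing destination to a fixed `dst + ".bak"` path, and the old code carried a todo to make that name unique via a timestamp or uuid. A hypothetical sketch of a collision-free variant (the helper name is ours, not part of the PR):

```python
import uuid


def unique_backup_path(dst):
    # Append a short random suffix so repeated publishes never
    # clobber a previous backup of the same destination.
    return "{}.{}.bak".format(dst, uuid.uuid4().hex[:8])


first = unique_backup_path("render/beauty.exr")
second = unique_backup_path("render/beauty.exr")
print(first != second)  # True: two backups of the same file never collide
```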
Collaborator comment:

Technically this runs in a thread now, and hence it should not mutate state on an object shared outside of the thread. However, in current Python implementations this should still be safe due to the GIL and how the interpreter locks. Nonetheless, in a potential free-threaded (no-GIL) Python 3 world this would be unsafe, and we should instead rely on future.result().
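A minimal sketch of the pattern that comment suggests: have the worker return its value and collect results via `future.result()` in the main thread, instead of appending to shared state inside the workers (the `transfer` function and file paths below are illustrative, not from the PR):

```python
from concurrent.futures import ThreadPoolExecutor


def transfer(src, dst):
    # ... copy or hardlink src -> dst here ...
    return dst  # return instead of mutating a shared list in the worker


pairs = [("a.exr", "pub/a.exr"), ("b.exr", "pub/b.exr")]
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(transfer, src, dst) for src, dst in pairs]
    # result() re-raises any worker exception and hands the value back,
    # so only the main thread ever touches the transferred list.
    transferred = [future.result() for future in futures]

print(transferred)  # ['pub/a.exr', 'pub/b.exr']
```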


def finalize(self):
# Delete any backed up files
49 changes: 49 additions & 0 deletions client/ayon_core/lib/threadpool.py
@@ -0,0 +1,49 @@
import logging
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, Future
from typing import List, Optional

log = logging.getLogger(__name__)


Review thread on this function:

Member (@iLLiCiTiT, Feb 5, 2025):
I would add this as a method of the file transaction; it is not very reusable elsewhere. I guess it is here for the hero version integration? If yes, we should use file transactions there too. If that would be too much work, we can skip the speed enhancement of hero integration in this PR.

Collaborator:
I'm re-using it in two places currently - that's why I added it into a separate file. But you mean to add it to file_transaction.py, not necessarily the FileTransaction class itself? If so, I can definitely do that! :)

Collaborator:
I've now moved it to the other file (but not as a method, because it's also used outside of the class). See 9c67bf1. @iLLiCiTiT better?

Collaborator:
Also, I assume the function may get more use if we were to move more things to thread pools, e.g. transcoding.

def as_completed_stop_and_raise_on_error(
        executor: ThreadPoolExecutor,
        futures: List[Future],
        logger: Optional[logging.Logger] = None):
"""For the ThreadPoolExecutor shutdown and cancel futures as soon one of
the workers raises an error as they complete.

The ThreadPoolExecutor only cancels pending futures on exception but will
still complete those that are running - each which also themselves could
fail. We log all exceptions, but re-raise the last exception only.
"""
if logger is None:
logger = log

for future in concurrent.futures.as_completed(futures):
exception = future.exception()
if exception:
# As soon as an error occurs, stop executing more futures.
# Running workers however, will still complete so we also want
# to log those errors if any occurred on them.
executor.shutdown(wait=True, cancel_futures=True)
break
else:
# Futures are completed, no exceptions occurred
return

# An exception occurred in at least one future. Get exceptions from
# all futures that are done and ended up failing until that point.
exceptions = []
for future in futures:
if not future.cancelled() and future.done():
exception = future.exception()
if exception:
exceptions.append(exception)

# Log any exceptions that occurred in all workers
for exception in exceptions:
logger.error("Error occurred in worker", exc_info=exception)

# Raise the last exception
raise exceptions[-1]
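A self-contained demonstration of this early-abort behavior, replicating the helper in miniature (worker bodies, timings, and the error message are illustrative assumptions): with a single worker thread, the first failure shuts the pool down, so the remaining pending futures are cancelled rather than run.

```python
import concurrent.futures
import time
from concurrent.futures import ThreadPoolExecutor


def stop_and_raise_on_error(executor, futures):
    for future in concurrent.futures.as_completed(futures):
        exception = future.exception()
        if exception:
            # Pending futures are cancelled; running ones still finish.
            executor.shutdown(wait=True, cancel_futures=True)
            raise exception


def work(index):
    if index == 0:
        raise RuntimeError("copy failed")
    time.sleep(0.05)
    return index


executor = ThreadPoolExecutor(max_workers=1)
futures = [executor.submit(work, index) for index in range(50)]
try:
    stop_and_raise_on_error(executor, futures)
except RuntimeError as exc:
    print(exc)  # copy failed

cancelled = sum(future.cancelled() for future in futures)
print(cancelled > 0)  # True: most of the queue never ran
```

Note that `cancel_futures=True` requires Python 3.9+; on older interpreters pending futures keep running after the first error.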
24 changes: 18 additions & 6 deletions client/ayon_core/plugins/publish/integrate_hero_version.py
@@ -1,7 +1,15 @@
import os
import copy
import errno
import itertools
import shutil
import sys
from concurrent.futures import ThreadPoolExecutor
# this is needed until speedcopy for linux is fixed
if sys.platform == "win32":
from speedcopy import copyfile
else:
from shutil import copyfile
Comment on lines +8 to +12

Collaborator (@BigRoy, Feb 4, 2025):
I know this is copied from here.

However, we're also using speedcopy in the codebase without that fallback, e.g. here:

Which may hint that either the fallback is redundant nowadays - or we have other areas in the codebase that are potentially buggy on Linux? @iLLiCiTiT @antirotor do you know?

Member:
Speedcopy is in the dependencies of AYON (speedcopy = "^2.1") and it has its own fallback if it cannot use server-side copy. What is the issue with speedcopy on Linux?

import clique
import pyblish.api
@@ -13,6 +21,7 @@
from ayon_api.utils import create_entity_id

from ayon_core.lib import create_hard_link, source_hash
from ayon_core.lib.threadpool import as_completed_stop_and_raise_on_error
from ayon_core.pipeline.publish import (
get_publish_template_name,
OptionalPyblishPluginMixin,
@@ -415,11 +424,14 @@ def integrate_instance(
        # Copy(hardlink) paths of source and destination files
        # TODO should we *only* create hardlinks?
        # TODO should we keep files for deletion until this is successful?
-        for src_path, dst_path in src_to_dst_file_paths:
-            self.copy_file(src_path, dst_path)
-
-        for src_path, dst_path in other_file_paths_mapping:
-            self.copy_file(src_path, dst_path)
+        with ThreadPoolExecutor(max_workers=8) as executor:
+            futures = [
+                executor.submit(self.copy_file, src_path, dst_path)
+                for src_path, dst_path
+                in itertools.chain(src_to_dst_file_paths,
+                                   other_file_paths_mapping)
+            ]
+            as_completed_stop_and_raise_on_error(executor, futures)

        # Update prepared representation entity data with files
        # and integrate it to server.
@@ -648,7 +660,7 @@ def copy_file(self, src_path, dst_path):
            src_path, dst_path
        ))

-        shutil.copy(src_path, dst_path)
+        copyfile(src_path, dst_path)

def version_from_representations(self, project_name, repres):
for repre in repres: