Create commit by streaming an ndjson payload (allow lots of files in a single commit) (#1117)

* ✨ New chunk_iterable util

* ✨ Chunk LFS preupload call

* ✅ Add a test

* 💄 Code quality

* 📝 Improve documentation for the util function

* ✅ More extensive testing

* ♻ Alternative version of _chunk_iter

* 🚚 `utils.common` -> `utils._chunk_utils`

* first draft to stream ndjson data

* doc

* fix + refacto + docstring

* refacto chunk iterator

* retrieve LFS information by chunks

* FIX http backoff on file upload + add test

* adapt test

* FIX not existing repo test

* mypy fix

* no need for token

* increase lot of files test

* quality

* adapt huge test

* increase chunk size

* add test

Co-authored-by: SBrandeis <[email protected]>
Wauplin and SBrandeis authored Oct 21, 2022
1 parent 930123c commit 7e089e6
Showing 7 changed files with 363 additions and 96 deletions.
166 changes: 102 additions & 64 deletions src/huggingface_hub/_commit_api.py
@@ -8,13 +8,19 @@
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import BinaryIO, Dict, Iterable, List, Optional, Tuple, Union
from typing import Any, BinaryIO, Dict, Iterable, List, Optional, Tuple, Union

import requests

from .constants import ENDPOINT
from .lfs import UploadInfo, _validate_batch_actions, lfs_upload, post_lfs_batch_info
from .utils import build_hf_headers, hf_raise_for_status, logging, validate_hf_hub_args
from .utils import (
build_hf_headers,
chunk_iterable,
hf_raise_for_status,
logging,
validate_hf_hub_args,
)
from .utils._typing import Literal


@@ -200,23 +206,31 @@ def upload_lfs_files(
error
"""
    # Step 1: retrieve upload instructions from the LFS batch endpoint
    batch_actions, batch_errors = post_lfs_batch_info(
        upload_infos=[op._upload_info() for op in additions],
        token=token,
        repo_id=repo_id,
        repo_type=repo_type,
        endpoint=endpoint,
    )
    if batch_errors:
        message = "\n".join(
            [
                f'Encountered error for file with OID {err.get("oid")}:'
                f' `{err.get("error", {}).get("message")}'
                for err in batch_errors
            ]
        )
        raise ValueError(f"LFS batch endpoint returned errors:\n{message}")

    # Step 1: retrieve upload instructions from the LFS batch endpoint.
    # Upload instructions are retrieved by chunk of 256 files to avoid reaching
    # the payload limit.
    batch_actions: List[Dict] = []
    for chunk in chunk_iterable(additions, chunk_size=256):
        batch_actions_chunk, batch_errors_chunk = post_lfs_batch_info(
            upload_infos=[op._upload_info() for op in chunk],
            token=token,
            repo_id=repo_id,
            repo_type=repo_type,
            endpoint=endpoint,
        )

        # If at least 1 error, we do not retrieve information for other chunks
        if batch_errors_chunk:
            message = "\n".join(
                [
                    f'Encountered error for file with OID {err.get("oid")}:'
                    f' `{err.get("error", {}).get("message")}'
                    for err in batch_errors_chunk
                ]
            )
            raise ValueError(f"LFS batch endpoint returned errors:\n{message}")

        batch_actions += batch_actions_chunk

    # Step 2: upload files concurrently according to these instructions
    oid2addop = {add_op._upload_info().sha256.hex(): add_op for add_op in additions}
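Retrieving the batch info 256 files at a time keeps each request under the server's payload limit while still failing fast on the first error. Below is a minimal, self-contained sketch of this chunk-and-aggregate pattern; `post_batch` is a hypothetical stand-in for `post_lfs_batch_info`, and the file names are invented:

```python
from typing import Dict, Iterable, List

from huggingface_hub.utils import chunk_iterable  # util added by this PR


def post_batch(files: List[str]) -> List[Dict]:
    # Hypothetical stand-in for `post_lfs_batch_info`: one action dict per file.
    return [{"oid": name, "actions": {}} for name in files]


def gather_batch_actions(files: Iterable[str]) -> List[Dict]:
    # Query the endpoint 256 files at a time and aggregate the results,
    # mirroring the loop added to `upload_lfs_files` above.
    batch_actions: List[Dict] = []
    for chunk in chunk_iterable(files, chunk_size=256):
        batch_actions += post_batch(list(chunk))
    return batch_actions


print(len(gather_batch_actions(f"file-{i}" for i in range(600))))  # 600
```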
@@ -293,6 +307,7 @@ def _upload_lfs_object(
verify_action = lfs_batch_action["actions"].get("verify")

with operation.as_file() as fileobj:
logger.debug(f"Uploading {operation.path_in_repo} as LFS file...")
lfs_upload(
fileobj=fileobj,
upload_action=upload_action,
@@ -326,7 +341,6 @@ def fetch_upload_modes(
token: Optional[str],
revision: str,
endpoint: Optional[str] = None,
create_pr: Optional[bool] = None,
) -> List[Tuple[CommitOperationAdd, UploadMode]]:
"""
Requests the Hub "preupload" endpoint to determine whether each input file
@@ -358,31 +372,34 @@
"""
    endpoint = endpoint if endpoint is not None else ENDPOINT
    headers = build_hf_headers(use_auth_token=token)

    payload = {
        "files": [
            {
                "path": op.path_in_repo,
                "sample": base64.b64encode(op._upload_info().sample).decode("ascii"),
                "size": op._upload_info().size,
                "sha": op._upload_info().sha256.hex(),
            }
            for op in additions
        ]
    }

    resp = requests.post(
        f"{endpoint}/api/{repo_type}s/{repo_id}/preupload/{revision}",
        json=payload,
        headers=headers,
        params={"create_pr": "1"} if create_pr else None,
    )
    hf_raise_for_status(resp, endpoint_name="preupload")

    preupload_info = validate_preupload_info(resp.json())

    path2mode: Dict[str, UploadMode] = {
        file["path"]: file["uploadMode"] for file in preupload_info["files"]
    }

    # Fetch upload mode (LFS or regular) chunk by chunk.
    path2mode: Dict[str, UploadMode] = {}
    for chunk in chunk_iterable(additions, 256):
        payload = {
            "files": [
                {
                    "path": op.path_in_repo,
                    "sample": base64.b64encode(op._upload_info().sample).decode(
                        "ascii"
                    ),
                    "size": op._upload_info().size,
                    "sha": op._upload_info().sha256.hex(),
                }
                for op in chunk
            ]
        }

        resp = requests.post(
            f"{endpoint}/api/{repo_type}s/{repo_id}/preupload/{revision}",
            json=payload,
            headers=headers,
        )
        hf_raise_for_status(resp)
        preupload_info = validate_preupload_info(resp.json())
        path2mode.update(
            **{file["path"]: file["uploadMode"] for file in preupload_info["files"]}
        )

    return [(op, path2mode[op.path_in_repo]) for op in additions]
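For reference, each chunk becomes one small JSON body for the `preupload` endpoint. A sketch of such a payload for a single invented file, using the field names from the diff above (the `sample` field is, as far as can be told from this diff, a base64-encoded excerpt of the file that the Hub uses to pick the upload mode):

```python
import base64
import hashlib
import json

content = b"hello world"  # invented file content

payload = {
    "files": [
        {
            "path": "data/hello.txt",
            "sample": base64.b64encode(content[:512]).decode("ascii"),
            "size": len(content),
            "sha": hashlib.sha256(content).hexdigest(),
        }
    ]
}
print(json.dumps(payload, indent=2))
```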

@@ -393,35 +410,56 @@ def prepare_commit_payload(
commit_message: str,
commit_description: Optional[str] = None,
parent_commit: Optional[str] = None,
):
    """
    Builds the payload to POST to the `/commit` API of the Hub
    """
    commit_description = commit_description if commit_description is not None else ""

    payload = {
        **({"parentCommit": parent_commit} if parent_commit is not None else {}),
        "summary": commit_message,
        "description": commit_description,
        "files": [
            {
                "path": add_op.path_in_repo,
                "encoding": "base64",
                "content": add_op.b64content().decode(),
            }
            for (add_op, upload_mode) in additions
            if upload_mode == "regular"
        ],
        "lfsFiles": [
            {
                "path": add_op.path_in_repo,
                "algo": "sha256",
                "oid": add_op._upload_info().sha256.hex(),
                "size": add_op._upload_info().size,
            }
            for (add_op, upload_mode) in additions
            if upload_mode == "lfs"
        ],
        "deletedFiles": [{"path": del_op.path_in_repo} for del_op in deletions],
    }
    return payload

) -> Iterable[Dict[str, Any]]:
    """
    Builds the payload to POST to the `/commit` API of the Hub.

    Payload is returned as an iterator so that it can be streamed as ndjson in the
    POST request.

    For more information, see:
    - https://github.com/huggingface/huggingface_hub/issues/1085#issuecomment-1265208073
    - http://ndjson.org/
    """
    commit_description = commit_description if commit_description is not None else ""

    # 1. Send a header item with the commit metadata
    header_value = {"summary": commit_message, "description": commit_description}
    if parent_commit is not None:
        header_value["parentCommit"] = parent_commit
    yield {"key": "header", "value": header_value}

    # 2. Send regular files, one per line
    yield from (
        {
            "key": "file",
            "value": {
                "content": add_op.b64content().decode(),
                "path": add_op.path_in_repo,
                "encoding": "base64",
            },
        }
        for (add_op, upload_mode) in additions
        if upload_mode == "regular"
    )

    # 3. Send LFS files, one per line
    yield from (
        {
            "key": "lfsFile",
            "value": {
                "path": add_op.path_in_repo,
                "algo": "sha256",
                "oid": add_op._upload_info().sha256.hex(),
                "size": add_op._upload_info().size,
            },
        }
        for (add_op, upload_mode) in additions
        if upload_mode == "lfs"
    )

    # 4. Send deleted files, one per line
    yield from (
        {"key": "deletedFile", "value": {"path": del_op.path_in_repo}}
        for del_op in deletions
    )
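Since `prepare_commit_payload` is now a generator, each yielded dict becomes one line of the request body. A sketch of the resulting ndjson for a commit with one regular file and one deletion, with all values invented:

```python
import json

items = [
    {"key": "header", "value": {"summary": "Add data", "description": ""}},
    {
        "key": "file",
        "value": {
            "content": "aGVsbG8=",  # base64-encoded b"hello"
            "path": "data.txt",
            "encoding": "base64",
        },
    },
    {"key": "deletedFile", "value": {"path": "old.txt"}},
]

# One JSON document per line, newline-terminated: that is all ndjson is.
body = b"".join(json.dumps(item).encode() + b"\n" for item in items)
print(body.decode())
```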
49 changes: 36 additions & 13 deletions src/huggingface_hub/hf_api.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import re
import subprocess
@@ -1877,8 +1878,19 @@ def create_commit(
`repo_type` are set correctly. If repo does not exist, create it first using
[`~hf_api.create_repo`].
</Tip>
<Tip warning={true}>
`create_commit` is limited to 25k LFS files and a 1GB payload for regular files.
</Tip>
"""
_CREATE_COMMIT_NO_REPO_ERROR_MESSAGE = (
"\nNote: Creating a commit assumes that the repo already exists on the"
" Huggingface Hub. Please use `create_repo` if it's not the case."
)

if parent_commit is not None and not REGEX_COMMIT_OID.fullmatch(parent_commit):
raise ValueError(
"`parent_commit` is not a valid commit OID. It must match the"
@@ -1928,13 +1940,9 @@
token=token or self.token,
revision=revision,
endpoint=self.endpoint,
create_pr=create_pr,
)
except RepositoryNotFoundError as e:
e.append_to_message(
"\nNote: Creating a commit assumes that the repo already exists on the"
" Huggingface Hub. Please use `create_repo` if it's not the case."
)
e.append_to_message(_CREATE_COMMIT_NO_REPO_ERROR_MESSAGE)
raise

upload_lfs_files(
@@ -1958,14 +1966,29 @@
        )
        commit_url = f"{self.endpoint}/api/{repo_type}s/{repo_id}/commit/{revision}"

        headers = self._build_hf_headers(use_auth_token=token, is_write_action=True)
        commit_resp = requests.post(
            url=commit_url,
            headers=headers,
            json=commit_payload,
            params={"create_pr": "1"} if create_pr else None,
        )
        hf_raise_for_status(commit_resp, endpoint_name="commit")

        def _payload_as_ndjson() -> Iterable[bytes]:
            for item in commit_payload:
                yield json.dumps(item).encode()
                yield b"\n"

        headers = {
            # See https://github.com/huggingface/huggingface_hub/issues/1085#issuecomment-1265208073
            "Content-Type": "application/x-ndjson",
            **self._build_hf_headers(use_auth_token=token, is_write_action=True),
        }

        try:
            commit_resp = requests.post(
                url=commit_url,
                headers=headers,
                data=_payload_as_ndjson(),  # type: ignore
                params={"create_pr": "1"} if create_pr else None,
            )
            hf_raise_for_status(commit_resp, endpoint_name="commit")
        except RepositoryNotFoundError as e:
            e.append_to_message(_CREATE_COMMIT_NO_REPO_ERROR_MESSAGE)
            raise

        commit_data = commit_resp.json()
        return CommitInfo(
            commit_url=commit_data["commitUrl"],
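Passing a generator as `data=` makes `requests` send the body with chunked transfer encoding, so the ndjson payload is streamed rather than materialized in memory, which is what allows commits with a very large number of files. A standalone sketch of the same pattern, assuming httpbin.org as a placeholder endpoint (not the Hub API):

```python
import json
from typing import Any, Dict, Iterable, Iterator

import requests


def payload_as_ndjson(items: Iterable[Dict[str, Any]]) -> Iterator[bytes]:
    # Mirrors `_payload_as_ndjson` above: one JSON document per line.
    for item in items:
        yield json.dumps(item).encode()
        yield b"\n"


items = ({"key": "file", "value": {"path": f"file-{i}"}} for i in range(3))
resp = requests.post(
    "https://httpbin.org/post",  # placeholder endpoint for this sketch
    headers={"Content-Type": "application/x-ndjson"},
    data=payload_as_ndjson(items),  # generator body -> chunked transfer
)
print(resp.status_code)  # 200 if the request went through
```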
1 change: 1 addition & 0 deletions src/huggingface_hub/utils/__init__.py
@@ -27,6 +27,7 @@
HFCacheInfo,
scan_cache_dir,
)
from ._chunk_utils import chunk_iterable
from ._datetime import parse_datetime
from ._errors import (
BadRequestError,
64 changes: 64 additions & 0 deletions src/huggingface_hub/utils/_chunk_utils.py
@@ -0,0 +1,64 @@
# coding=utf-8
# Copyright 2022-present, the HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains a utility to iterate by chunks over an iterator."""
import itertools
from typing import Iterable, TypeVar


T = TypeVar("T")


def chunk_iterable(iterable: Iterable[T], chunk_size: int) -> Iterable[Iterable[T]]:
    """Iterates over an iterator chunk by chunk.

    Taken from https://stackoverflow.com/a/8998040.
    See also https://github.com/huggingface/huggingface_hub/pull/920#discussion_r938793088.

    Args:
        iterable (`Iterable`):
            The iterable on which we want to iterate.
        chunk_size (`int`):
            Size of the chunks. Must be a strictly positive integer (i.e. > 0).

    Example:

    ```python
    >>> from huggingface_hub.utils import chunk_iterable

    >>> for items in chunk_iterable(range(17), chunk_size=8):
    ...     print(list(items))
    # [0, 1, 2, 3, 4, 5, 6, 7]
    # [8, 9, 10, 11, 12, 13, 14, 15]
    # [16] # smaller last chunk
    ```

    Raises:
        [`ValueError`](https://docs.python.org/3/library/exceptions.html#ValueError)
            If `chunk_size` <= 0.

    <Tip warning={true}>
    The last chunk can be smaller than `chunk_size`.
    </Tip>
    """
    if not isinstance(chunk_size, int) or chunk_size <= 0:
        raise ValueError("`chunk_size` must be a strictly positive integer (>0).")

    iterator = iter(iterable)
    while True:
        try:
            next_item = next(iterator)
        except StopIteration:
            return
        yield itertools.chain((next_item,), itertools.islice(iterator, chunk_size - 1))
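One subtlety of this `islice`-based implementation: each yielded chunk is a lazy view over the same underlying iterator, so a chunk must be consumed (e.g. with `list`) before advancing to the next one, otherwise the boundaries shift. A short sketch of the safe usage pattern:

```python
from huggingface_hub.utils import chunk_iterable

for chunk in chunk_iterable(range(10), chunk_size=4):
    items = list(chunk)  # materialize the chunk before the next loop iteration
    print(items)
# [0, 1, 2, 3]
# [4, 5, 6, 7]
# [8, 9]
```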