Reduce number of commits in upload large folder #2546

Merged (3 commits, Sep 17, 2024)
src/huggingface_hub/_upload_large_folder.py (70 changes: 59 additions & 11 deletions)
@@ -41,6 +41,8 @@
 logger = logging.getLogger(__name__)

 WAITING_TIME_IF_NO_TASKS = 10  # seconds
+MAX_NB_REGULAR_FILES_PER_COMMIT = 75
+MAX_NB_LFS_FILES_PER_COMMIT = 150


 def upload_large_folder_internal(
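Note (editor's illustration, not part of the diff): these two constants cap how many files of each type may go into a single commit, replacing the old fixed batch size of 25 used by `_get_n(status.queue_commit, 25)` below. Back-of-the-envelope, for a hypothetical folder of 1,000 LFS files:

import math

# Hypothetical folder of 1,000 LFS files; batch sizes taken from this diff.
print(math.ceil(1000 / 25))   # before: fixed batches of 25 -> at least 40 commits
print(math.ceil(1000 / 150))  # after: MAX_NB_LFS_FILES_PER_COMMIT -> at least 7 commits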
@@ -373,17 +375,18 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
     if (
         status.nb_workers_commit == 0
         and status.queue_commit.qsize() > 0
-        and (status.last_commit_attempt is None or time.time() - status.last_commit_attempt > 5 * 60)
+        and status.last_commit_attempt is not None
+        and time.time() - status.last_commit_attempt > 5 * 60
     ):
         status.nb_workers_commit += 1
         logger.debug("Job: commit (more than 5 minutes since last commit attempt)")
-        return (WorkerJob.COMMIT, _get_n(status.queue_commit, 25))
+        return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit))

-    # 2. Commit if at least 25 files are ready to commit
-    elif status.nb_workers_commit == 0 and status.queue_commit.qsize() >= 25:
+    # 2. Commit if at least 150 files are ready to commit
+    elif status.nb_workers_commit == 0 and status.queue_commit.qsize() >= 150:
         status.nb_workers_commit += 1
-        logger.debug("Job: commit (>25 files ready)")
-        return (WorkerJob.COMMIT, _get_n(status.queue_commit, 25))
+        logger.debug("Job: commit (>150 files ready)")
+        return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit))

     # 3. Get upload mode if at least 10 files
     elif status.queue_get_upload_mode.qsize() >= 10:
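Note (reviewer illustration, not part of the diff): besides swapping in `_get_items_to_commit`, the rewritten rule 1 no longer fires when `last_commit_attempt is None`, so the very first commit now waits for another rule instead of firing immediately. A minimal sketch of the two predicates, with a plain argument standing in for `status.last_commit_attempt`:

import time
from typing import Optional

def rule1_old(last_commit_attempt: Optional[float]) -> bool:
    return last_commit_attempt is None or time.time() - last_commit_attempt > 5 * 60

def rule1_new(last_commit_attempt: Optional[float]) -> bool:
    return last_commit_attempt is not None and time.time() - last_commit_attempt > 5 * 60

assert rule1_old(None)                  # old: eligible before any commit was ever attempted
assert not rule1_new(None)              # new: first commit is deferred to rules 2/10/11
assert rule1_new(time.time() - 6 * 60)  # both: eligible once >5 min since last attempt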
@@ -430,18 +433,39 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
logger.debug("Job: get upload mode")
return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, 50))

# 10. Commit if at least 1 file
elif status.nb_workers_commit == 0 and status.queue_commit.qsize() > 0:
# 10. Commit if at least 1 file and 1 min since last commit attempt
elif (
status.nb_workers_commit == 0
and status.queue_commit.qsize() > 0
and status.last_commit_attempt is not None
and time.time() - status.last_commit_attempt > 1 * 60
):
status.nb_workers_commit += 1
logger.debug("Job: commit (1 min since last commit attempt)")
return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit))

# 11. Commit if at least 1 file all other queues are empty and all workers are waiting
# e.g. when it's the last commit
elif (
status.nb_workers_commit == 0
and status.queue_commit.qsize() > 0
and status.queue_sha256.qsize() == 0
and status.queue_get_upload_mode.qsize() == 0
and status.queue_preupload_lfs.qsize() == 0
and status.nb_workers_sha256 == 0
and status.nb_workers_get_upload_mode == 0
and status.nb_workers_preupload_lfs == 0
):
status.nb_workers_commit += 1
logger.debug("Job: commit")
return (WorkerJob.COMMIT, _get_n(status.queue_commit, 25))
return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit))

# 11. If all queues are empty, exit
# 12. If all queues are empty, exit
elif all(metadata.is_committed or metadata.should_ignore for _, metadata in status.items):
logger.info("All files have been processed! Exiting worker.")
return None

# 12. If no task is available, wait
# 13. If no task is available, wait
else:
status.nb_workers_waiting += 1
logger.debug(f"No task available, waiting... ({WAITING_TIME_IF_NO_TASKS}s)")
@@ -547,6 +571,30 @@ def _get_n(queue: "queue.Queue[JOB_ITEM_T]", n: int) -> List[JOB_ITEM_T]:
     return [queue.get() for _ in range(min(queue.qsize(), n))]


+def _get_items_to_commit(queue: "queue.Queue[JOB_ITEM_T]") -> List[JOB_ITEM_T]:
+    """Special case for commit job: the number of items to commit depends on the type of files."""
+    # Can take at most 75 regular files and/or 150 LFS files in a single commit
+    items: List[JOB_ITEM_T] = []
+    nb_lfs, nb_regular = 0, 0
+    while True:
+        # If empty queue => commit everything
+        if queue.qsize() == 0:
+            return items
+
+        # If we have enough items => commit them
+        if nb_lfs >= MAX_NB_LFS_FILES_PER_COMMIT or nb_regular >= MAX_NB_REGULAR_FILES_PER_COMMIT:
+            return items
+
+        # Else, get a new item and increase counter
+        item = queue.get()
+        items.append(item)
+        _, metadata = item
+        if metadata.upload_mode == "lfs":
+            nb_lfs += 1
+        else:
+            nb_regular += 1
+
+
 def _print_overwrite(report: str) -> None:
     """Print a report, overwriting the previous lines.
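To see the new batching in action, here is a small sketch that exercises `_get_items_to_commit` (a private helper, imported only for illustration, assuming a huggingface_hub version that includes this change). The real queue holds (paths, metadata) tuples; only the metadata's `upload_mode` attribute is read, so a `SimpleNamespace` stand-in with made-up file names is enough:

import queue
from types import SimpleNamespace

from huggingface_hub._upload_large_folder import _get_items_to_commit

q = queue.Queue()
for i in range(200):  # 200 LFS files queued first...
    q.put((f"shard_{i}.safetensors", SimpleNamespace(upload_mode="lfs")))
for i in range(100):  # ...then 100 regular files
    q.put((f"config_{i}.json", SimpleNamespace(upload_mode="regular")))

batch = _get_items_to_commit(q)
# The helper stops as soon as one per-type cap is reached: the first 150 LFS
# files exhaust the LFS budget, so the remaining 50 LFS and 100 regular files
# wait for a later commit.
print(len(batch), q.qsize())  # -> 150 150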
src/huggingface_hub/hf_api.py (5 changes: 3 additions & 2 deletions)
@@ -5342,15 +5342,16 @@ def upload_large_folder(

         Order of priority:
         1. Commit if more than 5 minutes since last commit attempt (and at least 1 file).
-        2. Commit if at least 25 files are ready to commit.
+        2. Commit if at least 150 files are ready to commit.
         3. Get upload mode if at least 10 files have been hashed.
         4. Pre-upload LFS file if at least 1 file and no worker is pre-uploading.
         5. Hash file if at least 1 file and no worker is hashing.
         6. Get upload mode if at least 1 file and no worker is getting upload mode.
         7. Pre-upload LFS file if at least 1 file (exception: if hf_transfer is enabled, only 1 worker can preupload LFS at a time).
         8. Hash file if at least 1 file to hash.
         9. Get upload mode if at least 1 file to get upload mode.
-        10. Commit if at least 1 file to commit.
+        10. Commit if at least 1 file to commit and at least 1 min since last commit attempt.
+        11. Commit if at least 1 file to commit and all other queues are empty.

         Special rules:
         - If `hf_transfer` is enabled, only 1 LFS uploader at a time. Otherwise the CPU would be bloated by `hf_transfer`.
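For context, this scheduling is entirely internal: callers still just invoke `upload_large_folder` and the worker loop decides when to commit. A typical call, with a placeholder repo id and local path, looks like:

from huggingface_hub import HfApi

api = HfApi()
api.upload_large_folder(
    repo_id="username/my-large-model",   # placeholder repo
    repo_type="model",                   # required by this method
    folder_path="path/to/local/folder",  # placeholder path
)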