Merge pull request #1006 from roboflow/feature/add_consecutive_fails_give_up

Add give-up on errors in workflows execution
PawelPeczek-Roboflow authored Feb 7, 2025
2 parents 7488613 + 001b63e · commit 01857b6
Showing 4 changed files with 68 additions and 7 deletions.
inference_cli/lib/workflows/core.py (3 additions, 0 deletions)
@@ -113,6 +113,7 @@ def process_images_directory_with_workflow(
     debug_mode: bool = False,
     api_url: str = "https://detect.roboflow.com",
     processing_threads: Optional[int] = None,
+    max_failures: Optional[int] = None,
 ) -> None:
     if processing_target is ProcessingTarget.INFERENCE_PACKAGE:
@@ -136,6 +137,7 @@ def process_images_directory_with_workflow(
             aggregate_structured_results=aggregate_structured_results,
             aggregation_format=aggregation_format,
             debug_mode=debug_mode,
+            max_failures=max_failures,
         )
         return None
     _ = process_image_directory_with_workflow_using_api(
@@ -154,5 +156,6 @@ def process_images_directory_with_workflow(
         aggregation_format=aggregation_format,
         debug_mode=debug_mode,
         processing_threads=processing_threads,
+        max_failures=max_failures,
     )
     return None
inference_cli/lib/workflows/local_image_adapter.py (28 additions, 2 deletions)
@@ -99,6 +99,7 @@ def process_image_directory_with_workflow_using_inference_package(
     aggregate_structured_results: bool = True,
     aggregation_format: OutputFileType = OutputFileType.JSONL,
     debug_mode: bool = False,
+    max_failures: Optional[int] = None,
 ) -> ImagesDirectoryProcessingDetails:
     if api_key is None:
         api_key = API_KEY
@@ -125,6 +126,7 @@ def process_image_directory_with_workflow_using_inference_package(
             save_image_outputs=save_image_outputs,
             log_file=log_file,
             debug_mode=debug_mode,
+            max_failures=max_failures,
         )
     finally:
         log_file.close()
@@ -163,6 +165,7 @@ def _process_images_within_directory(
     save_image_outputs: bool,
     log_file: TextIO,
     debug_mode: bool = False,
+    max_failures: Optional[int] = None,
 ) -> List[Tuple[str, str]]:
     workflow_specification = _get_workflow_specification(
         workflow_specification=workflow_specification,
@@ -176,6 +179,8 @@ def _process_images_within_directory(
         description="Processing images...",
         total=len(files_to_process),
     )
+    if max_failures is None:
+        max_failures = len(files_to_process) + 1
     failed_files = []
     on_success = partial(
         _on_success,
@@ -204,9 +209,28 @@ def _process_images_within_directory(
         on_failure=on_failure,
         debug_mode=debug_mode,
     )
+    failures = 0
+    succeeded_files = set()
     with progress_bar:
         for image_path in files_to_process:
-            processing_fun(image_path)
+            success = processing_fun(image_path)
+            if not success:
+                failures += 1
+            else:
+                succeeded_files.add(image_path)
+            if failures >= max_failures:
+                break
+    failed_files_lookup = {f[0] for f in failed_files}
+    aborted_files = [
+        f
+        for f in files_to_process
+        if f not in succeeded_files and f not in failed_files_lookup
+    ]
+    for file in aborted_files:
+        on_failure(
+            file,
+            "Aborted processing due to exceeding max failures of Workflows executions.",
+        )
     return failed_files


@@ -247,7 +271,7 @@ def _process_single_image_from_directory(
     on_failure: Callable[[ImagePath, str], None],
     log_file_lock: Optional[Lock] = None,
     debug_mode: bool = False,
-) -> None:
+) -> bool:
     try:
         result = _run_workflow_for_single_image_with_inference(
             model_manager=model_manager,
@@ -268,11 +292,13 @@ def _process_single_image_from_directory(
             log_file=log_file, image_path=image_path, lock=log_file_lock
         )
         on_success(image_path, index_entry)
+        return True
     except Exception as error:
         error_summary = f"Error in processing {image_path}. Error type: {error.__class__.__name__} - {error}"
         if debug_mode:
             CLI_LOGGER.exception(error_summary)
         on_failure(image_path, error_summary)
+        return False


 def _get_workflow_specification(
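The heart of the change is the bookkeeping added to _process_images_within_directory above. Condensed into a standalone sketch (the function boundary and the callback wiring are illustrative assumptions; the variable names and the sentinel default come straight from the diff):

from typing import Callable, List, Optional, Tuple


def run_with_failure_budget(
    files_to_process: List[str],
    processing_fun: Callable[[str], bool],  # returns True on success
    failed_files: List[Tuple[str, str]],  # (path, error) pairs collected by callbacks
    on_failure: Callable[[str, str], None],
    max_failures: Optional[int] = None,
) -> List[Tuple[str, str]]:
    if max_failures is None:
        # Sentinel: a budget one larger than the file count can never be
        # exhausted, so "not set" behaves as unlimited with no extra branching.
        max_failures = len(files_to_process) + 1
    failures = 0
    succeeded_files = set()
    for image_path in files_to_process:
        if processing_fun(image_path):
            succeeded_files.add(image_path)
        else:
            failures += 1
        if failures >= max_failures:
            break  # give up on the remaining images
    # Whatever neither succeeded nor failed explicitly was skipped by the early
    # break; it is reported through the same failure channel as real errors.
    already_failed = {path for path, _ in failed_files}
    for path in files_to_process:
        if path not in succeeded_files and path not in already_failed:
            on_failure(path, "Aborted processing due to exceeding max failures.")
    return failed_files

Routing aborted files through on_failure keeps the log file and the returned failure list consistent: every input ends up accounted for as a success, a failure, or an abort.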
inference_cli/lib/workflows/remote_image_adapter.py (28 additions, 5 deletions)
@@ -102,6 +102,7 @@ def process_image_directory_with_workflow_using_api(
     aggregation_format: OutputFileType = OutputFileType.JSONL,
     debug_mode: bool = False,
     processing_threads: Optional[int] = None,
+    max_failures: Optional[int] = None,
 ) -> ImagesDirectoryProcessingDetails:
     if api_key is None:
         api_key = _get_api_key_from_env()
@@ -135,6 +136,7 @@ def process_image_directory_with_workflow_using_api(
             api_url=api_url,
             processing_threads=processing_threads,
             debug_mode=debug_mode,
+            max_failures=max_failures,
         )
     finally:
         log_file.close()
@@ -175,12 +177,15 @@ def _process_images_within_directory_with_api(
     api_url: str,
     processing_threads: int,
     debug_mode: bool = False,
+    max_failures: Optional[int] = None,
 ) -> List[Tuple[str, str]]:
     progress_bar = Progress()
     processing_task = progress_bar.add_task(
         description="Processing images...",
         total=len(files_to_process),
     )
+    if max_failures is None:
+        max_failures = len(files_to_process) + 1
     failed_files = []
     on_success = partial(
         _on_success,
@@ -212,12 +217,28 @@ def _process_images_within_directory_with_api(
         log_file_lock=log_file_lock,
         debug_mode=debug_mode,
     )
+    failures = 0
+    succeeded_files = set()
     with progress_bar:
         with ThreadPool(processes=processing_threads) as pool:
-            _ = pool.map(
-                processing_fun,
-                files_to_process,
-            )
+            for file, is_success in pool.imap(processing_fun, files_to_process):
+                if not is_success:
+                    failures += 1
+                else:
+                    succeeded_files.add(file)
+                if failures >= max_failures:
+                    break
+    failed_files_lookup = {f[0] for f in failed_files}
+    aborted_files = [
+        f
+        for f in files_to_process
+        if f not in succeeded_files and f not in failed_files_lookup
+    ]
+    for file in aborted_files:
+        on_failure(
+            file,
+            "Aborted processing due to exceeding max failures of Workflows executions.",
+        )
     return failed_files


@@ -259,7 +280,7 @@ def _process_single_image_from_directory(
     on_failure: Callable[[ImagePath, str], None],
     log_file_lock: Optional[Lock] = None,
     debug_mode: bool = False,
-) -> None:
+) -> Tuple[str, bool]:
    try:
        result = _run_workflow_for_single_image_through_api(
            image_path=image_path,
@@ -281,11 +302,13 @@ def _process_single_image_from_directory(
             log_file=log_file, image_path=image_path, lock=log_file_lock
         )
         on_success(image_path, index_entry)
+        return image_path, True
     except Exception as error:
         error_summary = f"Error in processing {image_path}. Error type: {error.__class__.__name__} - {error}"
         if debug_mode:
             CLI_LOGGER.exception(error_summary)
         on_failure(image_path, error_summary)
+        return image_path, False


 @backoff.on_exception(
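The switch from pool.map to pool.imap is what makes the budget effective in the threaded path: map blocks until every file has been processed, while imap yields (path, success) results lazily and in submission order, so the consumer can stop iterating the moment the budget is spent. A minimal sketch of that behavior (flaky and the file list are invented for the demo; breaking out only stops consuming results, and outstanding work is cleaned up when the pool's context manager exits):

from multiprocessing.pool import ThreadPool


def flaky(path: str):
    # Stand-in for _process_single_image_from_directory: (path, success) pairs.
    return path, not path.endswith("bad.jpg")


files = ["a.jpg", "bad.jpg", "b.jpg", "c.jpg"]
failures, max_failures = 0, 1

with ThreadPool(processes=2) as pool:
    for path, ok in pool.imap(flaky, files):
        if not ok:
            failures += 1
        if failures >= max_failures:
            break  # later results are never consumed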
inference_cli/workflows.py (9 additions, 0 deletions)
@@ -456,6 +456,14 @@ def process_images_directory(
             help="Flag enabling errors stack traces to be displayed (helpful for debugging)",
         ),
     ] = False,
+    max_failures: Annotated[
+        Optional[int],
+        typer.Option(
+            "--max-failures",
+            help="Maximum number of failed Workflow executions for directory images which will be tolerated before giving up. "
+            "If not set - unlimited.",
+        ),
+    ] = None,
 ) -> None:
     try:
         ensure_target_directory_is_empty(
@@ -487,6 +495,7 @@ def process_images_directory(
             aggregation_format=aggregation_format,
             processing_threads=processing_threads,
             debug_mode=debug_mode,
+            max_failures=max_failures,
         )
     except KeyboardInterrupt:
         print("Command interrupted - results may not be fully consistent.")
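The flag itself is declared with the Annotated / typer.Option idiom this CLI module already uses. A self-contained toy command showing the same pattern (the app and command names are invented; only the option shape mirrors the diff, and recent Typer with Python 3.9+ is assumed):

from typing import Annotated, Optional

import typer

app = typer.Typer()


@app.command()
def demo(
    max_failures: Annotated[
        Optional[int],
        typer.Option(
            "--max-failures",
            help="Maximum number of failures tolerated before giving up. "
            "If not set - unlimited.",
        ),
    ] = None,
) -> None:
    # None means no budget was set, i.e. never give up early.
    budget = "unlimited" if max_failures is None else str(max_failures)
    typer.echo(f"Failure budget: {budget}")


if __name__ == "__main__":
    app()

Running the toy app with --max-failures 5 prints "Failure budget: 5"; omitting the flag prints "Failure budget: unlimited".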
