diff --git a/inference/core/workflows/core_steps/common/query_language/entities/enums.py b/inference/core/workflows/core_steps/common/query_language/entities/enums.py index 87e0ee1d23..9fcd6b4900 100644 --- a/inference/core/workflows/core_steps/common/query_language/entities/enums.py +++ b/inference/core/workflows/core_steps/common/query_language/entities/enums.py @@ -68,3 +68,5 @@ class DetectionsSelectionMode(Enum): LEFT_MOST = "left_most" RIGHT_MOST = "right_most" TOP_CONFIDENCE = "top_confidence" + FIRST = "first" + LAST = "last" diff --git a/inference/core/workflows/core_steps/common/query_language/operations/detections/base.py b/inference/core/workflows/core_steps/common/query_language/operations/detections/base.py index 7adf9ce6f5..4d8f093ce9 100644 --- a/inference/core/workflows/core_steps/common/query_language/operations/detections/base.py +++ b/inference/core/workflows/core_steps/common/query_language/operations/detections/base.py @@ -131,7 +131,21 @@ def select_rightmost_detection(detections: sv.Detections) -> sv.Detections: return detections[index] +def select_first_detection(detections: sv.Detections) -> sv.Detections: + if len(detections) == 0: + return deepcopy(detections) + return detections[0] + + +def select_last_detection(detections: sv.Detections) -> sv.Detections: + if len(detections) == 0: + return deepcopy(detections) + return detections[-1] + + DETECTIONS_SELECTORS = { + DetectionsSelectionMode.FIRST: select_first_detection, + DetectionsSelectionMode.LAST: select_last_detection, DetectionsSelectionMode.LEFT_MOST: select_leftmost_detection, DetectionsSelectionMode.RIGHT_MOST: select_rightmost_detection, DetectionsSelectionMode.TOP_CONFIDENCE: select_top_confidence_detection, diff --git a/inference/core/workflows/core_steps/sinks/roboflow/model_monitoring_inference_aggregator/v1.py b/inference/core/workflows/core_steps/sinks/roboflow/model_monitoring_inference_aggregator/v1.py index d82b748352..bd8badf4b0 100644 --- 
a/inference/core/workflows/core_steps/sinks/roboflow/model_monitoring_inference_aggregator/v1.py +++ b/inference/core/workflows/core_steps/sinks/roboflow/model_monitoring_inference_aggregator/v1.py @@ -214,7 +214,6 @@ def __init__( api_key: Optional[str], background_tasks: Optional[BackgroundTasks], thread_pool_executor: Optional[ThreadPoolExecutor], - model_id: str, ): if api_key is None: raise ValueError( @@ -228,7 +227,6 @@ def __init__( self._background_tasks = background_tasks self._thread_pool_executor = thread_pool_executor self._predictions_aggregator = PredictionsAggregator() - self._model_id = model_id @classmethod def get_init_parameters(cls) -> List[str]: @@ -244,10 +242,11 @@ def run( predictions: Union[sv.Detections, dict], frequency: int, unique_aggregator_key: str, + model_id: str, ) -> BlockResult: self._last_report_time_cache_key = f"workflows:steps_cache:roboflow_core/model_monitoring_inference_aggregator@v1:{unique_aggregator_key}:last_report_time" if predictions: - self._predictions_aggregator.collect(predictions, self._model_id) + self._predictions_aggregator.collect(predictions, model_id) if not self._is_in_reporting_range(frequency): return { "error_status": False, diff --git a/tests/workflows/integration_tests/execution/test_workflow_top_prediction.py b/tests/workflows/integration_tests/execution/test_workflow_top_prediction.py index 3eb00e4c43..d3465fa62f 100644 --- a/tests/workflows/integration_tests/execution/test_workflow_top_prediction.py +++ b/tests/workflows/integration_tests/execution/test_workflow_top_prediction.py @@ -166,3 +166,197 @@ def test_filtering_workflow_by_top_prediction_with_no_detections( assert len(all_detections) == 0, "Expected 0 total predictions" assert len(top_detections) == 0, "Expected top prediction to be an empty array" + + +SORT_DETECTIONS_WORKFLOW_LAST = { + "version": "1.0", + "inputs": [ + {"type": "WorkflowImage", "name": "image"}, + {"type": "WorkflowParameter", "name": "model_id"}, + {"type": 
"WorkflowParameter", "name": "confidence", "default_value": 0.3}, + {"type": "WorkflowParameter", "name": "classes"}, + ], + "steps": [ + { + "type": "ObjectDetectionModel", + "name": "model", + "image": "$inputs.image", + "model_id": "$inputs.model_id", + "confidence": "$inputs.confidence", + }, + { + "type": "roboflow_core/property_definition@v1", + "name": "property_definition", + "data": "$steps.model.predictions", + "operations": [ + {"type": "SortDetections", "mode": "confidence", "ascending": True}, + {"type": "DetectionsSelection", "mode": "last"}, + ], + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "all_predictions", + "selector": "$steps.model.predictions", + }, + { + "type": "JsonField", + "name": "selected_box", + "selector": "$steps.property_definition.output", + }, + ], +} + + +def test_selecting_last_detection_after_sorting_by_confidence( + model_manager: ModelManager, + crowd_image: np.ndarray, +) -> None: + # given + workflow_init_parameters = { + "workflows_core.model_manager": model_manager, + "workflows_core.api_key": None, + "workflows_core.step_execution_mode": StepExecutionMode.LOCAL, + } + execution_engine = ExecutionEngine.init( + workflow_definition=SORT_DETECTIONS_WORKFLOW_LAST, + init_parameters=workflow_init_parameters, + max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, + ) + + # when + result = execution_engine.run( + runtime_parameters={ + "image": crowd_image, + "model_id": "yolov8n-640", + "classes": {"person"}, + } + ) + + # then + assert isinstance(result, list), "Expected result to be list" + assert len(result) == 1, "Single image provided - single output expected" + all_detections: sv.Detections = result[0]["all_predictions"] + selected_box: sv.Detections = result[0]["selected_box"] + + assert len(all_detections) == 12, "Expected 12 total predictions" + assert np.allclose( + all_detections.xyxy, + EXPECTED_OBJECT_DETECTION_BBOXES, + atol=1, + ), "Expected bboxes to match what was validated manually as workflow outcome" + 
assert np.allclose( + all_detections.confidence, + EXPECTED_OBJECT_DETECTION_CONFIDENCES, + atol=0.01, + ), "Expected confidences to match what was validated manually as workflow outcome" + + assert len(selected_box) == 1, "Expected only one top prediction" + assert np.allclose( + selected_box.xyxy, + [EXPECTED_OBJECT_DETECTION_BBOXES[0]], + atol=1, + ), "Expected top bbox to match what was validated manually as workflow outcome" + assert np.allclose( + selected_box.confidence, + [EXPECTED_OBJECT_DETECTION_CONFIDENCES[0]], + atol=0.01, + ), "Expected top confidence to match what was validated manually as workflow outcome" + + +SORT_DETECTIONS_WORKFLOW_FIRST = { + "version": "1.0", + "inputs": [ + {"type": "WorkflowImage", "name": "image"}, + {"type": "WorkflowParameter", "name": "model_id"}, + {"type": "WorkflowParameter", "name": "confidence", "default_value": 0.3}, + {"type": "WorkflowParameter", "name": "classes"}, + ], + "steps": [ + { + "type": "ObjectDetectionModel", + "name": "model", + "image": "$inputs.image", + "model_id": "$inputs.model_id", + "confidence": "$inputs.confidence", + }, + { + "type": "roboflow_core/property_definition@v1", + "name": "property_definition", + "data": "$steps.model.predictions", + "operations": [ + {"type": "SortDetections", "mode": "confidence", "ascending": True}, + {"type": "DetectionsSelection", "mode": "first"}, + ], + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "all_predictions", + "selector": "$steps.model.predictions", + }, + { + "type": "JsonField", + "name": "first_box", + "selector": "$steps.property_definition.output", + }, + ], +} + + +def test_selecting_first_detection_after_sorting_by_confidence( + model_manager: ModelManager, + crowd_image: np.ndarray, +) -> None: + # given + workflow_init_parameters = { + "workflows_core.model_manager": model_manager, + "workflows_core.api_key": None, + "workflows_core.step_execution_mode": StepExecutionMode.LOCAL, + } + execution_engine = ExecutionEngine.init( + 
workflow_definition=SORT_DETECTIONS_WORKFLOW_FIRST, + init_parameters=workflow_init_parameters, + max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, + ) + + # when + result = execution_engine.run( + runtime_parameters={ + "image": crowd_image, + "model_id": "yolov8n-640", + "classes": {"person"}, + } + ) + + # then + assert isinstance(result, list), "Expected result to be list" + assert len(result) == 1, "Single image provided - single output expected" + all_detections: sv.Detections = result[0]["all_predictions"] + first_box: sv.Detections = result[0]["first_box"] + + assert len(all_detections) == 12, "Expected 12 total predictions" + assert np.allclose( + all_detections.xyxy, + EXPECTED_OBJECT_DETECTION_BBOXES, + atol=1, + ), "Expected bboxes to match what was validated manually as workflow outcome" + assert np.allclose( + all_detections.confidence, + EXPECTED_OBJECT_DETECTION_CONFIDENCES, + atol=0.01, + ), "Expected confidences to match what was validated manually as workflow outcome" + + assert len(first_box) == 1, "Expected only one prediction" + assert np.allclose( + first_box.xyxy, + [EXPECTED_OBJECT_DETECTION_BBOXES[-1]], + atol=1, + ), "Expected lowest-confidence bbox to match what was validated manually as workflow outcome" + assert np.allclose( + first_box.confidence, + [EXPECTED_OBJECT_DETECTION_CONFIDENCES[-1]], + atol=0.01, + ), "Expected lowest confidence to match what was validated manually as workflow outcome" diff --git a/tests/workflows/unit_tests/core_steps/sinks/roboflow/test_model_monitoring_inference_aggregator.py b/tests/workflows/unit_tests/core_steps/sinks/roboflow/test_model_monitoring_inference_aggregator.py index 75f6aa865d..858641b74f 100644 --- a/tests/workflows/unit_tests/core_steps/sinks/roboflow/test_model_monitoring_inference_aggregator.py +++ b/tests/workflows/unit_tests/core_steps/sinks/roboflow/test_model_monitoring_inference_aggregator.py @@ -54,13 +54,13 @@ def test_run_not_in_reporting_range_success( api_key="my_api_key", 
background_tasks=None, thread_pool_executor=None, - model_id="my_model_id", ) result = block.run( fire_and_forget=True, frequency=10, predictions=predictions, unique_aggregator_key=unique_aggregator_key, + model_id="my_model_id", ) # then @@ -121,13 +121,13 @@ def test_run_in_reporting_range_success_with_object_detection( api_key=api_key, background_tasks=None, thread_pool_executor=None, - model_id="construction-safety/10", ) result = block.run( fire_and_forget=False, frequency=10, predictions=predictions, unique_aggregator_key=unique_aggregator_key, + model_id="construction-safety/10", ) # then @@ -217,13 +217,13 @@ def test_run_in_reporting_range_success_with_single_label_classification( api_key=api_key, background_tasks=None, thread_pool_executor=None, - model_id="pills-classification/1", ) result = block.run( fire_and_forget=False, frequency=10, predictions=predictions, unique_aggregator_key=unique_aggregator_key, + model_id="pills-classification/1", ) # then @@ -313,13 +313,13 @@ def test_run_in_reporting_range_success_with_multi_label_classification( api_key=api_key, background_tasks=None, thread_pool_executor=None, - model_id="animals/32", ) result = block.run( fire_and_forget=False, frequency=10, predictions=predictions, unique_aggregator_key=unique_aggregator_key, + model_id="animals/32", ) # then @@ -415,13 +415,13 @@ def test_send_inference_results_to_model_monitoring_failure( api_key=api_key, background_tasks=None, thread_pool_executor=None, - model_id="my_model_id", ) result = block.run( fire_and_forget=False, frequency=1, predictions=predictions, unique_aggregator_key=unique_aggregator_key, + model_id="my_model_id", ) # then @@ -479,13 +479,13 @@ def test_run_when_not_in_reporting_range( api_key=api_key, background_tasks=None, thread_pool_executor=None, - model_id="my_model_id", ) result = block.run( fire_and_forget=False, frequency=10, predictions=predictions, unique_aggregator_key=unique_aggregator_key, + model_id="my_model_id", ) # then @@ -545,13 
+545,13 @@ def test_run_when_fire_and_forget_with_background_tasks( api_key=api_key, background_tasks=background_tasks, thread_pool_executor=None, - model_id="my_model_id", ) result = block.run( fire_and_forget=True, frequency=10, predictions=predictions, unique_aggregator_key=unique_aggregator_key, + model_id="my_model_id", ) # then @@ -609,13 +609,13 @@ def test_run_when_fire_and_forget_with_thread_pool( api_key=api_key, background_tasks=None, thread_pool_executor=thread_pool_executor, - model_id="my_model_id", ) result = block.run( fire_and_forget=True, frequency=10, predictions=predictions, unique_aggregator_key=unique_aggregator_key, + model_id="my_model_id", ) # then