From 9d38ee4ab20d840dc17bfc01d0b1a3558785fc83 Mon Sep 17 00:00:00 2001 From: Xu Han Date: Wed, 6 Nov 2024 09:15:26 +0000 Subject: [PATCH 1/9] chore: remove unused code --- source/lambda/etl/create_chatbot.py | 103 ---------------------------- source/lambda/etl/get_status.py | 47 ------------- source/lambda/etl/list_chatbot.py | 55 --------------- 3 files changed, 205 deletions(-) delete mode 100644 source/lambda/etl/create_chatbot.py delete mode 100644 source/lambda/etl/get_status.py delete mode 100644 source/lambda/etl/list_chatbot.py diff --git a/source/lambda/etl/create_chatbot.py b/source/lambda/etl/create_chatbot.py deleted file mode 100644 index 615138222..000000000 --- a/source/lambda/etl/create_chatbot.py +++ /dev/null @@ -1,103 +0,0 @@ -import json -import logging -import os -from datetime import datetime, timezone - -import boto3 -from constant import IndexType -from utils.ddb_utils import ( - initiate_chatbot, - initiate_index, - initiate_model, - is_chatbot_existed, -) - -logger = logging.getLogger() -logger.setLevel(logging.INFO) -region_name = os.environ.get("AWS_REGION") -embedding_endpoint = os.environ.get("EMBEDDING_ENDPOINT") -dynamodb = boto3.resource("dynamodb", region_name=region_name) -index_table = dynamodb.Table(os.environ.get("INDEX_TABLE_NAME")) -chatbot_table = dynamodb.Table(os.environ.get("CHATBOT_TABLE_NAME")) -model_table = dynamodb.Table(os.environ.get("MODEL_TABLE_NAME")) - - -def lambda_handler(event, context): - logger.info(f"event:{event}") - resp_header = { - "Content-Type": "application/json", - "Access-Control-Allow-Headers": "Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token", - "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Methods": "*", - } - input_body = json.loads(event["body"]) - if "groupName" not in input_body: - return { - "statusCode": 400, - "headers": resp_header, - "body": json.dumps( - { - "message": "No groupName in the body, please specify a groupName, e.g. 
Admin" - } - ), - } - - group_name = input_body["groupName"] - chatbot_id = group_name.lower() - if is_chatbot_existed(chatbot_table, group_name, chatbot_id): - return { - "statusCode": 200, - "headers": resp_header, - "body": json.dumps( - { - "chatbotId": chatbot_id, - "groupName": group_name, - "message": "Chatbot existed", - } - ), - } - - model_id = f"{chatbot_id}-embedding" - create_time = str(datetime.now(timezone.utc)) - initiate_model(model_table, group_name, model_id, - embedding_endpoint, create_time) - - index_id_list = {} - DESCRIPTION = "Answer question based on search result" - # Iterate over all enum members and create DDB metadata - for member in IndexType.__members__.values(): - index_type = member.value - index_id = tag = f"{chatbot_id}-{index_type}-default" - index_id_list[index_type] = index_id - initiate_index( - index_table, - group_name, - index_id, - model_id, - index_type, - tag, - create_time, - DESCRIPTION, - ) - initiate_chatbot( - chatbot_table, - group_name, - chatbot_id, - index_id, - index_type, - tag, - create_time, - ) - - return { - "statusCode": 200, - "headers": resp_header, - "body": json.dumps( - { - "chatbotId": chatbot_id, - "groupName": group_name, - "indexIds": index_id_list, - "message": "Chatbot created", - } - ), - } diff --git a/source/lambda/etl/get_status.py b/source/lambda/etl/get_status.py deleted file mode 100644 index 8682a1e34..000000000 --- a/source/lambda/etl/get_status.py +++ /dev/null @@ -1,47 +0,0 @@ -import json -import logging -import os - -import boto3 - -logger = logging.getLogger() -logger.setLevel(logging.INFO) -state_machine_arn = os.environ["SFN_ARN"] - - -def lambda_handler(event, context): - execution_id = event["queryStringParameters"]["executionId"] - sf_client = boto3.client("stepfunctions") - execution_arn = ( - state_machine_arn.replace( - "stateMachine", "execution") + ":" + execution_id - ) - - resp_header = { - "Content-Type": "application/json", - "Access-Control-Allow-Headers": "Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token", - "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Methods": "*", - } - - try: - response = sf_client.describe_execution(executionArn=execution_arn) - - execution_status = response["status"] - logger.info("Execution Status: %s", execution_status) - - return { - "statusCode": 200, - "headers": resp_header, - "body": json.dumps( - {"execution_id": execution_id, "execution_status": execution_status} - ), - } - except Exception as e: - logger.error("Error: %s", str(e)) - - return { - "statusCode": 500, - "headers": resp_header, - "body": json.dumps(f"Error: {str(e)}"), - } diff --git a/source/lambda/etl/list_chatbot.py b/source/lambda/etl/list_chatbot.py deleted file mode 100644 index b0a5708e0..000000000 --- a/source/lambda/etl/list_chatbot.py +++ /dev/null @@ -1,55 +0,0 @@ -import json -import logging -import os - -import boto3 - -cognito = boto3.client("cognito-idp") - -cognito_user_pool_id = os.environ.get("USER_POOL_ID") - -logger = logging.getLogger() -logger.setLevel(logging.INFO) - - -def lambda_handler(event, context): - - authorizer_type = ( - event["requestContext"].get("authorizer", {}).get("authorizerType") - ) - if authorizer_type == "lambda_authorizer": - claims = json.loads(event["requestContext"]["authorizer"]["claims"]) - cognito_groups = claims["cognito:groups"] - cognito_groups_list = cognito_groups.split(",") - else: - cognito_groups_list = ["Admin"] - - output = {} - - if "Admin" in cognito_groups_list: - # Return a list of all cognito 
groups - response = cognito.list_groups(UserPoolId=cognito_user_pool_id) - output["chatbot_ids"] = [group["GroupName"] for group in response["Groups"]] - else: - output["chatbot_ids"] = cognito_groups_list - resp_header = { - "Content-Type": "application/json", - "Access-Control-Allow-Headers": "Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token", - "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Methods": "*", - } - - try: - return { - "statusCode": 200, - "headers": resp_header, - "body": json.dumps(output), - } - except Exception as e: - logger.error("Error: %s", str(e)) - - return { - "statusCode": 500, - "headers": resp_header, - "body": json.dumps(f"Error: {str(e)}"), - } From 7e1ddcf69d8fd0f787b99a3be2a0e33a175548eb Mon Sep 17 00:00:00 2001 From: Xu Han Date: Wed, 6 Nov 2024 11:47:50 +0000 Subject: [PATCH 2/9] feat: merge execution logic into one lambda --- source/infrastructure/lib/api/api-stack.ts | 33 +-- source/lambda/etl/delete_execution.py | 139 ----------- source/lambda/etl/execution_management.py | 265 +++++++++++++++++++++ source/lambda/etl/get_execution.py | 49 ---- source/lambda/etl/list_execution.py | 110 --------- 5 files changed, 273 insertions(+), 323 deletions(-) delete mode 100644 source/lambda/etl/delete_execution.py create mode 100644 source/lambda/etl/execution_management.py delete mode 100644 source/lambda/etl/get_execution.py delete mode 100644 source/lambda/etl/list_execution.py diff --git a/source/infrastructure/lib/api/api-stack.ts b/source/infrastructure/lib/api/api-stack.ts index 254f608b4..7d506606c 100644 --- a/source/infrastructure/lib/api/api-stack.ts +++ b/source/infrastructure/lib/api/api-stack.ts @@ -176,36 +176,19 @@ export class ApiConstruct extends Construct { ], }); - const listExecutionLambda = new LambdaFunction(this, "ListExecution", { + const executionManagementLambda = new LambdaFunction(this, "ExecutionManagementLambda", { code: Code.fromAsset(join(__dirname, "../../../lambda/etl")), - handler: "list_execution.lambda_handler", + handler: "execution_management.lambda_handler", environment: { EXECUTION_TABLE: executionTableName, - }, - statements: [this.iamHelper.dynamodbStatement], - }); - - const getExecutionLambda = new LambdaFunction(this, "GetExecution", { - code: Code.fromAsset(join(__dirname, "../../../lambda/etl")), - handler: "get_execution.lambda_handler", - environment: { ETL_OBJECT_TABLE: etlObjTableName, ETL_OBJECT_INDEX: etlObjIndexName, - }, - statements: [this.iamHelper.dynamodbStatement], - }); - - const delExecutionLambda = new LambdaFunction(this, "DeleteExecution", { - code: Code.fromAsset(join(__dirname, "../../../lambda/etl")), - handler: "delete_execution.lambda_handler", - environment: { SFN_ARN: props.knowledgeBaseStackOutputs.sfnOutput.stateMachineArn, - EXECUTION_TABLE: executionTableName, }, statements: [this.iamHelper.dynamodbStatement], }); - props.knowledgeBaseStackOutputs.sfnOutput.grantStartExecution(delExecutionLambda.function); + props.knowledgeBaseStackOutputs.sfnOutput.grantStartExecution(executionManagementLambda.function); const uploadDocLambda = new LambdaFunction(this, "UploadDocument", { code: Code.fromAsset(join(__dirname, "../../../lambda/etl")), @@ -278,7 +261,7 @@ export class ApiConstruct extends Construct { } apiKBExecution.addMethod( "GET", - new apigw.LambdaIntegration(listExecutionLambda.function), + new apigw.LambdaIntegration(executionManagementLambda.function), {...this.genMethodOption(api, auth, { Items: {type: JsonSchemaType.ARRAY, items: { type: 
JsonSchemaType.OBJECT, @@ -333,11 +316,11 @@ export class ApiConstruct extends Construct { ); apiKBExecution.addMethod( "DELETE", - new apigw.LambdaIntegration(delExecutionLambda.function), + new apigw.LambdaIntegration(executionManagementLambda.function), { ...this.genMethodOption(api, auth, { - data: { type: JsonSchemaType.ARRAY, items: { type: JsonSchemaType.STRING } }, - message: { type: JsonSchemaType.STRING } + ExecutionIds: { type: JsonSchemaType.ARRAY, items: { type: JsonSchemaType.STRING } }, + Message: { type: JsonSchemaType.STRING } }), requestModels: this.genRequestModel(api, { "executionId": { "type": JsonSchemaType.ARRAY, "items": { "type": JsonSchemaType.STRING } }, @@ -348,7 +331,7 @@ export class ApiConstruct extends Construct { const apiGetExecutionById = apiKBExecution.addResource("{executionId}"); apiGetExecutionById.addMethod( "GET", - new apigw.LambdaIntegration(getExecutionLambda.function), + new apigw.LambdaIntegration(executionManagementLambda.function), { ...this.genMethodOption(api, auth, { Items: { diff --git a/source/lambda/etl/delete_execution.py b/source/lambda/etl/delete_execution.py deleted file mode 100644 index 8ead1bde4..000000000 --- a/source/lambda/etl/delete_execution.py +++ /dev/null @@ -1,139 +0,0 @@ -""" -Lambda function for deleting execution pipelines and associated documents. - -This module handles the deletion of execution pipelines and their corresponding -documents from OpenSearch. It interacts with DynamoDB and Step Functions to -manage the deletion process. -""" - -import json -import logging -import os - -import boto3 -from constant import ExecutionStatus, OperationType, UiStatus - -# Set up logging -logger = logging.getLogger() -logger.setLevel(logging.INFO) - -# Initialize AWS clients -sfn_client = boto3.client("stepfunctions") -dynamodb = boto3.resource("dynamodb") - -# Get environment variables -sfn_arn = os.environ.get("SFN_ARN") -table_name = os.environ.get("EXECUTION_TABLE") -table = dynamodb.Table(table_name) - - -def get_execution_item(execution_id): - """ - Retrieve an execution item from DynamoDB. - - Args: - execution_id (str): The ID of the execution to retrieve. - - Returns: - dict: The execution item if found, None otherwise. - """ - execution_item = table.get_item(Key={"executionId": execution_id}) - if "Item" not in execution_item: - return None - return execution_item["Item"] - - -def update_execution_item(execution_id, execution_status, ui_status): - """ - Update the status of an execution item in DynamoDB. - - Args: - execution_id (str): The ID of the execution to update. - execution_status (str): The new execution status. - ui_status (str): The new UI status. - - Returns: - dict: The response from the DynamoDB update operation. - """ - response = table.update_item( - Key={"executionId": execution_id}, - UpdateExpression="SET executionStatus = :execution_status, uiStatus = :ui_status", - ExpressionAttributeValues={":execution_status": execution_status, ":ui_status": ui_status}, - ReturnValues="UPDATED_NEW", - ) - return response - - -def delete_execution_pipeline(execution_id): - """ - Delete an execution pipeline and its associated document. - - Args: - execution_id (str): The ID of the execution to delete. - - Raises: - Exception: If the execution is not found. 
- """ - execution_item = get_execution_item(execution_id) - if not execution_item: - raise Exception(f"Execution {execution_id} not found") - - # Update execution item status - update_execution_item(execution_id, ExecutionStatus.DELETING.value, UiStatus.ACTIVE.value) - - # Prepare input for Step Function to delete document from OpenSearch - delete_document_sfn_input = { - "s3Bucket": execution_item["s3Bucket"], - "s3Prefix": execution_item["s3Prefix"], - "chatbotId": execution_item["chatbotId"], - "indexType": execution_item["indexType"], - "operationType": OperationType.DELETE.value, - "indexId": execution_item["indexId"], - "groupName": execution_item["groupName"], - "tableItemId": execution_item["executionId"], - "embeddingModelType": execution_item["embeddingModelType"], - "offline": "true", - } - sfn_client.start_execution(stateMachineArn=sfn_arn, input=json.dumps(delete_document_sfn_input)) - - -def lambda_handler(event, context): - """ - AWS Lambda function handler for deleting execution pipelines. - - Args: - event (dict): The event data passed to the Lambda function. - context (object): The runtime information of the Lambda function. - - Returns: - dict: A response object containing the status code, headers, and body. - """ - logger.info(event) - input_body = json.loads(event["body"]) - resp_header = { - "Content-Type": "application/json", - "Access-Control-Allow-Headers": "Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token", - "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Methods": "*", - } - - try: - # Delete each execution pipeline specified in the input - for execution_id in input_body["executionId"]: - delete_execution_pipeline(execution_id) - - # Prepare success response - output = {"message": "The deletion of specified documents has started", "data": input_body["executionId"]} - return { - "statusCode": 200, - "headers": resp_header, - "body": json.dumps(output), - } - except Exception as e: - # Log and return error response - logger.error("Error: %s", str(e)) - return { - "statusCode": 500, - "headers": resp_header, - "body": json.dumps(f"Error: {str(e)}"), - } diff --git a/source/lambda/etl/execution_management.py b/source/lambda/etl/execution_management.py new file mode 100644 index 000000000..71def2e85 --- /dev/null +++ b/source/lambda/etl/execution_management.py @@ -0,0 +1,265 @@ +""" +Lambda function for managing execution pipelines and associated documents. +Provides REST API endpoints for CRUD operations on execution pipelines, +handling document management in DynamoDB and OpenSearch. 
+""" + +import json +import logging +import os +from dataclasses import dataclass +from typing import Any, Dict, Iterator, List, Optional + +import boto3 +from boto3.dynamodb.conditions import Key +from botocore.paginate import TokenEncoder +from botocore.paginator import Paginator +from constant import ExecutionStatus, OperationType, UiStatus + +# Configure logging +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +@dataclass +class AwsResources: + """Centralized AWS resource management""" + + sfn_client = boto3.client("stepfunctions") + dynamodb = boto3.resource("dynamodb") + dynamodb_client = boto3.client("dynamodb") + + def __post_init__(self): + # Initialize DynamoDB tables + self.execution_table = self.dynamodb.Table(Config.EXECUTION_TABLE_NAME) + self.object_table = self.dynamodb.Table(Config.ETL_OBJECT_TABLE_NAME) + + +class Config: + """Configuration constants""" + + SFN_ARN = os.environ["SFN_ARN"] + EXECUTION_TABLE_NAME = os.environ["EXECUTION_TABLE"] + ETL_OBJECT_TABLE_NAME = os.environ["ETL_OBJECT_TABLE"] + ETL_OBJECT_INDEX = os.environ["ETL_OBJECT_INDEX"] + DEFAULT_PAGE_SIZE = 50 + DEFAULT_MAX_ITEMS = 50 + + CORS_HEADERS = { + "Content-Type": "application/json", + "Access-Control-Allow-Headers": "Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token", + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "*", + } + + +# Initialize AWS resources +aws_resources = AwsResources() +token_encoder = TokenEncoder() + + +class PaginationConfig: + + @staticmethod + def get_query_parameter(event: Dict[str, Any], parameter_name: str, default_value: Any = None) -> Any: + """Extract query parameter from event with default value""" + if event.get("queryStringParameters") and parameter_name in event["queryStringParameters"]: + return event["queryStringParameters"][parameter_name] + return default_value + + @classmethod + def get_pagination_config(cls, event: Dict[str, Any]) -> Dict[str, Any]: + """Build pagination configuration from event parameters""" + return { + "MaxItems": int(cls.get_query_parameter(event, "max_items", Config.DEFAULT_MAX_ITEMS)), + "PageSize": int(cls.get_query_parameter(event, "page_size", Config.DEFAULT_PAGE_SIZE)), + "StartingToken": cls.get_query_parameter(event, "starting_token"), + } + + +class AuthorizationHelper: + @staticmethod + def get_cognito_groups(event: Dict[str, Any]) -> List[str]: + """Extract and validate Cognito groups from event authorizer""" + authorizer = event["requestContext"].get("authorizer", {}) + authorizer_type = authorizer.get("authorizerType") + + if authorizer_type != "lambda_authorizer": + logger.error("Invalid authorizer type") + raise ValueError("Invalid authorizer type") + + claims = json.loads(authorizer["claims"]) + + if "use_api_key" in claims: + return [claims.get("GroupName", "Admin")] + + return claims["cognito:groups"].split(",") + + +class ExecutionManager: + """Handles execution-related database operations""" + + @staticmethod + def get_execution(execution_id: str) -> Optional[Dict]: + """Retrieve execution details from DynamoDB""" + response = aws_resources.execution_table.get_item(Key={"executionId": execution_id}) + return response.get("Item") + + @staticmethod + def update_execution_status(execution_id: str, execution_status: str, ui_status: str) -> Dict: + """Update execution status in DynamoDB""" + return aws_resources.execution_table.update_item( + Key={"executionId": execution_id}, + UpdateExpression="SET executionStatus = :execution_status, uiStatus = :ui_status", + 
ExpressionAttributeValues={":execution_status": execution_status, ":ui_status": ui_status}, + ReturnValues="UPDATED_NEW", + ) + + @staticmethod + def delete_execution(execution_id: str) -> None: + """Initiate execution deletion process""" + execution = ExecutionManager.get_execution(execution_id) + if not execution: + raise ValueError(f"Execution {execution_id} not found") + + # Update status to indicate deletion in progress + ExecutionManager.update_execution_status(execution_id, ExecutionStatus.DELETING.value, UiStatus.ACTIVE.value) + + # Prepare deletion input for Step Function + deletion_input = { + "s3Bucket": execution["s3Bucket"], + "s3Prefix": execution["s3Prefix"], + "chatbotId": execution["chatbotId"], + "indexType": execution["indexType"], + "operationType": OperationType.DELETE.value, + "indexId": execution["indexId"], + "groupName": execution["groupName"], + "tableItemId": execution["executionId"], + "embeddingModelType": execution["embeddingModelType"], + "offline": "true", + } + + aws_resources.sfn_client.start_execution(stateMachineArn=Config.SFN_ARN, input=json.dumps(deletion_input)) + + @staticmethod + def get_filtered_executions( + paginator: Paginator, cognito_groups: List[str], pagination_config: Dict[str, Any] + ) -> Dict[str, Any]: + """Get filtered executions based on user groups""" + if "Admin" in cognito_groups: + response_iterator = paginator.paginate( + TableName=Config.EXECUTION_TABLE_NAME, + PaginationConfig=pagination_config, + FilterExpression="uiStatus = :active", + ExpressionAttributeValues={":active": {"S": "ACTIVE"}}, + ) + else: + response_iterator = paginator.paginate( + TableName=Config.EXECUTION_TABLE_NAME, + PaginationConfig=pagination_config, + FilterExpression="uiStatus = :active AND groupName = :group_id", + ExpressionAttributeValues={ + ":active": {"S": "ACTIVE"}, + ":group_id": {"S": cognito_groups[0]}, + }, + ) + + output = {} + encoder = TokenEncoder() + + for page in response_iterator: + page_items = page["Items"] + processed_items = [] + + for item in page_items: + processed_item = {key: value["S"] for key, value in item.items()} + processed_items.append(processed_item) + + output["Items"] = processed_items + output["Count"] = page["Count"] + output["Config"] = pagination_config + + if "LastEvaluatedKey" in page: + output["LastEvaluatedKey"] = encoder.encode({"ExclusiveStartKey": page["LastEvaluatedKey"]}) + + return output + + +class ApiResponse: + """Standardized API response handler""" + + @staticmethod + def success(data: Any, status_code: int = 200) -> Dict: + return {"statusCode": status_code, "headers": Config.CORS_HEADERS, "body": json.dumps(data)} + + @staticmethod + def error(message: str, status_code: int = 500) -> Dict: + logger.error("Error: %s", message) + return {"statusCode": status_code, "headers": Config.CORS_HEADERS, "body": json.dumps({"error": str(message)})} + + +class ApiHandler: + """API endpoint handlers""" + + def __init__(self): + self.execution_service = ExecutionService(aws_resources.dynamodb_client) + + @staticmethod + def delete_executions(event: Dict) -> Dict: + """Handle DELETE /executions endpoint""" + try: + execution_ids = json.loads(event["body"])["executionId"] + for execution_id in execution_ids: + ExecutionManager.delete_execution(execution_id) + + return ApiResponse.success({"Message": "Deletion process initiated", "ExecutionIds": execution_ids}) + except Exception as e: + return ApiResponse.error(str(e)) + + @staticmethod + def get_execution_objects(event: Dict) -> Dict: + """Handle GET 
/executions/{executionId}/objects endpoint""" + try: + execution_id = event["pathParameters"]["executionId"] + response = aws_resources.object_table.query( + IndexName=Config.ETL_OBJECT_INDEX, KeyConditionExpression=Key("executionId").eq(execution_id) + ) + + return ApiResponse.success({"Items": response["Items"], "Count": response["Count"]}) + except Exception as e: + return ApiResponse.error(str(e)) + + @staticmethod + def list_executions(event: Dict) -> Dict: + """Handle GET /executions endpoint""" + try: + # Get cognito groups and pagination config and paginator + cognito_groups = AuthorizationHelper.get_cognito_groups(event) + pagination_config = PaginationConfig.get_pagination_config(event) + paginator = aws_resources.dynamodb_client.get_paginator("scan") + + # Get and process executions + result = ExecutionManager.get_filtered_executions(paginator, cognito_groups, pagination_config) + + return ApiResponse.success(result) + except ValueError as ve: + return ApiResponse.error(str(ve), 403) + except Exception as e: + return ApiResponse.error(str(e)) + + +def lambda_handler(event: Dict, context: Any) -> Dict: + """Routes API requests to appropriate handlers based on HTTP method and path""" + logger.info("Received event: %s", json.dumps(event)) + + routes = { + ("DELETE", "/knowledge-base/executions"): ApiHandler.delete_executions, + ("GET", "/knowledge-base/executions/{executionId}"): ApiHandler.get_execution_objects, + ("GET", "/knowledge-base/executions"): ApiHandler.list_executions, + } + + handler = routes.get((event["httpMethod"], event["resource"])) + if not handler: + return ApiResponse.error("Route not found", 404) + + return handler(event) diff --git a/source/lambda/etl/get_execution.py b/source/lambda/etl/get_execution.py deleted file mode 100644 index bdf84289d..000000000 --- a/source/lambda/etl/get_execution.py +++ /dev/null @@ -1,49 +0,0 @@ -import json -import logging -import os - -import boto3 -from boto3.dynamodb.conditions import Key - -logger = logging.getLogger() -logger.setLevel(logging.INFO) -dynamodb = boto3.resource('dynamodb') -table_name = os.environ.get('ETL_OBJECT_TABLE') -index_name = os.environ.get('ETL_OBJECT_INDEX') -object_table = dynamodb.Table(table_name) - - -def lambda_handler(event, context): - # API Gateway validates parameters - resp_header = { - "Content-Type": "application/json", - "Access-Control-Allow-Headers": "Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token", - "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Methods": "*", - } - - try: - execution_id = event["pathParameters"]["executionId"] - response = object_table.query( - IndexName=index_name, - KeyConditionExpression=Key('executionId').eq(execution_id) - ) - logger.info(response) - output = { - "Items": response["Items"], - "Count": response["Count"] - } - - return { - "statusCode": 200, - "headers": resp_header, - "body": json.dumps(output), - } - except Exception as e: - logger.error("Error: %s", str(e)) - - return { - "statusCode": 500, - "headers": resp_header, - "body": json.dumps(f"Error: {str(e)}"), - } diff --git a/source/lambda/etl/list_execution.py b/source/lambda/etl/list_execution.py deleted file mode 100644 index d1a082f32..000000000 --- a/source/lambda/etl/list_execution.py +++ /dev/null @@ -1,110 +0,0 @@ -import json -import logging -import os - -import boto3 -from botocore.paginate import TokenEncoder - -DEFAULT_MAX_ITEMS = 50 -DEFAULT_SIZE = 50 -logger = logging.getLogger() -logger.setLevel(logging.INFO) -client = boto3.client("dynamodb") 
-table_name = os.environ.get("EXECUTION_TABLE") -encoder = TokenEncoder() - - -def get_query_parameter(event, parameter_name, default_value=None): - if event.get("queryStringParameters") and parameter_name in event["queryStringParameters"]: - return event["queryStringParameters"][parameter_name] - return default_value - - -def lambda_handler(event, context): - logger.info(event) - resp_header = { - "Content-Type": "application/json", - "Access-Control-Allow-Headers": "Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token", - "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Methods": "*", - } - - authorizer_type = event["requestContext"].get("authorizer", {}).get("authorizerType") - if authorizer_type == "lambda_authorizer": - claims = json.loads(event["requestContext"]["authorizer"]["claims"]) - if "use_api_key" in claims: - group_name = get_query_parameter(event, "GroupName", "Admin") - cognito_groups_list = [group_name] - else: - cognito_groups = claims["cognito:groups"] - cognito_groups_list = cognito_groups.split(",") - else: - logger.error("Invalid authorizer type") - return { - "statusCode": 403, - "headers": resp_header, - "body": json.dumps({"error": "Invalid authorizer type"}), - } - - max_items = get_query_parameter(event, "max_items", DEFAULT_MAX_ITEMS) - page_size = get_query_parameter(event, "page_size", DEFAULT_SIZE) - starting_token = get_query_parameter(event, "starting_token") - - config = { - "MaxItems": int(max_items), - "PageSize": int(page_size), - "StartingToken": starting_token, - } - - # Use query after adding a filter - paginator = client.get_paginator("scan") - - if "Admin" in cognito_groups_list: - response_iterator = paginator.paginate( - TableName=table_name, - PaginationConfig=config, - FilterExpression="uiStatus = :active", - ExpressionAttributeValues={":active": {"S": "ACTIVE"}}, - ) - else: - response_iterator = paginator.paginate( - TableName=table_name, - PaginationConfig=config, - FilterExpression="uiStatus = :active AND groupName = :group_id", - ExpressionAttributeValues={ - ":active": {"S": "ACTIVE"}, - ":group_id": {"S": cognito_groups_list[0]}, - }, - ) - - output = {} - for page in response_iterator: - page_items = page["Items"] - page_json = [] - for item in page_items: - item_json = {} - for key in item.keys(): - item_json[key] = item[key]["S"] - page_json.append(item_json) - # Return the latest page - output["Items"] = page_json - output["Count"] = page["Count"] - if "LastEvaluatedKey" in page: - output["LastEvaluatedKey"] = encoder.encode({"ExclusiveStartKey": page["LastEvaluatedKey"]}) - - output["Config"] = config - - try: - return { - "statusCode": 200, - "headers": resp_header, - "body": json.dumps(output), - } - except Exception as e: - logger.error("Error: %s", str(e)) - - return { - "statusCode": 500, - "headers": resp_header, - "body": json.dumps(f"Error: {str(e)}"), - } From 85748a75e99bfccf497c28f2fb8c9bc438d84a6f Mon Sep 17 00:00:00 2001 From: Xu Han Date: Wed, 6 Nov 2024 12:36:11 +0000 Subject: [PATCH 3/9] feat: add update for execution management --- source/lambda/etl/constant.py | 2 +- source/lambda/etl/execution_management.py | 90 +++++++++++++++++++++++ 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/source/lambda/etl/constant.py b/source/lambda/etl/constant.py index b97a7135b..480865d98 100644 --- a/source/lambda/etl/constant.py +++ b/source/lambda/etl/constant.py @@ -22,7 +22,7 @@ class ExecutionStatus(Enum): COMPLETED = "COMPLETED" DELETING = "DELETING" DELETED = "DELETED" - + UPDATING = 
"UPDATING" class EmbeddingModelType(Enum): BEDROCK_TITAN_V1 = "amazon.titan-embed-text-v1" diff --git a/source/lambda/etl/execution_management.py b/source/lambda/etl/execution_management.py index 71def2e85..6ea78e103 100644 --- a/source/lambda/etl/execution_management.py +++ b/source/lambda/etl/execution_management.py @@ -184,6 +184,78 @@ def get_filtered_executions( return output + @staticmethod + def update_execution(execution_id: str, updated_s3_bucket: str, updated_s3_prefix: str) -> Dict[str, Any]: + """Update execution details in DynamoDB + + Args: + execution_id: The ID of the execution to update + update_data: Dictionary containing fields to update + + Returns: + Updated execution item + + Raises: + ValueError: If execution not found or invalid update data + """ + # Verify execution exists + execution = ExecutionManager.get_execution(execution_id) + if not execution: + raise ValueError(f"Execution {execution_id} not found") + + ExecutionManager.update_execution_status(execution_id, ExecutionStatus.UPDATING.value, UiStatus.ACTIVE.value) + + existing_s3_bucket = execution["s3Bucket"] + existing_s3_prefix = execution["s3Prefix"] + + if existing_s3_bucket == updated_s3_bucket and existing_s3_prefix == updated_s3_prefix: + update_input = { + "s3Bucket": execution["s3Bucket"], + "s3Prefix": execution["s3Prefix"], + "chatbotId": execution["chatbotId"], + "indexType": execution["indexType"], + "operationType": OperationType.UPDATE.value, + "indexId": execution["indexId"], + "groupName": execution["groupName"], + "tableItemId": execution["executionId"], + "embeddingModelType": execution["embeddingModelType"], + "offline": "true", + } + else: + # Prepare deletion input for Step Function + deletion_input = { + "s3Bucket": execution["s3Bucket"], + "s3Prefix": execution["s3Prefix"], + "chatbotId": execution["chatbotId"], + "indexType": execution["indexType"], + "operationType": OperationType.DELETE.value, + "indexId": execution["indexId"], + "groupName": execution["groupName"], + "tableItemId": execution["executionId"], + "embeddingModelType": execution["embeddingModelType"], + "offline": "true", + } + + aws_resources.sfn_client.start_execution(stateMachineArn=Config.SFN_ARN, input=json.dumps(deletion_input)) + + # Create new execution for the updated S3 bucket and prefix + update_execution_input = { + "s3Bucket": updated_s3_bucket, + "s3Prefix": updated_s3_prefix, + "chatbotId": execution["chatbotId"], + "indexType": execution["indexType"], + "operationType": OperationType.CREATE.value, + "indexId": execution["indexId"], + "groupName": execution["groupName"], + "tableItemId": execution["executionId"], + "embeddingModelType": execution["embeddingModelType"], + "offline": "true", + } + + aws_resources.sfn_client.start_execution(stateMachineArn=Config.SFN_ARN, input=json.dumps(update_execution_input)) + + return response.get("Attributes", {}) + class ApiResponse: """Standardized API response handler""" @@ -247,6 +319,23 @@ def list_executions(event: Dict) -> Dict: except Exception as e: return ApiResponse.error(str(e)) + @staticmethod + def update_execution(event: Dict) -> Dict: + """Handle PUT /executions/{executionId} endpoint""" + try: + event_body = json.loads(event["body"]) + execution_id = event_body["executionId"] + updated_s3_bucket = event_body["s3Bucket"] + updated_s3_prefix = event_body["s3Prefix"] + + updated_execution = ExecutionManager.update_execution(execution_id, update_data) + return ApiResponse.success(updated_execution) + + except ValueError as ve: + return 
ApiResponse.error(str(ve), 400) + except Exception as e: + return ApiResponse.error(str(e)) + def lambda_handler(event: Dict, context: Any) -> Dict: """Routes API requests to appropriate handlers based on HTTP method and path""" @@ -256,6 +345,7 @@ def lambda_handler(event: Dict, context: Any) -> Dict: ("DELETE", "/knowledge-base/executions"): ApiHandler.delete_executions, ("GET", "/knowledge-base/executions/{executionId}"): ApiHandler.get_execution_objects, ("GET", "/knowledge-base/executions"): ApiHandler.list_executions, + ("PUT", "/knowledge-base/executions/{executionId}"): ApiHandler.update_execution, } handler = routes.get((event["httpMethod"], event["resource"])) From afd7c1bd576381d3f51cb12419e80afdfd09ee8f Mon Sep 17 00:00:00 2001 From: Xu Han Date: Wed, 6 Nov 2024 12:54:59 +0000 Subject: [PATCH 4/9] fix: fix execution notification issue --- source/lambda/etl/execution_management.py | 53 ++++++++++++----------- source/lambda/etl/notification.py | 11 ++++- 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/source/lambda/etl/execution_management.py b/source/lambda/etl/execution_management.py index 6ea78e103..fb1fc65f6 100644 --- a/source/lambda/etl/execution_management.py +++ b/source/lambda/etl/execution_management.py @@ -8,12 +8,11 @@ import logging import os from dataclasses import dataclass -from typing import Any, Dict, Iterator, List, Optional +from typing import Any, Dict, List, Optional import boto3 from boto3.dynamodb.conditions import Key from botocore.paginate import TokenEncoder -from botocore.paginator import Paginator from constant import ExecutionStatus, OperationType, UiStatus # Configure logging @@ -143,7 +142,7 @@ def delete_execution(execution_id: str) -> None: @staticmethod def get_filtered_executions( - paginator: Paginator, cognito_groups: List[str], pagination_config: Dict[str, Any] + paginator, cognito_groups: List[str], pagination_config: Dict[str, Any] ) -> Dict[str, Any]: """Get filtered executions based on user groups""" if "Admin" in cognito_groups: @@ -155,7 +154,7 @@ def get_filtered_executions( ) else: response_iterator = paginator.paginate( - TableName=Config.EXECUTION_TABLE_NAME, + TableName=Config.EXECUTION_TABLE_NAME, PaginationConfig=pagination_config, FilterExpression="uiStatus = :active AND groupName = :group_id", ExpressionAttributeValues={ @@ -185,16 +184,17 @@ def get_filtered_executions( return output @staticmethod - def update_execution(execution_id: str, updated_s3_bucket: str, updated_s3_prefix: str) -> Dict[str, Any]: + def update_execution(execution_id: str, update_s3_bucket: str, update_s3_prefix: str) -> Dict[str, Any]: """Update execution details in DynamoDB - + Args: execution_id: The ID of the execution to update - update_data: Dictionary containing fields to update - + update_s3_bucket: The new S3 bucket + update_s3_prefix: The new S3 prefix + Returns: Updated execution item - + Raises: ValueError: If execution not found or invalid update data """ @@ -202,14 +202,14 @@ def update_execution(execution_id: str, updated_s3_bucket: str, updated_s3_prefi execution = ExecutionManager.get_execution(execution_id) if not execution: raise ValueError(f"Execution {execution_id} not found") - + ExecutionManager.update_execution_status(execution_id, ExecutionStatus.UPDATING.value, UiStatus.ACTIVE.value) existing_s3_bucket = execution["s3Bucket"] existing_s3_prefix = execution["s3Prefix"] - if existing_s3_bucket == updated_s3_bucket and existing_s3_prefix == updated_s3_prefix: - update_input = { + if existing_s3_bucket == 
update_s3_bucket and existing_s3_prefix == update_s3_prefix: + update_execution_input = { "s3Bucket": execution["s3Bucket"], "s3Prefix": execution["s3Prefix"], "chatbotId": execution["chatbotId"], @@ -221,6 +221,10 @@ def update_execution(execution_id: str, updated_s3_bucket: str, updated_s3_prefi "embeddingModelType": execution["embeddingModelType"], "offline": "true", } + + aws_resources.sfn_client.start_execution( + stateMachineArn=Config.SFN_ARN, input=json.dumps(update_execution_input) + ) else: # Prepare deletion input for Step Function deletion_input = { @@ -240,8 +244,8 @@ def update_execution(execution_id: str, updated_s3_bucket: str, updated_s3_prefi # Create new execution for the updated S3 bucket and prefix update_execution_input = { - "s3Bucket": updated_s3_bucket, - "s3Prefix": updated_s3_prefix, + "s3Bucket": update_s3_bucket, + "s3Prefix": update_s3_prefix, "chatbotId": execution["chatbotId"], "indexType": execution["indexType"], "operationType": OperationType.CREATE.value, @@ -252,9 +256,11 @@ def update_execution(execution_id: str, updated_s3_bucket: str, updated_s3_prefi "offline": "true", } - aws_resources.sfn_client.start_execution(stateMachineArn=Config.SFN_ARN, input=json.dumps(update_execution_input)) - - return response.get("Attributes", {}) + aws_resources.sfn_client.start_execution( + stateMachineArn=Config.SFN_ARN, input=json.dumps(update_execution_input) + ) + + return {"Message": "Update process initiated"} class ApiResponse: @@ -273,9 +279,6 @@ def error(message: str, status_code: int = 500) -> Dict: class ApiHandler: """API endpoint handlers""" - def __init__(self): - self.execution_service = ExecutionService(aws_resources.dynamodb_client) - @staticmethod def delete_executions(event: Dict) -> Dict: """Handle DELETE /executions endpoint""" @@ -325,12 +328,12 @@ def update_execution(event: Dict) -> Dict: try: event_body = json.loads(event["body"]) execution_id = event_body["executionId"] - updated_s3_bucket = event_body["s3Bucket"] - updated_s3_prefix = event_body["s3Prefix"] - - updated_execution = ExecutionManager.update_execution(execution_id, update_data) + update_s3_bucket = event_body["s3Bucket"] + update_s3_prefix = event_body["s3Prefix"] + + updated_execution = ExecutionManager.update_execution(execution_id, update_s3_bucket, update_s3_prefix) return ApiResponse.success(updated_execution) - + except ValueError as ve: return ApiResponse.error(str(ve), 400) except Exception as e: diff --git a/source/lambda/etl/notification.py b/source/lambda/etl/notification.py index f92066c2f..b60df7025 100644 --- a/source/lambda/etl/notification.py +++ b/source/lambda/etl/notification.py @@ -11,6 +11,11 @@ execution_table = dynamodb.Table(os.environ.get("EXECUTION_TABLE")) +def get_execution_item(execution_id): + response = execution_table.get_item(Key={"executionId": execution_id}) + return response.get("Item", {}) + + def update_execution_item(execution_id, execution_status, ui_status): """ Update the status of an execution item in DynamoDB. 
@@ -39,9 +44,13 @@ def lambda_handler(event, context): message = json.loads(event["Records"][0]["Sns"]["Message"]) execution_id = message["executionId"] + + current_execution = get_execution_item(execution_id) + current_execution_status = current_execution["executionStatus"] operation_type = message["operationType"] if operation_type == OperationType.DELETE.value: - update_execution_item(execution_id, ExecutionStatus.DELETED.value, UiStatus.INACTIVE.value) + if current_execution_status == ExecutionStatus.DELETING.value: + update_execution_item(execution_id, ExecutionStatus.DELETING.value, UiStatus.INACTIVE.value) else: update_execution_item(execution_id, ExecutionStatus.COMPLETED.value, UiStatus.ACTIVE.value) From faf5d64d40ba9dc750fc4c9557584b918b33a9d9 Mon Sep 17 00:00:00 2001 From: Xu Han Date: Wed, 6 Nov 2024 13:21:13 +0000 Subject: [PATCH 5/9] feat: support edit and delete on front end --- source/portal/src/pages/library/Library.tsx | 33 ++++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/source/portal/src/pages/library/Library.tsx b/source/portal/src/pages/library/Library.tsx index 3e5762b75..4bce6fb6e 100644 --- a/source/portal/src/pages/library/Library.tsx +++ b/source/portal/src/pages/library/Library.tsx @@ -3,6 +3,7 @@ import CommonLayout from 'src/layout/CommonLayout'; import { Box, Button, + ButtonDropdown, CollectionPreferences, ContentLayout, Header, @@ -27,7 +28,7 @@ const parseDate = (item: LibraryListItem) => { const Library: React.FC = () => { const [selectedItems, setSelectedItems] = useState([]); const fetchData = useAxiosRequest(); - const [visible, setVisible] = useState(false); + const [showDelete, setShowDelete] = useState(false); const { t } = useTranslation(); const [loadingData, setLoadingData] = useState(false); const [allLibraryList, setAllLibraryList] = useState([]); @@ -81,7 +82,7 @@ const Library: React.FC = () => { method: 'delete', data: { executionId: selectedItems.map((item) => item.executionId) }, }); - setVisible(false); + setShowDelete(false); getLibraryList(); alertMsg(data.message, 'success'); setLoadingDelete(false); @@ -284,14 +285,24 @@ const Library: React.FC = () => { getLibraryList(); }} /> - + {t('button.action')} +
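Reviewer note: below is a minimal, standalone sketch of the (httpMethod, resource) dispatch that the consolidated execution_management.lambda_handler from PATCH 2/9 relies on, including the PUT route added in PATCH 3/9. The handler functions and event values are hypothetical stand-ins, not the actual module (which needs the Lambda environment variables and AWS clients); the sketch only shows how a request would be routed.

import json

def _list_executions(event):
    # Stand-in for ApiHandler.list_executions
    return {"statusCode": 200, "body": json.dumps({"Items": [], "Count": 0})}

def _update_execution(event):
    # Stand-in for ApiHandler.update_execution: reads the new S3 location from the body
    body = json.loads(event["body"])
    return {
        "statusCode": 200,
        "body": json.dumps({"Message": "Update process initiated", "ExecutionId": body["executionId"]}),
    }

# Mirrors the routes dictionary in lambda_handler
ROUTES = {
    ("GET", "/knowledge-base/executions"): _list_executions,
    ("PUT", "/knowledge-base/executions/{executionId}"): _update_execution,
}

# Hypothetical API Gateway proxy event for updating an execution's source location
sample_event = {
    "httpMethod": "PUT",
    "resource": "/knowledge-base/executions/{executionId}",
    "pathParameters": {"executionId": "example-execution-id"},
    "body": json.dumps({
        "executionId": "example-execution-id",
        "s3Bucket": "example-bucket",
        "s3Prefix": "documents/updated/",
    }),
}

handler = ROUTES.get((sample_event["httpMethod"], sample_event["resource"]))
print(handler(sample_event) if handler else {"statusCode": 404, "body": "Route not found"})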