Merge pull request #493 from aws-samples/lvn
fix: fix two ETL issues
NingLu authored Dec 24, 2024
2 parents a7b5a19 + cfed475 commit 89b2381
Showing 5 changed files with 16 additions and 17 deletions.
4 changes: 3 additions & 1 deletion source/lambda/etl/sfn_handler.py
@@ -88,7 +88,9 @@ def handler(event, context):
 
     input_body["indexId"] = index_id
     input_body["groupName"] = group_name if "groupName" not in input_body else input_body["groupName"]
-    chatbot_event = {"body": json.dumps({"group_name": group_name})}
+    chatbot_event_body = input_body
+    chatbot_event_body["group_name"] = group_name
+    chatbot_event = {"body": json.dumps(chatbot_event_body)}
     chatbot_result = create_chatbot(chatbot_event, group_name)
 
     input_body["tableItemId"] = context.aws_request_id
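Before this change, create_chatbot received an event whose body contained only group_name, so the rest of the ETL input was silently dropped. A minimal sketch of the difference, using a hypothetical input_body for illustration only:

import json

# Hypothetical ETL input, for illustration only.
input_body = {"chatbotId": "demo-bot", "indexId": "demo-index"}
group_name = "Admin"

# Before the fix: everything except group_name was dropped.
old_event = {"body": json.dumps({"group_name": group_name})}

# After the fix: the full input body is forwarded, with group_name added.
chatbot_event_body = input_body
chatbot_event_body["group_name"] = group_name
new_event = {"body": json.dumps(chatbot_event_body)}

print(old_event["body"])  # {"group_name": "Admin"}
print(new_event["body"])  # now also carries chatbotId and indexId

Note that chatbot_event_body aliases input_body rather than copying it, so group_name is also written back into input_body; that appears intentional here, since input_body is reused for the table item below.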
Binary file modified source/lambda/job/dep/dist/llm_bot_dep-0.1.0-py3-none-any.whl
2 changes: 1 addition & 1 deletion source/lambda/job/dep/llm_bot_dep/loaders/auto.py
@@ -59,7 +59,7 @@ def cb_process_object(s3, file_type: str, file_content, **kwargs):
     elif file_type == "jsonl":
         res = process_jsonl(s3, file_content, **kwargs)
     elif file_type == "xlsx":
-        res = process_xlsx(s3, file_content, **kwargs)
+        res = process_xlsx(s3, **kwargs)
     elif file_type == "image":
         logger.info("process image")
         res = process_image(s3, **kwargs)
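cb_process_object dispatches on the detected file type; this commit drops the raw file_content from the xlsx branch because process_xlsx (see xlsx.py below) now downloads the workbook from S3 itself. A hypothetical invocation, for illustration only; the kwargs names are assumptions based on the loader's s3.download_file(bucket_name, key, local_path) call:

import boto3

from llm_bot_dep.loaders.auto import cb_process_object

s3 = boto3.client("s3")
# file_content is unused for xlsx after this change, so an empty
# payload is passed; bucket/key names here are hypothetical.
docs = cb_process_object(s3, "xlsx", b"", bucket="my-bucket", key="kb/faq.xlsx")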
25 changes: 11 additions & 14 deletions source/lambda/job/dep/llm_bot_dep/loaders/xlsx.py
@@ -6,22 +6,17 @@
 import pandas as pd
 from langchain.docstore.document import Document
 
-
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
 
-def process_xlsx(s3, jsonl: bytes, **kwargs) -> List[Document]:
+def process_xlsx(s3, **kwargs) -> List[Document]:
     """
-    Process the jsonl file include query and answer pairs or other k-v alike data, in format of:
-    {"question": "<question 1>", "answer": "<answer 1>"}
-    {"question": "<question 2>", "answer": "<answer 2>"}
-    ...
+    Process the Excel file
     We will extract the question and assemble the content in page_content of Document, extract the answer and assemble as extra field in metadata (jsonlAnswer) of Document.
-    :param jsonl: jsonl file content
     :param kwargs: other arguments
     :return: list of Document, e.g.
     [
         Document(page_content="<question 1>", metadata={"jsonlAnswer": "<answer 1>, other metadata in metadata_template"}),
@@ -39,7 +34,7 @@ def process_xlsx(s3, jsonl: bytes, **kwargs) -> List[Document]:
     local_path = f"/tmp/excel-{timestamp_str}-{random_uuid}.xlsx"
 
     s3.download_file(bucket_name, key, local_path)
-
+    try:
         # load the excel file
         df = pd.read_excel(local_path)
@@ -71,15 +66,17 @@ def process_xlsx(s3, jsonl: bytes, **kwargs) -> List[Document]:
                         )
                     )
                     # assemble the Document
-                    doc = Document(page_content=page_content, metadata=metadata)
+                    doc = Document(page_content=page_content,
+                                   metadata=metadata)
                     doc_list.append(doc)
                 except json.JSONDecodeError as e:
                     logger.error(
-                        f"jsonl_line: {str(json_obj)} is not a valid json object, error: {e}"
+                        f"line: {str(json_obj)} is not a valid json object, error: {e}"
                     )
                     continue
                 except KeyError as e:
-                    logger.error(f"jsonl_line: {str(json_obj)} does not contain key: {e}")
+                    logger.error(
+                        f"line: {str(json_obj)} does not contain key: {e}")
         else:
             from .csv import CustomCSVLoader
             local_temp_path = local_path.replace('.xlsx', '.csv')
@@ -89,10 +86,10 @@ def process_xlsx(s3, jsonl: bytes, **kwargs) -> List[Document]:
             )
             doc_list = loader.load()
     except UnicodeDecodeError as e:
-        logger.error(f"jsonl file is not utf-8 encoded, error: {e}")
+        logger.error(f"Excel file is not utf-8 encoded, error: {e}")
         raise e
 
     logger.info(
-        f"processed jsonl_list: {doc_list} and if it is iterable: {isinstance(doc_list, Iterable)}"
+        f"processed excel file: {doc_list} and if it is iterable: {isinstance(doc_list, Iterable)}"
     )
     return doc_list
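The QA branch above turns each worksheet row into a LangChain Document, with the question as page_content and the answer tucked into metadata. A self-contained sketch of that assembly, assuming question/answer columns (the real loader's column handling sits partly outside this diff):

import pandas as pd
from langchain.docstore.document import Document

# Hypothetical worksheet contents, for illustration only.
df = pd.DataFrame({
    "question": ["What is ETL?"],
    "answer": ["Extract, transform, load."],
})

doc_list = []
for _, row in df.iterrows():
    # Mirror the docstring above: question -> page_content,
    # answer -> metadata["jsonlAnswer"].
    doc_list.append(Document(page_content=row["question"],
                             metadata={"jsonlAnswer": row["answer"]}))

print(doc_list[0].page_content)  # What is ETL?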
2 changes: 1 addition & 1 deletion source/lambda/job/glue-job-script.py
@@ -593,7 +593,7 @@ def ingestion_pipeline(
             SplittingType.SEMANTIC.value,
         )
 
-        gen_chunk_flag = False if file_type == "csv" else True
+        gen_chunk_flag = False if file_type in ["csv", "xlsx", "xls"] else True
         batches = batch_chunk_processor.batch_generator(res, gen_chunk_flag)
 
         for batch in batches:
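Spreadsheet formats are now treated like CSV, suggesting each row already maps to its own chunk, so the extra chunk-generation pass is skipped. A small sketch of the flag logic (batch_chunk_processor is the job's own helper and is not shown in this diff):

# Sketch of the flag, with "xlsx"/"xls" now grouped with "csv".
ROW_ORIENTED_TYPES = ("csv", "xlsx", "xls")

def needs_chunk_generation(file_type: str) -> bool:
    # Row-oriented files already yield one chunk per row, so skip re-chunking.
    return file_type not in ROW_ORIENTED_TYPES

assert needs_chunk_generation("pdf") is True
assert needs_chunk_generation("xlsx") is False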
