Commit

Cache the entire dataframe rather than individual uuids for increased performance
bo-lu committed Jun 12, 2024
1 parent 1cf7589 commit e771f4a
Showing 1 changed file with 58 additions and 89 deletions.
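
In short, the commit replaces a per-uuid response cache (plus a datetime cache used for expiry) with a single module-level DataFrame cache, so the parquet read from S3 happens once per warm Lambda container rather than once per uncached uuid. A minimal sketch of the pattern, reusing the diff's names; the handler body here is simplified and hypothetical, and the bucket path is made up:

    import pandas as pd
    import awswrangler as wr

    # Module-level state survives across warm invocations of the same
    # Lambda execution environment, so the parquet file is read once.
    cache_df = pd.DataFrame()

    def get_df_cache():
        global cache_df
        return cache_df

    def add_df_to_cache(dataframe):
        global cache_df
        cache_df = dataframe.copy()

    def lambda_handler(event, context):
        geocore_df = get_df_cache()
        if geocore_df.empty:
            # Cold start or empty cache: pay the S3 read once.
            geocore_df = wr.s3.read_parquet(path="s3://example-bucket/")  # hypothetical path
            add_df_to_cache(geocore_df)
        return {'statusCode': 200, 'body': str(len(geocore_df))}
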
147 changes: 58 additions & 89 deletions collections/app.py
@@ -8,14 +8,12 @@
import awswrangler as wr

from uuid import UUID
from datetime import datetime
from botocore.exceptions import ClientError

PARQUET_BUCKET_NAME = os.environ['PARQUET_BUCKET_NAME']
EXPIRY_DAYS = 2
MAX_CHILD_OR_SIBLING_LENGTH = int(os.environ['MAX_CHILD_OR_SIBLING_LENGTH'])

cache = {}
cache_date = {}
cache_df = pd.DataFrame()

def lambda_handler(event, context):

@@ -34,44 +32,25 @@ def lambda_handler(event, context):
    except:
        lang = "en"

    if uuid == False:
        message += "No id parameter was passed. Usage: ?id=XYZ&lang=en_or_fr"
        return {
            'statusCode': 200,
            'body': message
        }

    date_time = datetime.utcnow().now()
    cached_datetime_obj = None

    compound_key = uuid + "_" + lang
    cached_datetime_str = get_from_datetime_cache(compound_key)

    #Check to see if there is a cache hit on the datetime cache
    if cached_datetime_str != None:
        format_string = "%Y-%m-%d %H:%M:%S.%f"
        cached_datetime_obj = datetime.strptime(cached_datetime_str, format_string) #convert datetime str to obj

        diff = date_time - cached_datetime_obj
        days = diff.days #calculate the difference in days

        #Compare the current day with the cached date_time
        if days < EXPIRY_DAYS:
            #Return the cached result and exit program
            if get_from_cache(compound_key) != None:
                return get_from_cache(compound_key)

    #Cache miss or a need to invalidate the cache
    if uuid != False:
        try:
            print("Finding the parent and other children for uuid: ", uuid)
            geocore_df = wr.s3.read_parquet(path=PARQUET_BUCKET_NAME)
        except ClientError as e:
            message += "Error accessing " + PARQUET_BUCKET_NAME
            return {
                'statusCode': 200,
                'body': json.dumps(message)
            }

    geocore_df = get_df_cache()
    if geocore_df.empty:
        try:
            #print("Finding the parent and other children for uuid: ", uuid)
            geocore_df = wr.s3.read_parquet(path=PARQUET_BUCKET_NAME)
            #print("From S3")
            add_df_to_cache(geocore_df)
        except ClientError as e:
            message += "Error accessing " + PARQUET_BUCKET_NAME
            return {
                'statusCode': 200,
                'body': json.dumps(message)
            }
    else:
        print("From DF Cache")

    #self
    self_json = find_self(geocore_df, uuid)

@@ -82,44 +61,27 @@ def lambda_handler(event, context):
    child_json = None
    child_count = 0
    child_json, child_count = find_children(geocore_df, uuid)

    if child_json != None:
        print("child_json ", lang)
        if lang == 'en':
            child_json = sorted(child_json, key=lambda x: x['description_en'], reverse=True)
        elif lang == 'fr':
            child_json = sorted(child_json, key=lambda x: x['description_fr'], reverse=True)
        else:
            child_json = sorted(child_json, key=lambda x: x['description_fr'], reverse=True)

    #sibling
    sibling_json = None
    sibling_count = 0
    if parent_json != None and child_json == None:
        sibling_json, sibling_count = find_siblings(geocore_df, parent_id, uuid)
        print("sibling_json ", lang)
        if lang == 'en':
            sibling_json = sorted(sibling_json, key=lambda x: x['description_en'], reverse=True)
        elif lang == 'fr':
            sibling_json = sorted(sibling_json, key=lambda x: x['description_fr'], reverse=True)
        else:
            sibling_json = sorted(sibling_json, key=lambda x: x['description_fr'], reverse=True)

    #Dictionary for the cache
    json_cache = {
        'statusCode': 200,
        'message': nonesafe_loads('{ "message_en": "cached result", "message_fr": "résultat mis en cache" }'),
        'sibling_count': sibling_count,
        'child_count': child_count,
        'self': self_json,
        'parent': parent_json,
        'sibling': sibling_json,
        'child': child_json
    }

    add_to_cache(compound_key, json_cache)
    add_to_datetime_cache(compound_key, str(date_time))

    else:
        message += "No id parameter was passed. Usage: ?id=XYZ"
        return {
@@ -179,8 +141,9 @@ def find_parent(geocore_df, uuid):
    else:
        try:
            parent_id = parent_df.iloc[0]['features_properties_parentIdentifier']
            parent_desc_en = geocore_df[geocore_df['features_properties_id'] == parent_id].iloc[0]['features_properties_title_en'].replace('"', '\\"')
            parent_desc_fr = geocore_df[geocore_df['features_properties_id'] == parent_id].iloc[0]['features_properties_title_fr'].replace('"', '\\"')
            parent_row = geocore_df.loc[geocore_df['features_properties_id'] == parent_id]
            parent_desc_en = parent_row['features_properties_title_en'].values[0].replace('"', '\\"')
            parent_desc_fr = parent_row['features_properties_title_fr'].values[0].replace('"', '\\"')
            parent_message = '{ "id": "' + parent_id + '", "description_en": "' + parent_desc_en + '", "description_fr": "' + parent_desc_fr + '"}'
        except:
            parent_message = None
@@ -208,14 +171,20 @@ def find_siblings(geocore_df, parent_id, uuid):
        child_message = None
    else:
        child_message = "["
        for i in range(0,len(other_children_df)):
            child_array_id.append(other_children_df.iloc[i]['features_properties_id'])
            child_array_desc_en.append(other_children_df.iloc[i]['features_properties_title_en'])
            child_array_desc_fr.append(other_children_df.iloc[i]['features_properties_title_fr'])

        for i in range(0,len(other_children_df)):
            child_message += '{ "id": "' + child_array_id[i] + '", "description_en": "' + child_array_desc_en[i].replace('"', '\\"') + '", "description_fr": "' + child_array_desc_fr[i].replace('"', '\\"') + '"}'
            if i != len(other_children_df)-1:
        #vectorized optimized code
        child_array_id.extend(other_children_df['features_properties_id'].tolist())
        child_array_desc_en.extend(other_children_df['features_properties_title_en'].tolist())
        child_array_desc_fr.extend(other_children_df['features_properties_title_fr'].tolist())

        if (len(other_children_df) > MAX_CHILD_OR_SIBLING_LENGTH):
            child_length = MAX_CHILD_OR_SIBLING_LENGTH
        else:
            child_length = len(other_children_df)

        for i in range(0,child_length):
            child_message += '{ "id": "' + child_array_id[i] + '", "description_en": "' + child_array_desc_en[i].replace('"', '\\"') + '", "description_fr": "' + child_array_desc_fr[i].replace('"', '\\"') + '"}'
            if i != child_length-1:
                child_message += ', '
        child_message += "]"

@@ -241,14 +210,20 @@ def find_children(geocore_df, uuid):
        child_message = None
    else:
        child_message = "["
        for i in range(0,len(other_children_df)):
            child_array_id.append(other_children_df.iloc[i]['features_properties_id'])
            child_array_desc_en.append(other_children_df.iloc[i]['features_properties_title_en'])
            child_array_desc_fr.append(other_children_df.iloc[i]['features_properties_title_fr'])

        for i in range(0,len(other_children_df)):
            child_message += '{ "id": "' + child_array_id[i].replace('"', '\\"') + '", "description_en": "' + child_array_desc_en[i].replace('"', '\\"') + '", "description_fr": "' + child_array_desc_fr[i].replace('"', '\\"') + '"}'
            if i != len(other_children_df)-1:

        #vectorized optimized code
        child_array_id.extend(other_children_df['features_properties_id'].tolist())
        child_array_desc_en.extend(other_children_df['features_properties_title_en'].tolist())
        child_array_desc_fr.extend(other_children_df['features_properties_title_fr'].tolist())

        if (len(other_children_df) > MAX_CHILD_OR_SIBLING_LENGTH):
            child_length = MAX_CHILD_OR_SIBLING_LENGTH
        else:
            child_length = len(other_children_df)

        for i in range(0,child_length):
            child_message += '{ "id": "' + child_array_id[i].replace('"', '\\"') + '", "description_en": "' + child_array_desc_en[i].replace('"', '\\"') + '", "description_fr": "' + child_array_desc_fr[i].replace('"', '\\"') + '"}'
            if i != child_length-1:
                child_message += ', '
        child_message += "]"

@@ -258,18 +233,12 @@ def nonesafe_loads(obj):
    if obj is not None:
        return json.loads(obj)

# Function to add JSON payload to the cache
def add_to_cache(key, json_payload):
    cache[key] = json_payload
# Add dataframe to cache
def add_df_to_cache(dataframe):
    global cache_df
    cache_df = dataframe.copy()

# Function to retrieve JSON payload from the cache
def get_from_cache(key):
    return cache.get(key)

# Function to add datetime payload to the cache_date for invalidation
def add_to_datetime_cache(key, datetime):
    cache_date[key] = datetime

# Function to retrieve datetime payload from the cache_date for invalidation
def get_from_datetime_cache(key):
    return cache_date.get(key)
# Get dataframe from cache
def get_df_cache():
    global cache_df
    return cache_df
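
A note on the "#vectorized optimized code" comments above: the old per-row .iloc lookups are replaced by whole-column extraction with .tolist(), which avoids repeated positional indexing. A small self-contained sketch of the difference, using a toy frame with the same column names (the data is made up):

    import pandas as pd

    df = pd.DataFrame({
        'features_properties_id': ['a', 'b', 'c'],
        'features_properties_title_en': ['A', 'B', 'C'],
    })

    # Old approach: one positional lookup per row.
    ids_slow = []
    for i in range(0, len(df)):
        ids_slow.append(df.iloc[i]['features_properties_id'])

    # New approach: pull the whole column out at once.
    ids_fast = df['features_properties_id'].tolist()

    assert ids_slow == ids_fast

The added length cap works the same way in both find_children and find_siblings; it is equivalent to child_length = min(len(other_children_df), MAX_CHILD_OR_SIBLING_LENGTH).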
