diff --git a/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py b/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py
index 5ca86c2d..c27c0fbf 100644
--- a/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py
+++ b/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py
@@ -5,13 +5,11 @@ from datetime import datetime, timezone
 from load_if import load_if
 from load_price import collect_price_with_multithreading
-from utils.merge_df import merge_df
+from utils.merge_df import merge_price_eviction_df
 from utils.compare_data import compare
 from utils.upload_data import upload_timestream, update_latest, save_raw, query_selector, upload_cloudwatch
 from utils.pub_service import send_slack_message, AZURE_CONST, STORAGE_CONST

-BUCKET_NAME = os.environ.get('BUCKET_NAME')
-KEY = os.environ.get('S3_LATEST_DATA_SAVE_PATH')

 WORKLOAD_COLS = ['InstanceTier', 'InstanceType', 'Region']
 FEATURE_COLS = ['OndemandPrice', 'SpotPrice', 'IF']
 str_datetime = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M")
@@ -40,7 +38,7 @@ def azure_collector(timestamp):
         is_if_fetch_success = False

     if is_price_fetch_success and is_if_fetch_success:
-        join_df = merge_df(current_df, eviction_df)
+        join_df = merge_price_eviction_df(current_df, eviction_df)
     elif not is_price_fetch_success and is_if_fetch_success:
         join_df = eviction_df
     elif is_price_fetch_success and not is_if_fetch_success:
diff --git a/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py b/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py
index 5bff9fe0..dac3df5d 100644
--- a/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py
+++ b/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py
@@ -1,8 +1,10 @@
 import load_sps
+import pandas as pd
 from datetime import datetime
 from sps_module import sps_shared_resources
+from utils.merge_df import merge_price_eviction_sps_df
 from utils.upload_data import update_latest_sps, save_raw_sps
-from utils.pub_service import send_slack_message, logger
+from utils.pub_service import send_slack_message, logger, S3, AZURE_CONST

 FIRST_TIME_ACTION = "First_Time"  # first-run action
 EVERY_10MIN_ACTION = "Every_10Min"  # action run every 10 minutes
@@ -33,11 +35,17 @@ def lambda_handler(event, _):
         else:
             raise ValueError(f"Invalid lambda action.")

-        # SPS data processing
-        if sps_res_df is None:
-            raise ValueError("sps_res_df is None")
-        if not handle_res_df(sps_res_df, event_time_utc):
-            raise RuntimeError("Failed to update or save SPS data")
+
+        # price_if_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH, 'pkl.gz')
+        price_if_df = pd.DataFrame(S3.read_file(AZURE_CONST.LATEST_FILENAME, 'json'))
+        price_eviction_sps_df = merge_price_eviction_sps_df(price_if_df, sps_res_df)
+
+        if sps_res_df is None: raise ValueError("sps_res_df is None")
+        if price_if_df is None: raise ValueError("price_if_df is None")
+        if price_eviction_sps_df is None: raise ValueError("price_eviction_sps_df is None")
+
+        if not update_and_save_res_df(price_eviction_sps_df, event_time_utc):
+            raise RuntimeError("Failed to update or save price_eviction_sps_df data")

         return handle_response(200, "Executed Successfully!", action, event_time_utc)

@@ -48,10 +56,10 @@ def lambda_handler(event, _):
         return handle_response(500, "Execute Failed!", action, event_time_utc, str(e))


-def handle_res_df(sps_res_df, event_time_utc):
+def update_and_save_res_df(price_eviction_sps_df, event_time_utc):
     try:
-        update_result = update_latest_sps(sps_res_df, event_time_utc)
-        save_result = save_raw_sps(sps_res_df, event_time_utc)
+        update_result = update_latest_sps(price_eviction_sps_df, event_time_utc)
+        save_result = save_raw_sps(price_eviction_sps_df, event_time_utc)

         return update_result and save_result
     except Exception as e:
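
Note: S3.read_file no longer prepends a prefix to its key (see the utils/pub_service.py hunk below), so the handler now has to pass full object keys. A minimal, hedged sketch of the new input step follows; whether the bare LATEST_FILENAME or the S3_LATEST_DATA_SAVE_PATH constant is the right key depends on where update_latest() actually uploads the JSON (see utils/upload_data.py below).

    import pandas as pd
    from utils.pub_service import S3, AZURE_CONST  # assumes the collector package is importable

    # Mirror of the patched handler's input step: read the latest price/IF
    # snapshot back from S3 as JSON records and wrap it in a DataFrame.
    # The commented-out line in the patch would read the gzip pickle instead:
    # price_if_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH, 'pkl.gz')
    records = S3.read_file(AZURE_CONST.LATEST_FILENAME, "json")
    price_if_df = pd.DataFrame(records) if records is not None else None
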
diff --git a/collector/spot-dataset/azure/lambda/current_collector/load_sps.py b/collector/spot-dataset/azure/lambda/current_collector/load_sps.py
index ef863754..175027fe 100644
--- a/collector/spot-dataset/azure/lambda/current_collector/load_sps.py
+++ b/collector/spot-dataset/azure/lambda/current_collector/load_sps.py
@@ -64,8 +64,8 @@ def collect_spot_placement_score_first_time(desired_count):
     start_time = time.time()
     res_price_api = collect_regions_and_instance_types_df_by_priceapi()
     regions_and_instance_types_df, SS_Resources.region_map_and_instance_map_tmp['region_map'], SS_Resources.region_map_and_instance_map_tmp['instance_map'] = res_price_api
-    S3.upload_file(SS_Resources.region_map_and_instance_map_tmp,
-                   AZURE_CONST.REGION_MAP_AND_INSTANCE_MAP_JSON_FILENAME, "json")
+
+    S3.upload_file(SS_Resources.region_map_and_instance_map_tmp, f"{AZURE_CONST.REGION_MAP_AND_INSTANCE_MAP_JSON_FILENAME}", "json")

     end_time = time.time()
     elapsed = end_time - start_time
@@ -101,7 +101,7 @@ def collect_spot_placement_score_first_time(desired_count):
     df_greedy_clustering_filtered = sps_prepare_parameters.greedy_clustering_to_create_optimized_request_list(
         regions_and_instance_types_filtered_df)

-    S3.upload_file(df_greedy_clustering_filtered, AZURE_CONST.DF_TO_USE_TODAY_PKL_FILENAME, "pkl")
+    S3.upload_file(df_greedy_clustering_filtered, f"{AZURE_CONST.DF_TO_USE_TODAY_PKL_FILENAME}", "pkl")

     end_time = time.time()
     elapsed = end_time - start_time
@@ -122,7 +122,7 @@ def collect_spot_placement_score(desired_count):
     assert get_variable_from_s3()
     initialize_sps_shared_resources()

-    df_greedy_clustering_filtered = S3.read_file(AZURE_CONST.DF_TO_USE_TODAY_PKL_FILENAME, 'pkl')
+    df_greedy_clustering_filtered = S3.read_file(f"{AZURE_CONST.DF_TO_USE_TODAY_PKL_FILENAME}", 'pkl')

     sps_res_df = execute_spot_placement_score_task_by_parameter_pool_df(df_greedy_clustering_filtered, True, desired_count)
     print(f'Time_out_retry_count: {SS_Resources.time_out_retry_count}')
@@ -162,10 +162,10 @@ def execute_spot_placement_score_task_by_parameter_pool_df(api_calls_df, availab
                     score_data = {
                         "DesiredCount": desired_count,
                         "AvailabilityZone": score.get("availabilityZone", None),
-                        "RegionCodeSPS": score.get("region", None),
+                        # "RegionCodeSPS": score.get("region", None),
                         "Region": SS_Resources.region_map_and_instance_map_tmp['region_map'].get(
                             score.get("region", ""), ""),
-                        "InstanceTypeSPS": score.get("sku", None),
+                        # "InstanceTypeSPS": score.get("sku", None),
                         "InstanceTier": SS_Resources.region_map_and_instance_map_tmp['instance_map'].get(
                             score.get("sku", ""), {}).get("InstanceTier", None),
                         "InstanceType": SS_Resources.region_map_and_instance_map_tmp['instance_map'].get(
@@ -323,12 +323,12 @@ def extract_invalid_values(error_message):
 def initialize_files_in_s3():
     try:
         files_to_initialize = {
-            AZURE_CONST.INVALID_REGIONS_JSON_FILENAME: [],
-            AZURE_CONST.INVALID_INSTANCE_TYPES_JSON_FILENAME: []
+            f"{AZURE_CONST.INVALID_REGIONS_JSON_FILENAME}": [],
+            f"{AZURE_CONST.INVALID_INSTANCE_TYPES_JSON_FILENAME}": []
         }

-        for file_name, data in files_to_initialize.items():
-            S3.upload_file(data, file_name, "json")
+        for file_path, data in files_to_initialize.items():
+            S3.upload_file(data, file_path, "json")

         print("Successfully initialized files in S3.")
         return True
@@ -401,28 +401,28 @@ def initialize_sps_shared_resources():
 def save_tmp_files_to_s3():
     files_to_upload = {
-        AZURE_CONST.INVALID_REGIONS_JSON_FILENAME: SS_Resources.invalid_regions_tmp,
-        AZURE_CONST.INVALID_INSTANCE_TYPES_JSON_FILENAME: SS_Resources.invalid_instance_types_tmp,
-        AZURE_CONST.LOCATIONS_CALL_HISTORY_JSON_FILENAME: SS_Resources.locations_call_history_tmp,
-        AZURE_CONST.LOCATIONS_OVER_LIMIT_JSON_FILENAME: SS_Resources.locations_over_limit_tmp,
-        AZURE_CONST.LAST_SUBSCRIPTION_ID_AND_LOCATION_JSON_FILENAME: {
+        f"{AZURE_CONST.INVALID_REGIONS_JSON_FILENAME}": SS_Resources.invalid_regions_tmp,
+        f"{AZURE_CONST.INVALID_INSTANCE_TYPES_JSON_FILENAME}": SS_Resources.invalid_instance_types_tmp,
+        f"{AZURE_CONST.LOCATIONS_CALL_HISTORY_JSON_FILENAME}": SS_Resources.locations_call_history_tmp,
+        f"{AZURE_CONST.LOCATIONS_OVER_LIMIT_JSON_FILENAME}": SS_Resources.locations_over_limit_tmp,
+        f"{AZURE_CONST.LAST_SUBSCRIPTION_ID_AND_LOCATION_JSON_FILENAME}": {
             "last_subscription_id": SS_Resources.last_subscription_id_and_location_tmp['last_subscription_id'],
             "last_location": SS_Resources.last_subscription_id_and_location_tmp['last_location']
         },
     }
-    for file_name, file_data in files_to_upload.items():
+    for file_path, file_data in files_to_upload.items():
         if file_data:
-            S3.upload_file(file_data, file_name, "json")
+            S3.upload_file(file_data, file_path, "json")


 def get_variable_from_s3():
     try:
-        invalid_regions_data = S3.read_file(AZURE_CONST.INVALID_REGIONS_JSON_FILENAME, 'json')
-        instance_types_data = S3.read_file(AZURE_CONST.INVALID_INSTANCE_TYPES_JSON_FILENAME, 'json')
-        call_history_data = S3.read_file(AZURE_CONST.LOCATIONS_CALL_HISTORY_JSON_FILENAME, 'json')
-        over_limit_data = S3.read_file(AZURE_CONST.LOCATIONS_OVER_LIMIT_JSON_FILENAME, 'json')
-        last_location_index_data = S3.read_file(AZURE_CONST.LAST_SUBSCRIPTION_ID_AND_LOCATION_JSON_FILENAME, 'json')
-        region_map_and_instance_map = S3.read_file(AZURE_CONST.REGION_MAP_AND_INSTANCE_MAP_JSON_FILENAME, 'json')
+        invalid_regions_data = S3.read_file(f"{AZURE_CONST.INVALID_REGIONS_JSON_FILENAME}", 'json')
+        instance_types_data = S3.read_file(f"{AZURE_CONST.INVALID_INSTANCE_TYPES_JSON_FILENAME}", 'json')
+        call_history_data = S3.read_file(f"{AZURE_CONST.LOCATIONS_CALL_HISTORY_JSON_FILENAME}", 'json')
+        over_limit_data = S3.read_file(f"{AZURE_CONST.LOCATIONS_OVER_LIMIT_JSON_FILENAME}", 'json')
+        last_location_index_data = S3.read_file(f"{AZURE_CONST.LAST_SUBSCRIPTION_ID_AND_LOCATION_JSON_FILENAME}", 'json')
+        region_map_and_instance_map = S3.read_file(f"{AZURE_CONST.REGION_MAP_AND_INSTANCE_MAP_JSON_FILENAME}", 'json')

         SS_Resources.invalid_regions_tmp = invalid_regions_data
         SS_Resources.invalid_instance_types_tmp = instance_types_data
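
Note: once the const_config getters return full S3 keys (last file in this diff), the f-string wrappers added above are no-ops; passing the constant directly is equivalent. A small sketch under that assumption, with a placeholder frame:

    import pandas as pd
    from utils.pub_service import S3, AZURE_CONST  # assumes the collector package is importable

    # Placeholder frame; only the key handling matters here.
    df_example = pd.DataFrame({"InstanceType": ["Dsv3"], "Region": ["eastus"]})

    # Both calls target the same object: the getter already returns
    # "sps-collector/azure/df_to_use_today.pkl", so the f-string adds nothing.
    S3.upload_file(df_example, f"{AZURE_CONST.DF_TO_USE_TODAY_PKL_FILENAME}", "pkl")
    S3.upload_file(df_example, AZURE_CONST.DF_TO_USE_TODAY_PKL_FILENAME, "pkl")
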
diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py b/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py
index ebca1fca..987bee48 100644
--- a/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py
+++ b/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py
@@ -1,7 +1,7 @@
 import pandas as pd
 import numpy as np

-def merge_df(price_df, eviction_df):
+def merge_price_eviction_df(price_df, eviction_df):
     join_df = pd.merge(price_df, eviction_df,
                        left_on=['InstanceType', 'InstanceTier', 'armRegionName'],
                        right_on=['InstanceType', 'InstanceTier', 'Region'],
@@ -12,3 +12,13 @@ def merge_df(price_df, eviction_df):
     join_df.rename(columns={'Region_x' : 'Region', 'OndemandPrice_x' : 'OndemandPrice', 'SpotPrice_x' : 'SpotPrice', 'Savings_x' : 'Savings'}, inplace=True)

     return join_df
+
+
+def merge_price_eviction_sps_df(price_eviction_df, sps_df):
+    join_df = pd.merge(price_eviction_df, sps_df, on=['InstanceTier', 'InstanceType', 'Region'], how='outer')
+    join_df.rename(columns={'time_x': 'PriceEviction_Update_Time', 'time_y': 'SPS_Update_Time'}, inplace=True)
+    join_df.drop(columns=['id_x', 'id_y'], inplace=True)
+    join_df = join_df[["InstanceTier", "InstanceType", "Region", "OndemandPrice", "SpotPrice", "Savings", "IF",
+                       "PriceEviction_Update_Time", "DesiredCount", "AvailabilityZone", "Score", "SPS_Update_Time"]]
+
+    return join_df
\ No newline at end of file
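
Note: a usage sketch for the new merge_price_eviction_sps_df helper, using hypothetical one-row frames. The column names are assumptions about what the price/eviction and SPS collectors produce; both inputs need 'id' and 'time' columns for the rename and drop steps to apply.

    import pandas as pd
    from utils.merge_df import merge_price_eviction_sps_df  # assumes the collector package is importable

    # Toy inputs with the columns the helper expects.
    price_if_df = pd.DataFrame([{
        "id": 1, "InstanceTier": "Standard", "InstanceType": "D2s_v3", "Region": "eastus",
        "OndemandPrice": 0.096, "SpotPrice": 0.029, "Savings": 69.8, "IF": 0.05,
        "time": "2025-01-01 00:00:00",
    }])
    sps_df = pd.DataFrame([{
        "id": 1, "InstanceTier": "Standard", "InstanceType": "D2s_v3", "Region": "eastus",
        "DesiredCount": 1, "AvailabilityZone": None, "Score": "High",
        "time": "2025-01-01 00:10:00",
    }])

    merged = merge_price_eviction_sps_df(price_if_df, sps_df)
    print(merged.columns.tolist())
    # ['InstanceTier', 'InstanceType', 'Region', 'OndemandPrice', 'SpotPrice', 'Savings',
    #  'IF', 'PriceEviction_Update_Time', 'DesiredCount', 'AvailabilityZone', 'Score', 'SPS_Update_Time']
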
diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/pub_service.py b/collector/spot-dataset/azure/lambda/current_collector/utils/pub_service.py
index 14015fa2..e8a8ad37 100644
--- a/collector/spot-dataset/azure/lambda/current_collector/utils/pub_service.py
+++ b/collector/spot-dataset/azure/lambda/current_collector/utils/pub_service.py
@@ -6,6 +6,7 @@ import inspect
 import os
 import logging
+import pandas as pd
 from const_config import AzureCollector, Storage

 AZURE_CONST = AzureCollector()
@@ -44,7 +45,7 @@ def __init__(self):
         self.client = s3_client
         self.resource = s3_resource

-    def upload_file(self, data, file_name, file_type="json", set_public_read = False):
+    def upload_file(self, data, file_path, file_type="json", set_public_read = False):
         try:
             if file_type not in ["json", "pkl", "df_to_csv.gz"]:
                 raise ValueError("Unsupported file type. Use 'json' or 'pkl'.")
@@ -68,23 +69,21 @@ def upload_file(self, data, file_name, file_type="json", set_public_read = False
                 data.to_csv(file, index=False, compression="gzip")
                 file.seek(0)

-            file_path = f"{AZURE_CONST.SPS_FILE_PATH}/{file_name}"
             self.client.upload_fileobj(file, STORAGE_CONST.BUCKET_NAME, file_path)

             if set_public_read:
                 object_acl = self.resource.ObjectAcl(STORAGE_CONST.BUCKET_NAME, file_path)
                 object_acl.put(ACL='public-read')
-            print(f"[S3]: Succeed to upload. Filename: [{file_name}]")
+            print(f"[S3]: Succeed to upload. Filename: [{file_path}]")
         except ValueError as ve:
-            print(f"Validation error for {file_name}: {ve}")
+            print(f"Validation error for {file_path}: {ve}")
         except Exception as e:
-            print(f"Upload failed for {file_name}: {e}")
+            print(f"Upload failed for {file_path}: {e}")

-    def read_file(self, file_name, file_type="json"):
+    def read_file(self, file_path, file_type="json"):
         try:
-            file_path = f"{AZURE_CONST.SPS_FILE_PATH}/{file_name}"
             response = self.client.get_object(Bucket=STORAGE_CONST.BUCKET_NAME, Key=file_path)
             file = io.BytesIO(response['Body'].read())

@@ -92,16 +91,19 @@ def read_file(self, file_name, file_type="json"):
             if file_type == "json":
                 return json.load(file)

             elif file_type == "pkl":
-                return pickle.load(file)
+                return pd.read_pickle(file)
+
+            elif file_type == "pkl.gz":
+                return pd.read_pickle(file, compression="gzip")

             else:
-                raise ValueError("Unsupported file type. Use 'json' or 'pkl'.")
+                raise ValueError("Unsupported file type. Use 'json' or 'pkl' or 'pkl.gz'.")

         except json.JSONDecodeError:
-            print(f"Warning: {file_name} is not a valid JSON file.")
+            print(f"Warning: {file_path} is not a valid JSON file.")
             return None
         except Exception as e:
-            print(f"Error reading {file_name} from S3: {e}")
+            print(f"Error reading {file_path} from S3: {e}")
             return None

 class LoggerConfig(logging.Logger):
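
Note: the S3 wrapper now treats its second argument as the full object key (the SPS_FILE_PATH prefix is gone), and read_file gains a 'pkl.gz' branch backed by pd.read_pickle, while upload_file still only accepts 'json', 'pkl', and 'df_to_csv.gz'. A short usage sketch under those assumptions:

    from utils.pub_service import S3, AZURE_CONST  # assumes the collector package and AWS credentials

    # Keys are full object paths taken straight from const_config.
    invalid_regions = S3.read_file(AZURE_CONST.INVALID_REGIONS_JSON_FILENAME, "json")

    # New in this patch: gzip-compressed pickles can be read back directly.
    latest_price_if_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH, "pkl.gz")
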
diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py
index 8e2a608e..fb0273f8 100644
--- a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py
+++ b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py
@@ -87,18 +87,25 @@ def update_latest(data, timestamp):
     data['time'] = datetime.strftime(timestamp, '%Y-%m-%d %H:%M:%S')

-    result = data.to_json(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_FILENAME }", orient='records')
+    data.to_json(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_FILENAME}", orient='records')
+    data.to_pickle(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_PRICE_IF_PKL_GZIP_FILENAME}", compression="gzip")

     session = boto3.Session()
     s3 = session.client('s3')

-    with open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_FILENAME }", 'rb') as f:
+    with open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_FILENAME}", 'rb') as f:
         s3.upload_fileobj(f, STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_DATA_SAVE_PATH)

+    with open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_PRICE_IF_PKL_GZIP_FILENAME}", 'rb') as f:
+        s3.upload_fileobj(f, STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH)
+
     s3 = boto3.resource('s3')
     object_acl = s3.ObjectAcl(STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_DATA_SAVE_PATH)
     response = object_acl.put(ACL='public-read')

+    object_acl = s3.ObjectAcl(STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH)
+    response = object_acl.put(ACL='public-read')
+
     pickle.dump(data, open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.SERVER_SAVE_FILENAME}", "wb"))

@@ -164,7 +171,8 @@ def update_latest_sps(dataframe, time_utc):
         dataframe['AvailabilityZone'] = dataframe['AvailabilityZone'].where(pd.notna(dataframe['AvailabilityZone']), None)

         json_data = dataframe.to_dict(orient="records")
-        S3.upload_file(json_data, f"result/{AZURE_CONST.LATEST_SPS_FILENAME}", "json", set_public_read=True)
+
+        S3.upload_file(json_data, f"{AZURE_CONST.LATEST_SPS_FILENAME}", "json", set_public_read=True)

         return True
     except Exception as e:
@@ -179,8 +187,7 @@ def save_raw_sps(dataframe, time_utc):
         s3_dir_name = time_utc.strftime("%Y/%m/%d")
         s3_obj_name = time_utc.strftime("%H-%M-%S")
-
-        S3.upload_file(dataframe, f"result/rawdata/{s3_dir_name}/{s3_obj_name}.csv.gz", "df_to_csv.gz", set_public_read=True)
+        S3.upload_file(dataframe, f"sps-collector/azure/result/rawdata/{s3_dir_name}/{s3_obj_name}.csv.gz", "df_to_csv.gz", set_public_read=True)

         return True
     except Exception as e:
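
Note: unlike the other keys in this patch, save_raw_sps now hard-codes its "sps-collector/azure/result/rawdata/" prefix instead of taking it from const_config. A self-contained snippet showing the resulting object key layout:

    from datetime import datetime, timezone

    # Example timestamp only; shows the key produced for a raw SPS upload.
    time_utc = datetime(2025, 1, 1, 0, 10, 0, tzinfo=timezone.utc)
    s3_dir_name = time_utc.strftime("%Y/%m/%d")
    s3_obj_name = time_utc.strftime("%H-%M-%S")
    key = f"sps-collector/azure/result/rawdata/{s3_dir_name}/{s3_obj_name}.csv.gz"
    print(key)  # sps-collector/azure/result/rawdata/2025/01/01/00-10-00.csv.gz
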
diff --git a/const_config.py b/const_config.py
index 25724f9a..de0c0449 100644
--- a/const_config.py
+++ b/const_config.py
@@ -79,8 +79,12 @@ def LATEST_FILENAME():
         return "latest_azure.json"

     @constant
-    def LATEST_SPS_FILENAME():
-        return "latest_azure_sps.json"
+    def LATEST_PRICE_IF_PKL_GZIP_FILENAME():
+        return "latest_price_if_azure.pkl.gz"
+
+    @constant
+    def S3_LATEST_PRICE_IF_GZIP_SAVE_PATH():
+        return "latest_data/latest_price_if_azure.pkl.gz"

     @constant
     def S3_LATEST_DATA_SAVE_PATH():
@@ -130,37 +134,37 @@ def SPOT_DATA_COLLECTION_LOG_GROUP_NAME():
     def LOG_STREAM_NAME():
         return "Azure-Count"

-    @constant
-    def SPS_FILE_PATH():
-        return "sps-collector/azure"
-
     @constant
     def LOCATIONS_CALL_HISTORY_JSON_FILENAME():
-        return "locations_call_history.json"
+        return "sps-collector/azure/locations_call_history.json"

     @constant
     def LOCATIONS_OVER_LIMIT_JSON_FILENAME():
-        return "locations_over_limit.json"
+        return "sps-collector/azure/locations_over_limit.json"

     @constant
     def INVALID_REGIONS_JSON_FILENAME():
-        return "invalid_regions.json"
+        return "sps-collector/azure/invalid_regions.json"

     @constant
     def INVALID_INSTANCE_TYPES_JSON_FILENAME():
-        return "invalid_instance_types.json"
+        return "sps-collector/azure/invalid_instance_types.json"

     @constant
     def LAST_SUBSCRIPTION_ID_AND_LOCATION_JSON_FILENAME():
-        return "last_subscription_id_and_location.json"
+        return "sps-collector/azure/last_subscription_id_and_location.json"

     @constant
     def REGION_MAP_AND_INSTANCE_MAP_JSON_FILENAME():
-        return "region_map_and_instance_map.json"
+        return "sps-collector/azure/region_map_and_instance_map.json"

     @constant
     def DF_TO_USE_TODAY_PKL_FILENAME():
-        return "df_to_use_today.pkl"
+        return "sps-collector/azure/df_to_use_today.pkl"
+
+    @constant
+    def LATEST_SPS_FILENAME():
+        return "sps-collector/azure/result/latest_azure_sps.json"

 class GcpCollector(object):
     @constant
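
Note: the renamed constants now carry the full S3 prefix, which is what allows the S3 wrapper to drop its own prefixing. A quick check, assuming const_config is importable and the @constant decorator exposes these as attributes (AZURE_CONST.LATEST_FILENAME is already used that way in this patch):

    from const_config import AzureCollector

    AZURE_CONST = AzureCollector()
    print(AZURE_CONST.DF_TO_USE_TODAY_PKL_FILENAME)       # sps-collector/azure/df_to_use_today.pkl
    print(AZURE_CONST.LATEST_SPS_FILENAME)                # sps-collector/azure/result/latest_azure_sps.json
    print(AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH)  # latest_data/latest_price_if_azure.pkl.gz
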