From c88e1f08de7fcc16be1ffa1e2a934d3262ced50f Mon Sep 17 00:00:00 2001 From: Taeyang-Kim Date: Fri, 21 Feb 2025 14:41:00 +0900 Subject: [PATCH 01/14] =?UTF-8?q?=EC=82=AC=EC=86=8C=20=EC=BD=94=EB=93=9C?= =?UTF-8?q?=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../spot-dataset/azure/lambda/current_collector/load_sps.py | 2 +- .../current_collector/sps_module/sps_location_manager.py | 2 +- .../azure/lambda/current_collector/utils/azure_auth.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/collector/spot-dataset/azure/lambda/current_collector/load_sps.py b/collector/spot-dataset/azure/lambda/current_collector/load_sps.py index e5ecd385..86aa812d 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/load_sps.py +++ b/collector/spot-dataset/azure/lambda/current_collector/load_sps.py @@ -256,7 +256,7 @@ def execute_spot_placement_score_api(region_chunk, instance_type_chunk, availabi "Content-Type": "application/json", } try: - response = requests.post(url, headers=headers, json=request_body, timeout=35) + response = requests.post(url, headers=headers, json=request_body, timeout=40) response.raise_for_status() return response.json() diff --git a/collector/spot-dataset/azure/lambda/current_collector/sps_module/sps_location_manager.py b/collector/spot-dataset/azure/lambda/current_collector/sps_module/sps_location_manager.py index b9903f4e..2e21dda2 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/sps_module/sps_location_manager.py +++ b/collector/spot-dataset/azure/lambda/current_collector/sps_module/sps_location_manager.py @@ -51,7 +51,7 @@ def validation_can_call(location, history, over_limit_locations): 이 메서드는 지정된 location으로 호출 가능한지 확인합니다. 초과 요청 여부와 호출 이력의 크기를 기준으로 판단합니다. 
""" - if over_limit_locations is not None: + if over_limit_locations: if ((location not in over_limit_locations) and (len(history[location]) < 10)): return True diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/azure_auth.py b/collector/spot-dataset/azure/lambda/current_collector/utils/azure_auth.py index 96ef91c8..c38d67ef 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/azure_auth.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/azure_auth.py @@ -2,11 +2,11 @@ import requests import time from azure.identity import ClientSecretCredential -from utils.pub_service import db_AzureAuth +from utils.pub_service import DB_AzureAuth from azure.core.exceptions import ClientAuthenticationError def get_token(): - db = db_AzureAuth + db = DB_AzureAuth now = int(time.time()) From d0d09e54910037ca70160040315a0b69d6d8fdd3 Mon Sep 17 00:00:00 2001 From: Taeyang-Kim Date: Fri, 21 Feb 2025 15:29:22 +0900 Subject: [PATCH 02/14] =?UTF-8?q?Azure=20SPS=20=EC=A0=95=EB=B3=B4=20?= =?UTF-8?q?=EC=A0=9C=EA=B3=B5=EC=9D=84=20=EC=9C=84=ED=95=9C=20=EC=8B=A4?= =?UTF-8?q?=EC=A0=9C=20=EC=9A=B4=EC=98=81=20=ED=99=98=EA=B2=BD=20=ED=86=B5?= =?UTF-8?q?=ED=95=A9:=20query=5Fselector=20/=20upload=5Ftimestream=20/=20s?= =?UTF-8?q?ubmit=5Fbatch=20=EB=93=B1=20=EC=8B=A0=EA=B7=9C=20=EB=A9=94?= =?UTF-8?q?=EC=84=9C=EB=93=9C=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../current_collector/lambda_function.py | 21 +- .../current_collector/lambda_function_sps.py | 97 ++++++--- .../current_collector/utils/compare_data.py | 52 ++++- .../current_collector/utils/pub_service.py | 35 +++- .../current_collector/utils/upload_data.py | 185 +++++++++++++++--- const_config.py | 16 +- 6 files changed, 330 insertions(+), 76 deletions(-) diff --git a/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py b/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py index 2b1a3a33..8037469e 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py +++ b/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py @@ -1,18 +1,11 @@ -import os -import json -import boto3 import pandas as pd -from datetime import datetime, timezone +from datetime import datetime from load_if import load_if from load_price import collect_price_with_multithreading from utils.merge_df import merge_price_eviction_df from utils.compare_data import compare from utils.upload_data import upload_timestream, update_latest, save_raw, query_selector, upload_cloudwatch -from utils.pub_service import send_slack_message, AZURE_CONST, STORAGE_CONST - -WORKLOAD_COLS = ['InstanceTier', 'InstanceType', 'Region'] -FEATURE_COLS = ['OndemandPrice', 'SpotPrice', 'IF'] - +from utils.pub_service import send_slack_message, AZURE_CONST, S3 def lambda_handler(event, _): event_time_utc = event.get("time") @@ -55,11 +48,7 @@ def lambda_handler(event, _): try: # load previous dataframe - s3 = boto3.resource('s3') - object = s3.Object(STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_DATA_SAVE_PATH) - response = object.get() - data = json.load(response['Body']) - previous_df = pd.DataFrame(data) + previous_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH, 'pkl.gz') # upload latest azure price to s3 update_latest(join_df, event_time_utc_datetime) @@ -69,7 +58,9 @@ def lambda_handler(event, _): upload_cloudwatch(join_df, event_time_utc_datetime) # compare and upload changed_df to 
timestream - changed_df = compare(previous_df, join_df, AZURE_CONST.DF_WORKLOAD_COLS, AZURE_CONST.DF_FEATURE_COLS) + workload_cols = ['InstanceTier', 'InstanceType', 'Region'] + feature_cols = ['OndemandPrice', 'SpotPrice', 'IF'] + changed_df = compare(previous_df, join_df, workload_cols, feature_cols) if not changed_df.empty: query_selector(changed_df) upload_timestream(changed_df, event_time_utc_datetime) diff --git a/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py b/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py index 301abacc..eeec2f88 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py +++ b/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py @@ -1,18 +1,20 @@ import load_sps import pandas as pd +import traceback from datetime import datetime from sps_module import sps_shared_resources from utils.merge_df import merge_price_eviction_sps_df -from utils.upload_data import update_latest_sps, save_raw_sps -from utils.pub_service import send_slack_message, logger, S3, AZURE_CONST +from utils.upload_data import update_latest_sps, save_raw_sps, upload_timestream_sps, query_selector_sps, upload_cloudwatch_sps +from utils.compare_data import compare_sps +from utils.pub_service import send_slack_message, Logger, S3, AZURE_CONST FIRST_TIME_ACTION = "First_Time" # 첫 실행 액션 EVERY_10MIN_ACTION = "Every_10Min" # 10분마다 실행 액션 UTC_1500_TIME = "15:00" # UTC 15:00 (KST 00:00) -def lambda_handler(event, _): +def lambda_handler(event, context): action = event.get("action") - event_id = event.get("id") + log_stream_id = context.log_stream_name event_time_utc = event.get("time") event_time_utc_datetime = datetime.strptime(event_time_utc, "%Y-%m-%dT%H:%M:%SZ") @@ -22,7 +24,7 @@ def lambda_handler(event, _): desired_count = sps_shared_resources.time_desired_count_map.get(event_time_utc_datetime.strftime("%H:%M"), 1) - logger.info(f"Lambda triggered: action: {action}, event_time: {datetime.strftime(event_time_utc_datetime, '%Y-%m-%d %H:%M:%S')}, desired_count: {desired_count}") + Logger.info(f"Lambda triggered: action: {action}, event_time: {datetime.strftime(event_time_utc_datetime, '%Y-%m-%d %H:%M:%S')}, desired_count: {desired_count}") if action == FIRST_TIME_ACTION: sps_res_availability_zones_true_df, sps_res_availability_zones_false_df = load_sps.collect_spot_placement_score_first_time(desired_count=desired_count) @@ -30,7 +32,7 @@ def lambda_handler(event, _): elif action == EVERY_10MIN_ACTION: # UTC 15:00 (KST 00:00)인 경우 실행 건너뛰기 if event_time_utc_datetime.strftime("%H:%M") == UTC_1500_TIME: - logger.info("Skipping scheduled time (UTC 15:00, KST 00:00)") + Logger.info("Skipping scheduled time (UTC 15:00, KST 00:00)") return handle_response(200, "Executed successfully. 
Scheduled time skipped.", action, event_time_utc_datetime) sps_res_availability_zones_true_df, sps_res_availability_zones_false_df = load_sps.collect_spot_placement_score(desired_count=desired_count) @@ -49,37 +51,87 @@ def lambda_handler(event, _): except Exception as e: error_msg = f"Unexpected error: {e}" - logger.error(error_msg) - send_slack_message(f"AZURE SPS MODULE EXCEPTION!\n{error_msg}\nEvent_id: {event_id}") + Logger.error(error_msg) + send_slack_message(f"LOCAL_TEST_AZURE SPS MODULE EXCEPTION!\n{error_msg}\Log_stream_id: {log_stream_id}") return handle_response(500, "Execute Failed!", action, event_time_utc_datetime, str(e)) def handle_res_df(sps_res_true_df, sps_res_false_df, time_datetime): try: - sps_res_true_df['time'] = time_datetime.strftime("%Y-%m-%d %H:%M:%S") - sps_res_false_df['time'] = time_datetime.strftime("%Y-%m-%d %H:%M:%S") - sps_res_true_df['AvailabilityZone'] = sps_res_true_df['AvailabilityZone'].where(pd.notna(sps_res_true_df['AvailabilityZone']), None) + time_str = time_datetime.strftime("%Y-%m-%d %H:%M:%S") + sps_res_true_df['time'] = time_str + sps_res_false_df['time'] = time_str + sps_res_true_df['AvailabilityZone'] = sps_res_true_df['AvailabilityZone'].where(pd.notna(sps_res_true_df['AvailabilityZone']), None) price_if_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH, 'pkl.gz') - if price_if_df is None: raise ValueError("price_if_df is None") + if price_if_df is None: + raise ValueError("price_if_df is None") - price_eviction_sps_zone_true_df = merge_price_eviction_sps_df(price_if_df, sps_res_true_df, True) - success_availability_zones_true = (update_latest_sps(price_eviction_sps_zone_true_df, True) - and save_raw_sps(price_eviction_sps_zone_true_df, time_datetime, True)) + success_availability_zone_true = process_zone_data(price_if_df, sps_res_true_df, time_datetime, True) + success_availability_zone_false = process_zone_data(price_if_df, sps_res_false_df, time_datetime, False) - price_eviction_sps_zone_false_df = merge_price_eviction_sps_df(price_if_df, sps_res_false_df, False) - success_availability_zones_false = (update_latest_sps(price_eviction_sps_zone_false_df, False) - and save_raw_sps(price_eviction_sps_zone_false_df, time_datetime, False)) - - if success_availability_zones_true and success_availability_zones_false: - logger.info(f"Successfully merged the price/if/sps df, updated latest results, and saved raw data!") + if success_availability_zone_true and success_availability_zone_false: + Logger.info("Successfully merged the price/if/sps df, process_zone_data!") return True + else: + Logger.info("Failed to merge the price/if/sps df, process_zone_data!") + return False except Exception as e: - logger.error(f"Error in handle_res_df function: {e}") + Logger.error(f"Error in handle_res_df function: {e}") return False + +def process_zone_data(price_if_df, sps_res_df, time_datetime, is_true_zone): + try: + price_eviction_sps_zone_df = merge_price_eviction_sps_df(price_if_df, sps_res_df, is_true_zone) + + if is_true_zone: + price_eviction_sps_zone_previous_df = S3.read_file(f"{AZURE_CONST.LATEST_SPS_AVAILABILITY_ZONE_TRUE_PKL_GZIP_FILENAME}", 'pkl.gz') + workload_cols = ['InstanceTier', 'InstanceType', 'Region', 'AvailabilityZone', 'DesiredCount'] + feature_cols = ['OndemandPrice', 'SpotPrice', 'IF', 'Score', 'SPS_Update_Time'] + + changed_df = None + if price_eviction_sps_zone_previous_df is not None and not price_eviction_sps_zone_previous_df.empty: + changed_df = compare_sps(price_eviction_sps_zone_previous_df, 
price_eviction_sps_zone_df, workload_cols, feature_cols) + + update_success = update_latest_sps(price_eviction_sps_zone_df, is_true_zone) + save_success = save_raw_sps(price_eviction_sps_zone_df, time_datetime, is_true_zone) + cloudwatch_success = upload_cloudwatch_sps(price_eviction_sps_zone_df, time_datetime) + + if changed_df is not None and not changed_df.empty: + query_success = query_selector_sps(changed_df) + timestream_success = upload_timestream_sps(changed_df, time_datetime) + else: + query_success = True + timestream_success = True + + success = all([update_success, save_success, cloudwatch_success, query_success, timestream_success]) + + log_details = ( + f"update: {update_success}, save: {save_success}, cloudwatch: {cloudwatch_success}, " + f"query: {query_success}, timestream: {timestream_success}" + ) + else: + update_success = update_latest_sps(price_eviction_sps_zone_df, is_true_zone) + save_success = save_raw_sps(price_eviction_sps_zone_df, time_datetime, is_true_zone) + + success = update_success and save_success + log_details = f"update: {update_success}, save: {save_success}" + + if not success: + Logger.error(f"Failed: Availability Zone {is_true_zone} Processing.") + Logger.error(log_details) + else: + return True + + except Exception as e: + Logger.error(f"Error in process_zone_data function: {e}") + Logger.error(traceback.format_exc()) + return False + + def handle_response(status_code, body, action, time_datetime, error_message=None): response = { "statusCode": status_code, @@ -90,5 +142,4 @@ def handle_response(status_code, body, action, time_datetime, error_message=None if error_message: response["error_message"] = error_message - logger.info(f"Response: {response}") return response \ No newline at end of file diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py index 2f7a69b4..740c151b 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py @@ -81,4 +81,54 @@ def compare(previous_df, current_df, workload_cols, feature_cols): current_df = current_df.loc[changed_indices].drop(['Workload', 'Feature'], axis=1) - return current_df \ No newline at end of file + return current_df + + +def compare_sps(previous_df, current_df, workload_cols, feature_cols): + previous_df = previous_df.copy() + current_df = current_df.copy() + + fill_values = { + 'OndemandPrice': -1, + 'Savings': -1, + 'IF': -1, + 'Score': -1, + 'AvailabilityZone': -1, + 'DesiredCount': -1, + 'SPS_Update_Time': -1 + } + previous_df = previous_df.fillna(fill_values) + current_df = current_df.fillna(fill_values) + + previous_df = previous_df.dropna(axis=0) + current_df = current_df.dropna(axis=0) + + previous_df['Workload'] = previous_df[workload_cols].astype(str).agg(':'.join, axis=1) + previous_df['Feature'] = previous_df[feature_cols].astype(str).agg(':'.join, axis=1) + current_df['Workload'] = current_df[workload_cols].astype(str).agg(':'.join, axis=1) + current_df['Feature'] = current_df[feature_cols].astype(str).agg(':'.join, axis=1) + + previous_df = previous_df.drop_duplicates(subset=['Workload']) + current_df = current_df.drop_duplicates(subset=['Workload']) + + # current_df와 previous_df 를 merge 방법으로 비교 + merged_df = current_df.merge( + previous_df[['Workload', 'Feature']], + on='Workload', + how='left', # previous_df 기준으로 병합 + suffixes=('_curr', '_prev') + ) + + # 변경된 행 필터링 
+ changed_df = merged_df[ + # Workload가 새로 추가된 경우 (previous_df에 존재하지 않음) + (merged_df['Feature_prev'].isna()) | + # Feature 값이 변경된 경우 (Workload는 존재하지만 Feature 값이 다름) + ((merged_df['Feature_prev'].notna()) & (merged_df['Feature_curr'] != merged_df['Feature_prev'])) + ] + + current_df = current_df.drop(columns=["Feature", "Workload"], errors="ignore") + + changed_df = changed_df[current_df.columns] + + return changed_df if not changed_df.empty else None \ No newline at end of file diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/pub_service.py b/collector/spot-dataset/azure/lambda/current_collector/utils/pub_service.py index e8a8ad37..c6dbd45c 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/pub_service.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/pub_service.py @@ -7,6 +7,7 @@ import os import logging import pandas as pd +from botocore.config import Config from const_config import AzureCollector, Storage AZURE_CONST = AzureCollector() @@ -17,6 +18,10 @@ s3_client = session.client('s3', region_name='us-west-2') s3_resource = session.resource('s3', region_name='us-west-2') ssm_client = session.client('ssm', region_name='us-west-2') +cw_client = session.client('logs', region_name='us-west-2') +timestream_write_client = session.client('timestream-write', + region_name='us-west-2', + config=Config(read_timeout=20,max_pool_connections=5000,retries={'max_attempts': 10})) class DynamoDB: def __init__(self, table): @@ -47,12 +52,12 @@ def __init__(self): def upload_file(self, data, file_path, file_type="json", set_public_read = False): try: - if file_type not in ["json", "pkl", "df_to_csv.gz"]: - raise ValueError("Unsupported file type. Use 'json' or 'pkl'.") + if file_type not in ['json', 'pkl', 'pkl.gz', 'df_to_csv.gz']: + raise ValueError("Unsupported file type. 
To use 'json', 'pkl', 'pkl.gz', 'df_to_csv.gz'.") if file_type == "json": if not isinstance(data, (dict, list)): - raise ValueError("JSON file must be a dictionary or a list") + raise ValueError("JSON must be a dictionary or a list") file = io.BytesIO(json.dumps(data, indent=4).encode("utf-8")) elif file_type == "pkl": @@ -62,6 +67,13 @@ def upload_file(self, data, file_path, file_type="json", set_public_read = False pickle.dump(data, file) file.seek(0) + elif file_type == "pkl.gz": + if data is None: + raise ValueError("Data cannot be None for pkl.gz file") + file = io.BytesIO() + data.to_pickle(file, compression="gzip") + file.seek(0) + elif file_type == "df_to_csv.gz": if data is None: raise ValueError("Data cannot be None for csv.gz file") @@ -79,6 +91,7 @@ def upload_file(self, data, file_path, file_type="json", set_public_read = False except ValueError as ve: print(f"Validation error for {file_path}: {ve}") + except Exception as e: print(f"Upload failed for {file_path}: {e}") @@ -102,6 +115,7 @@ def read_file(self, file_path, file_type="json"): except json.JSONDecodeError: print(f"Warning: {file_path} is not a valid JSON file.") return None + except Exception as e: print(f"Error reading {file_path} from S3: {e}") return None @@ -118,11 +132,22 @@ def __init__(self, level=logging.INFO, format_str='[%(levelname)s]: %(message)s' handler.setFormatter(formatter) self.addHandler(handler) +class CWHandler: + def __init__(self): + self.client = cw_client + +class TimestreamHandler: + def __init__(self): + self.client = timestream_write_client + -db_AzureAuth = DynamoDB("AzureAuth") +Logger = LoggerConfig() +DB_AzureAuth = DynamoDB("AzureAuth") SSM = SsmHandler() S3 = S3Handler() -logger = LoggerConfig() +CW = CWHandler() +TimestreamWrite = TimestreamHandler() + def send_slack_message(msg): url_key = 'error_notification_slack_webhook_url' diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py index 71534786..e85f75fd 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py @@ -6,25 +6,18 @@ import pickle import pandas as pd from datetime import datetime -from botocore.config import Config -from utils.pub_service import send_slack_message, S3, AZURE_CONST, STORAGE_CONST +from utils.pub_service import AZURE_CONST, STORAGE_CONST, CW, S3, TimestreamWrite session = boto3.session.Session(region_name='us-west-2') -write_client = session.client('timestream-write', - config=Config(read_timeout=20, - max_pool_connections=5000, - retries={'max_attempts': 10}) - ) # Submit Batch To Timestream def submit_batch(records, counter, recursive): if recursive == 10: return try: - result = write_client.write_records(DatabaseName=STORAGE_CONST.BUCKET_NAME, TableName=STORAGE_CONST.AZURE_TABLE_NAME, Records=records,CommonAttributes={}) + result = TimestreamWrite.client.write_records(DatabaseName=STORAGE_CONST.BUCKET_NAME, TableName=STORAGE_CONST.AZURE_TABLE_NAME, Records=records,CommonAttributes={}) - except write_client.exceptions.RejectedRecordsException as err: - send_slack_message(err) + except TimestreamWrite.client.exceptions.RejectedRecordsException as err: print(err) re_records = [] for rr in err.response["RejectedRecords"]: @@ -32,7 +25,6 @@ def submit_batch(records, counter, recursive): submit_batch(re_records, counter, recursive + 1) exit() except Exception as err: - 
send_slack_message(err) print(err) exit() @@ -145,35 +137,127 @@ def query_selector(data): object_acl = s3.ObjectAcl(STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_QUERY_SELECTOR_SAVE_PATH) response = object_acl.put(ACL='public-read') - def upload_cloudwatch(data, time_datetime): ondemand_count = len(data.drop(columns=['IF', 'SpotPrice', 'Savings']).dropna()) spot_count = len(data.drop(columns=['IF', 'OndemandPrice', 'Savings']).dropna()) if_count = len(data.drop(columns=['OndemandPrice', 'SpotPrice', 'Savings']).dropna()) - cw_client = boto3.client('logs') - - log_event = { + log_event = [{ 'timestamp': int(time_datetime.timestamp()) * 1000, 'message': f'AZUREONDEMAND: {ondemand_count} AZURESPOT: {spot_count} AZUREIF: {if_count}' - } - - cw_client.put_log_events( - logGroupName=AZURE_CONST.SPOT_DATA_COLLECTION_LOG_GROUP_NAME, - logStreamName=AZURE_CONST.LOG_STREAM_NAME, - logEvents=[log_event] + }] + CW.client.put_log_events( + log_group=AZURE_CONST.SPOT_DATA_COLLECTION_LOG_GROUP_NAME, + log_stream=AZURE_CONST.LOG_STREAM_NAME, + log_event=log_event ) +# Submit Batch To Timestream +def submit_batch_sps(records, counter, recursive): + if recursive == 10: + return + try: + common_attrs = {'MeasureName': 'azure_values','MeasureValueType': 'MULTI'} + TimestreamWrite.client.write_records( + DatabaseName='spotlake-test', + TableName='azure-sps-test', + Records=records, + CommonAttributes=common_attrs + ) + + except TimestreamWrite.client.exceptions.RejectedRecordsException as err: + print(err) + re_records = [] + for rr in err.response["RejectedRecords"]: + re_records.append(records[rr["RecordIndex"]]) + submit_batch_sps(re_records, counter, recursive + 1) + except Exception as err: + print(err) + + +# Check Database And Table Are Exist and Upload Data to Timestream +def upload_timestream_sps(data, time_datetime): + try: + data = data.copy() + data = data[["InstanceTier", "InstanceType", "Region", "OndemandPrice", "SpotPrice", "Savings", "IF", + "DesiredCount", "AvailabilityZone", "Score", "SPS_Update_Time"]] + + fill_values = { + "InstanceTier": 'NaN', + "InstanceType": 'NaN', + "Region": 'NaN', + 'OndemandPrice': -1, + 'Savings': -1, + 'SpotPrice': -1, + 'IF': -1, + 'Score': 'NaN', + 'AvailabilityZone': 'NaN', + 'DesiredCount': -2, + 'SPS_Update_Time': 'NaN' + } + data = data.fillna(fill_values) + + time_value = str(int(round(time_datetime.timestamp() * 1000))) + + records = [] + counter = 0 + for idx, row in data.iterrows(): + + dimensions = [] + for column in ['InstanceTier', 'InstanceType', 'Region', 'AvailabilityZone']: + dimensions.append({'Name': column, 'Value': str(row[column])}) + + submit_data = { + 'Dimensions': dimensions, + 'MeasureValues': [], + 'Time': time_value + } + + measure_columns = [ + ('DesiredCount', 'DOUBLE'), + ('OndemandPrice', 'DOUBLE'), + ('SpotPrice', 'DOUBLE'), + ('IF', 'DOUBLE'), + ('Score', 'VARCHAR'), + ('SPS_Update_Time', 'VARCHAR') + ] + + for column, value_type in measure_columns: + submit_data['MeasureValues'].append({ + 'Name': column, + 'Value': str(row[column]), + 'Type': value_type + }) + + records.append(submit_data) + counter += 1 + if len(records) == 100: + submit_batch_sps(records, counter, 0) + records = [] + + if len(records) != 0: + submit_batch_sps(records, counter, 0) + return True + + except Exception as e: + print(f"upload_timestream_sps failed. 
error: {e}") + return False + def update_latest_sps(dataframe, availability_zones=True): try: + json_data = dataframe.to_dict(orient="records") + if availability_zones: - path = f"{AZURE_CONST.LATEST_SPS_FILENAME}" - else: - path = f"{AZURE_CONST.LATEST_SPS_AVAILABILITY_ZONE_FALSE_FILENAME}" + json_path = f"{AZURE_CONST.LATEST_SPS_FILENAME}" + pkl_gzip_path = f"{AZURE_CONST.LATEST_SPS_AVAILABILITY_ZONE_TRUE_PKL_GZIP_FILENAME}" - json_data = dataframe.to_dict(orient="records") - S3.upload_file(json_data, path, "json", set_public_read=True) + S3.upload_file(json_data, json_path, "json", set_public_read=True) + S3.upload_file(dataframe, pkl_gzip_path, "pkl.gz", set_public_read=True) + + else: + json_path = f"{AZURE_CONST.LATEST_SPS_AVAILABILITY_ZONE_FALSE_FILENAME}" + S3.upload_file(json_data, json_path, "json", set_public_read=True) return True except Exception as e: @@ -196,4 +280,53 @@ def save_raw_sps(dataframe, time_utc, availability_zones=True): except Exception as e: print(f"save_raw_sps failed. error: {e}") + return False + + +def query_selector_sps(data): + try: + prev_query_selector_data = S3.read_file(AZURE_CONST.S3_QUERY_SELECTOR_ALL_SAVE_PATH, 'json') + if prev_query_selector_data: + prev_selector_df = pd.DataFrame(prev_query_selector_data) + selector_df = pd.concat([ + prev_selector_df[['InstanceTier', 'InstanceType', 'Region', 'DesiredCount']], + data[['InstanceTier', 'InstanceType', 'Region', 'DesiredCount']] + ], ignore_index=True).dropna().drop_duplicates().reset_index(drop=True) + else: + selector_df = data[['InstanceTier', 'InstanceType', 'Region', 'DesiredCount']].dropna().drop_duplicates().reset_index(drop=True) + + S3.upload_file( + selector_df.to_dict(orient="records"), + AZURE_CONST.S3_QUERY_SELECTOR_ALL_SAVE_PATH, + 'json', + set_public_read=True + ) + return True + + except Exception as e: + print(f"query_selector_sps failed. error: {e}") + return False + + +def upload_cloudwatch_sps(data, time_datetime): + try: + ondemand_count = len(data.drop(columns=['IF', 'SpotPrice', 'Savings', 'Score']).dropna()) + spot_count = len(data.drop(columns=['IF', 'OndemandPrice', 'Savings', 'Score']).dropna()) + if_count = len(data.drop(columns=['OndemandPrice', 'SpotPrice', 'Savings', 'Score']).dropna()) + sps_count = len(data.drop(columns=['IF', 'OndemandPrice', 'SpotPrice', 'Savings']).dropna()) + + log_event = [{ + 'timestamp': int(time_datetime.timestamp()) * 1000, + 'message': f'AZUREONDEMAND: {ondemand_count} AZURESPOT: {spot_count} AZUREIF: {if_count} AZURESPS: {sps_count}' + }] + + CW.client.put_log( + log_group=AZURE_CONST.SPOT_DATA_COLLECTION_LOG_GROUP_NAME, + log_stream=AZURE_CONST.ALL_LOG_STREAM_NAME, + log_event=log_event + ) + return True + + except Exception as e: + print(f"upload_cloudwatch_sps failed. 
error: {e}") return False \ No newline at end of file diff --git a/const_config.py b/const_config.py index 0a1d48dd..84aaeb7d 100644 --- a/const_config.py +++ b/const_config.py @@ -99,12 +99,8 @@ def S3_QUERY_SELECTOR_SAVE_PATH(): return "query-selector/query-selector-azure.json" @constant - def DF_WORKLOAD_COLS(): - return ['InstanceTier', 'InstanceType', 'Region'] - - @constant - def DF_FEATURE_COLS(): - return ['OndemandPrice', 'SpotPrice', 'IF'] + def S3_QUERY_SELECTOR_ALL_SAVE_PATH(): + return "query-selector/query-selector-azure-all.json" @constant def SERVER_SAVE_DIR(): @@ -134,6 +130,10 @@ def SPOT_DATA_COLLECTION_LOG_GROUP_NAME(): def LOG_STREAM_NAME(): return "Azure-Count" + @constant + def ALL_LOG_STREAM_NAME(): + return "Azure-Count-All" + @constant def LOCATIONS_CALL_HISTORY_JSON_FILENAME(): return "sps-collector/azure/saved_variable/locations_call_history.json" @@ -166,6 +166,10 @@ def DF_TO_USE_TODAY_PKL_FILENAME(): def LATEST_SPS_FILENAME(): return "sps-collector/azure/result/latest_azure_sps_zone_true.json" + @constant + def LATEST_SPS_AVAILABILITY_ZONE_TRUE_PKL_GZIP_FILENAME(): + return "sps-collector/azure/result/latest_azure_sps_zone_true.pkl.gz" + @constant def LATEST_SPS_AVAILABILITY_ZONE_FALSE_FILENAME(): return "sps-collector/azure/result/latest_azure_sps_zone_false.json" From 2e8d69d4c74ec68a4a87b7c6b0ef5cd0ce54a24e Mon Sep 17 00:00:00 2001 From: Taeyang-Kim Date: Sun, 23 Feb 2025 13:56:23 +0900 Subject: [PATCH 03/14] =?UTF-8?q?workflows=ED=86=B5=ED=95=B4=20lambda=5Ffu?= =?UTF-8?q?nction=5Fsps.py=20=EC=9D=84=20=EC=95=95=EC=B6=95=20=EC=8B=9C=20?= =?UTF-8?q?lambda=5Ffunction.py=20=EB=A1=9C=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/azure-sps-lambda-sync.yml | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/.github/workflows/azure-sps-lambda-sync.yml b/.github/workflows/azure-sps-lambda-sync.yml index ae9afc8f..5f1e6fb0 100644 --- a/.github/workflows/azure-sps-lambda-sync.yml +++ b/.github/workflows/azure-sps-lambda-sync.yml @@ -27,11 +27,16 @@ jobs: rm -f azure_sps_lambda.zip rm -f ./collector/spot-dataset/azure/lambda/current_collector/azure_sps_lambda.zip - zip -j ./collector/spot-dataset/azure/lambda/current_collector/azure_sps_lambda.zip \ - ./collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py \ - ./collector/spot-dataset/azure/lambda/current_collector/load_price.py \ - ./collector/spot-dataset/azure/lambda/current_collector/load_sps.py \ - ./const_config.py + mkdir -p /tmp/lambda_collector + + cp ./collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py /tmp/lambda_collector/lambda_function.py + cp ./collector/spot-dataset/azure/lambda/current_collector/load_price.py /tmp/lambda_collector/ + cp ./collector/spot-dataset/azure/lambda/current_collector/load_sps.py /tmp/lambda_collector/ + cp ./const_config.py /tmp/lambda_collector/ + + zip -j ./collector/spot-dataset/azure/lambda/current_collector/azure_sps_lambda.zip /tmp/lambda_collector/* + + rm -rf /tmp/lambda_collector cd ./collector/spot-dataset/azure/lambda/current_collector/ zip -r azure_sps_lambda.zip ./utils/* From c90ccec13ea5e0d3c2849d093724375f051aaf4a Mon Sep 17 00:00:00 2001 From: Taeyang-Kim Date: Sun, 23 Feb 2025 13:56:46 +0900 Subject: [PATCH 04/14] =?UTF-8?q?-2=20=EC=98=A4=ED=83=80=EB=A5=BC=20-1?= =?UTF-8?q?=EB=A1=9C=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 
--- .../azure/lambda/current_collector/utils/upload_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py index e85f75fd..ad9fba1d 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py @@ -190,9 +190,9 @@ def upload_timestream_sps(data, time_datetime): 'Savings': -1, 'SpotPrice': -1, 'IF': -1, + 'DesiredCount': -1, 'Score': 'NaN', 'AvailabilityZone': 'NaN', - 'DesiredCount': -2, 'SPS_Update_Time': 'NaN' } data = data.fillna(fill_values) From bebda41e2fa4a53bbecb0f8c7df8889711ee165a Mon Sep 17 00:00:00 2001 From: Taeyang-Kim Date: Sun, 23 Feb 2025 14:14:10 +0900 Subject: [PATCH 05/14] =?UTF-8?q?=EA=B8=B0=EC=A1=B4=20collector=20?= =?UTF-8?q?=EC=A4=91=201=EC=8B=9C=EA=B0=84=EC=97=90=20=ED=95=9C=20?= =?UTF-8?q?=EB=B2=88=20=EB=8F=8C=EB=A6=AC=EB=8A=94=20=EB=AA=A8=EB=93=88?= =?UTF-8?q?=EC=A4=91=20upload=5Fcloudwatch=20/=20query=5Fselector=20/=20up?= =?UTF-8?q?load=5Ftimestream=20/=20compare=20=EB=A5=BC=20=EC=82=AD?= =?UTF-8?q?=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../current_collector/lambda_function.py | 19 +- .../current_collector/lambda_function_sps.py | 8 +- .../current_collector/utils/compare_data.py | 81 -------- .../current_collector/utils/upload_data.py | 187 +++++------------- 4 files changed, 54 insertions(+), 241 deletions(-) diff --git a/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py b/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py index 8037469e..c7d12930 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py +++ b/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py @@ -3,9 +3,8 @@ from load_if import load_if from load_price import collect_price_with_multithreading from utils.merge_df import merge_price_eviction_df -from utils.compare_data import compare -from utils.upload_data import upload_timestream, update_latest, save_raw, query_selector, upload_cloudwatch -from utils.pub_service import send_slack_message, AZURE_CONST, S3 +from utils.upload_data import update_latest, save_raw +from utils.pub_service import send_slack_message def lambda_handler(event, _): event_time_utc = event.get("time") @@ -47,24 +46,10 @@ def lambda_handler(event, _): return try: - # load previous dataframe - previous_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH, 'pkl.gz') - # upload latest azure price to s3 update_latest(join_df, event_time_utc_datetime) save_raw(join_df, event_time_utc_datetime) - # upload count-log to cloudwatch - upload_cloudwatch(join_df, event_time_utc_datetime) - - # compare and upload changed_df to timestream - workload_cols = ['InstanceTier', 'InstanceType', 'Region'] - feature_cols = ['OndemandPrice', 'SpotPrice', 'IF'] - changed_df = compare(previous_df, join_df, workload_cols, feature_cols) - if not changed_df.empty: - query_selector(changed_df) - upload_timestream(changed_df, event_time_utc_datetime) - except Exception as e: result_msg = """AZURE UPLOAD MODULE EXCEPTION!\n %s""" % (e) data = {'text': result_msg} diff --git a/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py b/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py index eeec2f88..532b2afb 100644 
--- a/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py +++ b/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py @@ -4,7 +4,7 @@ from datetime import datetime from sps_module import sps_shared_resources from utils.merge_df import merge_price_eviction_sps_df -from utils.upload_data import update_latest_sps, save_raw_sps, upload_timestream_sps, query_selector_sps, upload_cloudwatch_sps +from utils.upload_data import update_latest_sps, save_raw_sps, upload_timestream, query_selector, upload_cloudwatch from utils.compare_data import compare_sps from utils.pub_service import send_slack_message, Logger, S3, AZURE_CONST @@ -98,11 +98,11 @@ def process_zone_data(price_if_df, sps_res_df, time_datetime, is_true_zone): update_success = update_latest_sps(price_eviction_sps_zone_df, is_true_zone) save_success = save_raw_sps(price_eviction_sps_zone_df, time_datetime, is_true_zone) - cloudwatch_success = upload_cloudwatch_sps(price_eviction_sps_zone_df, time_datetime) + cloudwatch_success = upload_cloudwatch(price_eviction_sps_zone_df, time_datetime) if changed_df is not None and not changed_df.empty: - query_success = query_selector_sps(changed_df) - timestream_success = upload_timestream_sps(changed_df, time_datetime) + query_success = query_selector(changed_df) + timestream_success = upload_timestream(changed_df, time_datetime) else: query_success = True timestream_success = True diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py index 740c151b..147e3a27 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py @@ -3,87 +3,6 @@ # compare previous collected workload with current collected workload # return changed workload -def compare(previous_df, current_df, workload_cols, feature_cols): - previous_df = previous_df.copy() - current_df = current_df.copy() - - previous_df['OndemandPrice'] = previous_df['OndemandPrice'].fillna(-1) - current_df['OndemandPrice'] = current_df['OndemandPrice'].fillna(-1) - previous_df['Savings'] = previous_df['Savings'].fillna(-1) - current_df['Savings'] = current_df['Savings'].fillna(-1) - previous_df['IF'] = previous_df['IF'].fillna(-1) - current_df['IF'] = current_df['IF'].fillna(-1) - - previous_df = previous_df.dropna(axis=0) - current_df = current_df.dropna(axis=0) - - previous_df.loc[:, 'Workload'] = previous_df[workload_cols].apply(lambda row: ':'.join(row.values.astype(str)), axis=1) - previous_df.loc[:, 'Feature'] = previous_df[feature_cols].apply(lambda row: ':'.join(row.values.astype(str)), axis=1) - current_df.loc[:, 'Workload'] = current_df[workload_cols].apply(lambda row: ':'.join(row.values.astype(str)), axis=1) - current_df.loc[:, 'Feature'] = current_df[feature_cols].apply(lambda row: ':'.join(row.values.astype(str)), axis=1) - - previous_df = previous_df.drop_duplicates(['Workload']) - current_df = current_df.drop_duplicates(['Workload']) - - current_indices = current_df[['Workload', 'Feature']].sort_values(by='Workload').index - current_values = current_df[['Workload', 'Feature']].sort_values(by='Workload').values - previous_indices = previous_df[['Workload', 'Feature']].sort_values(by='Workload').index - previous_values = previous_df[['Workload', 'Feature']].sort_values(by='Workload').values - - changed_indices = [] - - prev_idx = 0 - curr_idx = 0 - while True: - if (curr_idx 
== len(current_indices)) and (prev_idx == len(previous_indices)): - break - elif curr_idx == len(current_indices): - prev_workload = previous_values[prev_idx][0] - if prev_workload not in current_values[:, 0]: - prev_idx += 1 - continue - else: - print(f"{prev_workload}, {curr_workload} workload error") - raise Exception('workload error') - break - elif prev_idx == len(previous_indices): - curr_workload = current_values[curr_idx][0] - curr_feature = current_values[curr_idx][1] - if curr_workload not in previous_values[:, 0]: - changed_indices.append(current_indices[curr_idx]) - curr_idx += 1 - continue - else: - print(f"{prev_workload}, {curr_workload} workload error") - raise Exception('workload error') - break - - prev_workload = previous_values[prev_idx][0] - prev_feature = previous_values[prev_idx][1] - curr_workload = current_values[curr_idx][0] - curr_feature = current_values[curr_idx][1] - - if prev_workload != curr_workload: - if curr_workload not in previous_values[:, 0]: - changed_indices.append(current_indices[curr_idx]) - curr_idx += 1 - elif prev_workload not in current_values[:, 0]: - prev_idx += 1 - continue - else: - print(f"{prev_workload}, {curr_workload} workload error") - raise Exception('workload error') - else: - if prev_feature != curr_feature: - changed_indices.append(current_indices[curr_idx]) - curr_idx += 1 - prev_idx += 1 - - current_df = current_df.loc[changed_indices].drop(['Workload', 'Feature'], axis=1) - - return current_df - - def compare_sps(previous_df, current_df, workload_cols, feature_cols): previous_df = previous_df.copy() current_df = current_df.copy() diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py index ad9fba1d..9f8018d6 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py @@ -10,65 +10,6 @@ session = boto3.session.Session(region_name='us-west-2') -# Submit Batch To Timestream -def submit_batch(records, counter, recursive): - if recursive == 10: - return - try: - result = TimestreamWrite.client.write_records(DatabaseName=STORAGE_CONST.BUCKET_NAME, TableName=STORAGE_CONST.AZURE_TABLE_NAME, Records=records,CommonAttributes={}) - - except TimestreamWrite.client.exceptions.RejectedRecordsException as err: - print(err) - re_records = [] - for rr in err.response["RejectedRecords"]: - re_records.append(records[rr["RecordIndex"]]) - submit_batch(re_records, counter, recursive + 1) - exit() - except Exception as err: - print(err) - exit() - - -# Check Database And Table Are Exist and Upload Data to Timestream -def upload_timestream(data, time_datetime): - data = data[['InstanceTier', 'InstanceType', 'Region', 'OndemandPrice', 'SpotPrice', 'IF']] - data = data.copy() - - data['OndemandPrice'] = data['OndemandPrice'].fillna(-1) - data['SpotPrice'] = data['SpotPrice'].fillna(-1) - data['IF'] = data['IF'].fillna(-1) - - time_value = time.mktime(time_datetime.timetuple()) - time_value = str(int(round(time_value * 1000))) - - records = [] - counter = 0 - for idx, row in data.iterrows(): - - dimensions = [] - for column in ['InstanceTier', 'InstanceType', 'Region']: - dimensions.append({'Name': column, 'Value': str(row[column])}) - - submit_data = { - 'Dimensions': dimensions, - 'MeasureName': 'azure_values', - 'MeasureValues': [], - 'MeasureValueType': 'MULTI', - 'Time': time_value - } - - for column, types in [('OndemandPrice', 
'DOUBLE'), ('SpotPrice', 'DOUBLE'), ('IF', 'DOUBLE')]: - submit_data['MeasureValues'].append({'Name': column, 'Value': str(row[column]), 'Type': types}) - - records.append(submit_data) - counter += 1 - if len(records) == 100: - submit_batch(records, counter, 0) - records = [] - - if len(records) != 0: - submit_batch(records, counter, 0) - # Update latest_azure.json in S3 def update_latest(data, time_datetime): @@ -122,38 +63,55 @@ def save_raw(data, time_datetime): s3.upload_fileobj(f, STORAGE_CONST.BUCKET_NAME, f"""rawdata/azure/{s3_dir_name}/{s3_obj_name}.csv.gz""") os.remove(f"{AZURE_CONST.SERVER_SAVE_DIR}/{time_str}.csv.gz") +def upload_cloudwatch(data, time_datetime): + try: + ondemand_count = len(data.drop(columns=['IF', 'SpotPrice', 'Savings', 'Score']).dropna()) + spot_count = len(data.drop(columns=['IF', 'OndemandPrice', 'Savings', 'Score']).dropna()) + if_count = len(data.drop(columns=['OndemandPrice', 'SpotPrice', 'Savings', 'Score']).dropna()) + sps_count = len(data.drop(columns=['IF', 'OndemandPrice', 'SpotPrice', 'Savings']).dropna()) + + log_event = [{ + 'timestamp': int(time_datetime.timestamp()) * 1000, + 'message': f'AZUREONDEMAND: {ondemand_count} AZURESPOT: {spot_count} AZUREIF: {if_count} AZURESPS: {sps_count}' + }] + + CW.client.put_log( + log_group=AZURE_CONST.SPOT_DATA_COLLECTION_LOG_GROUP_NAME, + log_stream=AZURE_CONST.LOG_STREAM_NAME, + log_event=log_event + ) + return True + + except Exception as e: + print(f"upload_cloudwatch_sps failed. error: {e}") + return False -# Update query-selector-azure.json in S3 def query_selector(data): - s3 = session.resource('s3') - prev_selector_df = pd.DataFrame(json.loads(s3.Object(STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_QUERY_SELECTOR_SAVE_PATH).get()['Body'].read())) - selector_df = pd.concat([prev_selector_df[['InstanceTier', 'InstanceType', 'Region']], data[['InstanceTier', 'InstanceType', 'Region']]], axis=0, ignore_index=True).dropna().drop_duplicates(['InstanceTier', 'InstanceType', 'Region']).reset_index(drop=True) - result = selector_df.to_json(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.QUERY_SELECTOR_FILENAME}", orient='records') - s3 = session.client('s3') - with open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.QUERY_SELECTOR_FILENAME}", "rb") as f: - s3.upload_fileobj(f, STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_QUERY_SELECTOR_SAVE_PATH) - os.remove(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.QUERY_SELECTOR_FILENAME}") - s3 = session.resource('s3') - object_acl = s3.ObjectAcl(STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_QUERY_SELECTOR_SAVE_PATH) - response = object_acl.put(ACL='public-read') + try: + prev_query_selector_data = S3.read_file(AZURE_CONST.S3_QUERY_SELECTOR_ALL_SAVE_PATH, 'json') + if prev_query_selector_data: + prev_selector_df = pd.DataFrame(prev_query_selector_data) + selector_df = pd.concat([ + prev_selector_df[['InstanceTier', 'InstanceType', 'Region', 'DesiredCount']], + data[['InstanceTier', 'InstanceType', 'Region', 'DesiredCount']] + ], ignore_index=True).dropna().drop_duplicates().reset_index(drop=True) + else: + selector_df = data[['InstanceTier', 'InstanceType', 'Region', 'DesiredCount']].dropna().drop_duplicates().reset_index(drop=True) -def upload_cloudwatch(data, time_datetime): - ondemand_count = len(data.drop(columns=['IF', 'SpotPrice', 'Savings']).dropna()) - spot_count = len(data.drop(columns=['IF', 'OndemandPrice', 'Savings']).dropna()) - if_count = len(data.drop(columns=['OndemandPrice', 'SpotPrice', 'Savings']).dropna()) - - log_event = [{ - 'timestamp': 
int(time_datetime.timestamp()) * 1000, - 'message': f'AZUREONDEMAND: {ondemand_count} AZURESPOT: {spot_count} AZUREIF: {if_count}' - }] - CW.client.put_log_events( - log_group=AZURE_CONST.SPOT_DATA_COLLECTION_LOG_GROUP_NAME, - log_stream=AZURE_CONST.LOG_STREAM_NAME, - log_event=log_event - ) + S3.upload_file( + selector_df.to_dict(orient="records"), + AZURE_CONST.S3_QUERY_SELECTOR_ALL_SAVE_PATH, + 'json', + set_public_read=True + ) + return True + + except Exception as e: + print(f"query_selector_sps failed. error: {e}") + return False # Submit Batch To Timestream -def submit_batch_sps(records, counter, recursive): +def submit_batch(records, counter, recursive): if recursive == 10: return try: @@ -170,13 +128,13 @@ def submit_batch_sps(records, counter, recursive): re_records = [] for rr in err.response["RejectedRecords"]: re_records.append(records[rr["RecordIndex"]]) - submit_batch_sps(re_records, counter, recursive + 1) + submit_batch(re_records, counter, recursive + 1) except Exception as err: print(err) # Check Database And Table Are Exist and Upload Data to Timestream -def upload_timestream_sps(data, time_datetime): +def upload_timestream(data, time_datetime): try: data = data.copy() data = data[["InstanceTier", "InstanceType", "Region", "OndemandPrice", "SpotPrice", "Savings", "IF", @@ -232,11 +190,11 @@ def upload_timestream_sps(data, time_datetime): records.append(submit_data) counter += 1 if len(records) == 100: - submit_batch_sps(records, counter, 0) + submit_batch(records, counter, 0) records = [] if len(records) != 0: - submit_batch_sps(records, counter, 0) + submit_batch(records, counter, 0) return True except Exception as e: @@ -280,53 +238,4 @@ def save_raw_sps(dataframe, time_utc, availability_zones=True): except Exception as e: print(f"save_raw_sps failed. error: {e}") - return False - - -def query_selector_sps(data): - try: - prev_query_selector_data = S3.read_file(AZURE_CONST.S3_QUERY_SELECTOR_ALL_SAVE_PATH, 'json') - if prev_query_selector_data: - prev_selector_df = pd.DataFrame(prev_query_selector_data) - selector_df = pd.concat([ - prev_selector_df[['InstanceTier', 'InstanceType', 'Region', 'DesiredCount']], - data[['InstanceTier', 'InstanceType', 'Region', 'DesiredCount']] - ], ignore_index=True).dropna().drop_duplicates().reset_index(drop=True) - else: - selector_df = data[['InstanceTier', 'InstanceType', 'Region', 'DesiredCount']].dropna().drop_duplicates().reset_index(drop=True) - - S3.upload_file( - selector_df.to_dict(orient="records"), - AZURE_CONST.S3_QUERY_SELECTOR_ALL_SAVE_PATH, - 'json', - set_public_read=True - ) - return True - - except Exception as e: - print(f"query_selector_sps failed. 
error: {e}") - return False - - -def upload_cloudwatch_sps(data, time_datetime): - try: - ondemand_count = len(data.drop(columns=['IF', 'SpotPrice', 'Savings', 'Score']).dropna()) - spot_count = len(data.drop(columns=['IF', 'OndemandPrice', 'Savings', 'Score']).dropna()) - if_count = len(data.drop(columns=['OndemandPrice', 'SpotPrice', 'Savings', 'Score']).dropna()) - sps_count = len(data.drop(columns=['IF', 'OndemandPrice', 'SpotPrice', 'Savings']).dropna()) - - log_event = [{ - 'timestamp': int(time_datetime.timestamp()) * 1000, - 'message': f'AZUREONDEMAND: {ondemand_count} AZURESPOT: {spot_count} AZUREIF: {if_count} AZURESPS: {sps_count}' - }] - - CW.client.put_log( - log_group=AZURE_CONST.SPOT_DATA_COLLECTION_LOG_GROUP_NAME, - log_stream=AZURE_CONST.ALL_LOG_STREAM_NAME, - log_event=log_event - ) - return True - - except Exception as e: - print(f"upload_cloudwatch_sps failed. error: {e}") return False \ No newline at end of file From 291bab71148ee782af1f80c0035b682b4deb1424 Mon Sep 17 00:00:00 2001 From: Taeyang-Kim Date: Sun, 23 Feb 2025 14:22:26 +0900 Subject: [PATCH 06/14] =?UTF-8?q?=EB=B6=88=ED=95=84=EC=9A=94=20static=20?= =?UTF-8?q?=EB=B3=80=EC=88=98=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../azure/lambda/current_collector/utils/upload_data.py | 4 ++-- const_config.py | 8 -------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py index 9f8018d6..0d4b220e 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py @@ -88,7 +88,7 @@ def upload_cloudwatch(data, time_datetime): def query_selector(data): try: - prev_query_selector_data = S3.read_file(AZURE_CONST.S3_QUERY_SELECTOR_ALL_SAVE_PATH, 'json') + prev_query_selector_data = S3.read_file(AZURE_CONST.S3_QUERY_SELECTOR_SAVE_PATH, 'json') if prev_query_selector_data: prev_selector_df = pd.DataFrame(prev_query_selector_data) selector_df = pd.concat([ @@ -100,7 +100,7 @@ def query_selector(data): S3.upload_file( selector_df.to_dict(orient="records"), - AZURE_CONST.S3_QUERY_SELECTOR_ALL_SAVE_PATH, + AZURE_CONST.S3_QUERY_SELECTOR_SAVE_PATH, 'json', set_public_read=True ) diff --git a/const_config.py b/const_config.py index 84aaeb7d..d35dbae4 100644 --- a/const_config.py +++ b/const_config.py @@ -98,10 +98,6 @@ def QUERY_SELECTOR_FILENAME(): def S3_QUERY_SELECTOR_SAVE_PATH(): return "query-selector/query-selector-azure.json" - @constant - def S3_QUERY_SELECTOR_ALL_SAVE_PATH(): - return "query-selector/query-selector-azure-all.json" - @constant def SERVER_SAVE_DIR(): return "/tmp" @@ -130,10 +126,6 @@ def SPOT_DATA_COLLECTION_LOG_GROUP_NAME(): def LOG_STREAM_NAME(): return "Azure-Count" - @constant - def ALL_LOG_STREAM_NAME(): - return "Azure-Count-All" - @constant def LOCATIONS_CALL_HISTORY_JSON_FILENAME(): return "sps-collector/azure/saved_variable/locations_call_history.json" From 476481cd22c4f97e353f18a0df6bdfca07570e50 Mon Sep 17 00:00:00 2001 From: Taeyang-Kim Date: Sun, 23 Feb 2025 18:20:19 +0900 Subject: [PATCH 07/14] =?UTF-8?q?=EA=B2=B0=EC=B8=A1=EC=B9=98=20=EC=B2=98?= =?UTF-8?q?=EB=A6=AC=20=ED=86=B5=EC=9D=BC,=20=20json=ED=8C=8C=EC=9D=BC=20?= =?UTF-8?q?=EC=97=85=EB=8F=84=EB=93=9C=20=EC=8B=9C,=20indent=3D4=20?= =?UTF-8?q?=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; 
charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../azure/lambda/current_collector/utils/compare_data.py | 6 +++--- .../azure/lambda/current_collector/utils/pub_service.py | 2 +- .../azure/lambda/current_collector/utils/upload_data.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py index 147e3a27..8cf2ce9c 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py @@ -11,10 +11,10 @@ def compare_sps(previous_df, current_df, workload_cols, feature_cols): 'OndemandPrice': -1, 'Savings': -1, 'IF': -1, - 'Score': -1, - 'AvailabilityZone': -1, 'DesiredCount': -1, - 'SPS_Update_Time': -1 + 'AvailabilityZone': 'NaN', + 'Score': 'NaN', + 'SPS_Update_Time': 'NaN' } previous_df = previous_df.fillna(fill_values) current_df = current_df.fillna(fill_values) diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/pub_service.py b/collector/spot-dataset/azure/lambda/current_collector/utils/pub_service.py index c6dbd45c..19578bd0 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/pub_service.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/pub_service.py @@ -58,7 +58,7 @@ def upload_file(self, data, file_path, file_type="json", set_public_read = False if file_type == "json": if not isinstance(data, (dict, list)): raise ValueError("JSON must be a dictionary or a list") - file = io.BytesIO(json.dumps(data, indent=4).encode("utf-8")) + file = io.BytesIO(json.dumps(data).encode("utf-8")) elif file_type == "pkl": if data is None: diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py index 0d4b220e..d74f9f42 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py @@ -149,8 +149,8 @@ def upload_timestream(data, time_datetime): 'SpotPrice': -1, 'IF': -1, 'DesiredCount': -1, - 'Score': 'NaN', 'AvailabilityZone': 'NaN', + 'Score': 'NaN', 'SPS_Update_Time': 'NaN' } data = data.fillna(fill_values) From cc30ba06e2aa88ddf27cc0ea0929fb005b1dc7ea Mon Sep 17 00:00:00 2001 From: Taeyang-Kim Date: Mon, 24 Feb 2025 09:14:23 +0900 Subject: [PATCH 08/14] =?UTF-8?q?PriceEviction=5FUpdate=5FTime=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80,=20=EA=B2=B0=EC=B8=A1=EC=B9=98=20=EC=B2=98=EB=A6=AC.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../current_collector/lambda_function_sps.py | 3 ++- .../current_collector/utils/compare_data.py | 7 ++++--- .../current_collector/utils/merge_df.py | 19 +++++++++++++++++-- .../current_collector/utils/upload_data.py | 16 +++++++++------- 4 files changed, 32 insertions(+), 13 deletions(-) diff --git a/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py b/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py index 532b2afb..a455827c 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py +++ b/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py @@ -89,8 +89,9 @@ def process_zone_data(price_if_df, sps_res_df, time_datetime, is_true_zone): if is_true_zone: price_eviction_sps_zone_previous_df = 
S3.read_file(f"{AZURE_CONST.LATEST_SPS_AVAILABILITY_ZONE_TRUE_PKL_GZIP_FILENAME}", 'pkl.gz') + price_eviction_sps_zone_previous_df.drop(columns=['id'], inplace=True) workload_cols = ['InstanceTier', 'InstanceType', 'Region', 'AvailabilityZone', 'DesiredCount'] - feature_cols = ['OndemandPrice', 'SpotPrice', 'IF', 'Score', 'SPS_Update_Time'] + feature_cols = ['OndemandPrice', 'SpotPrice', 'IF', 'Score', 'SPS_Update_Time', 'PriceEviction_Update_Time'] changed_df = None if price_eviction_sps_zone_previous_df is not None and not price_eviction_sps_zone_previous_df.empty: diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py index 8cf2ce9c..8368c152 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py @@ -12,9 +12,10 @@ def compare_sps(previous_df, current_df, workload_cols, feature_cols): 'Savings': -1, 'IF': -1, 'DesiredCount': -1, - 'AvailabilityZone': 'NaN', - 'Score': 'NaN', - 'SPS_Update_Time': 'NaN' + 'AvailabilityZone': 'N/A', + 'Score': 'N/A', + 'SPS_Update_Time': 'N/A', + 'PriceEviction_Update_Time': 'N/A', } previous_df = previous_df.fillna(fill_values) current_df = current_df.fillna(fill_values) diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py b/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py index 98d6d44e..75a5818b 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py @@ -19,12 +19,27 @@ def merge_price_eviction_sps_df(price_eviction_df, sps_df, availability_zones=Tr join_df.rename(columns={'time_x': 'PriceEviction_Update_Time', 'time_y': 'SPS_Update_Time'}, inplace=True) join_df.drop(columns=['id', 'InstanceTypeSPS', 'RegionCodeSPS'], inplace=True) - columns = ["InstanceTier", "InstanceType", "Region", "OndemandPrice", "SpotPrice", "Savings", "IF", - "PriceEviction_Update_Time", "DesiredCount", "Score", "SPS_Update_Time"] + columns = ["InstanceTier", "InstanceType", "Region", "OndemandPrice", "SpotPrice", "Savings", "IF", "PriceEviction_Update_Time", + "DesiredCount", "Score", "SPS_Update_Time"] if availability_zones: columns.insert(-2, "AvailabilityZone") # "Score" 앞에 삽입 join_df = join_df[columns] + join_df.fillna({ + "InstanceTier": "N/A", + "InstanceType": "N/A", + "Region": "N/A", + "OndemandPrice": -1, + "SpotPrice": -1, + "Savings": -1, + "IF": -1, + "DesiredCount": -1, + "Score": "N/A", + "AvailabilityZone": "N/A", + "SPS_Update_Time": "N/A", + "PriceEviction_Update_Time": "N/A" + }, inplace=True) + return join_df \ No newline at end of file diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py index d74f9f42..bc6e6b9e 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py @@ -138,20 +138,21 @@ def upload_timestream(data, time_datetime): try: data = data.copy() data = data[["InstanceTier", "InstanceType", "Region", "OndemandPrice", "SpotPrice", "Savings", "IF", - "DesiredCount", "AvailabilityZone", "Score", "SPS_Update_Time"]] + "DesiredCount", "AvailabilityZone", "Score", "SPS_Update_Time", "PriceEviction_Update_Time"]] fill_values = { - "InstanceTier": 
'NaN', - "InstanceType": 'NaN', - "Region": 'NaN', + "InstanceTier": 'N/A', + "InstanceType": 'N/A', + "Region": 'N/A', 'OndemandPrice': -1, 'Savings': -1, 'SpotPrice': -1, 'IF': -1, 'DesiredCount': -1, - 'AvailabilityZone': 'NaN', - 'Score': 'NaN', - 'SPS_Update_Time': 'NaN' + 'AvailabilityZone': 'N/A', + 'Score': 'N/A', + 'SPS_Update_Time': 'N/A', + "PriceEviction_Update_Time": 'N/A' } data = data.fillna(fill_values) @@ -204,6 +205,7 @@ def upload_timestream(data, time_datetime): def update_latest_sps(dataframe, availability_zones=True): try: + dataframe['id'] = dataframe.index + 1 json_data = dataframe.to_dict(orient="records") if availability_zones: From a14d486242cc3541099a7332d67307776584ffde Mon Sep 17 00:00:00 2001 From: Taeyang-Kim Date: Tue, 25 Feb 2025 00:34:27 +0900 Subject: [PATCH 09/14] =?UTF-8?q?AvailabilityZone=20=EA=B2=B0=EC=B8=A1?= =?UTF-8?q?=EC=B9=98=20=EC=B2=98=EB=A6=AC=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../spot-dataset/azure/lambda/current_collector/load_sps.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/spot-dataset/azure/lambda/current_collector/load_sps.py b/collector/spot-dataset/azure/lambda/current_collector/load_sps.py index 86aa812d..6a2b84b1 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/load_sps.py +++ b/collector/spot-dataset/azure/lambda/current_collector/load_sps.py @@ -185,7 +185,7 @@ def execute_spot_placement_score_task_by_parameter_pool_df(api_calls_df, availab "Score": score.get("score") } if availability_zones: - score_data["AvailabilityZone"] = score.get("availabilityZone", "SINGLE_ZONE") + score_data["AvailabilityZone"] = score.get("availabilityZone", "Single") results.append(score_data) From 2e8fd43a148e5f90ad98e071bd560cf828289af6 Mon Sep 17 00:00:00 2001 From: Taeyang-Kim Date: Tue, 25 Feb 2025 13:13:25 +0900 Subject: [PATCH 10/14] =?UTF-8?q?1.=20PriceEviction=5FUpdate=5FTime?= =?UTF-8?q?=EC=82=AD=EC=A0=9C,=202.=20json=20=EC=97=85=EB=A1=9C=EB=93=9C?= =?UTF-8?q?=20=EC=8B=9C,=20FE=EC=9A=A9=20"DesiredCount"].isin([1,=20-1])?= =?UTF-8?q?=20=EC=9D=98=20=EA=B2=B0=EA=B3=BC=EB=A5=BC=20=EB=94=B0=EB=A1=9C?= =?UTF-8?q?=20=EC=97=85=EB=A1=9C=EB=93=9C,=20=ED=95=98=EC=A7=80=EB=A7=8C?= =?UTF-8?q?=20rawdata=EB=8A=94=20=EC=9A=B0=EC=84=A0=20full=5Fdata=EB=A7=8C?= =?UTF-8?q?=20=EC=98=AC=EB=A6=AC=EA=B3=A0=20=EC=9E=88=EC=9D=8C.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../current_collector/lambda_function_sps.py | 2 +- .../current_collector/utils/compare_data.py | 3 +- .../current_collector/utils/merge_df.py | 5 ++- .../current_collector/utils/upload_data.py | 35 ++++++++++++------- const_config.py | 4 +++ 5 files changed, 31 insertions(+), 18 deletions(-) diff --git a/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py b/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py index a455827c..904cd4dc 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py +++ b/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py @@ -91,7 +91,7 @@ def process_zone_data(price_if_df, sps_res_df, time_datetime, is_true_zone): price_eviction_sps_zone_previous_df = S3.read_file(f"{AZURE_CONST.LATEST_SPS_AVAILABILITY_ZONE_TRUE_PKL_GZIP_FILENAME}", 'pkl.gz') price_eviction_sps_zone_previous_df.drop(columns=['id'], inplace=True) workload_cols = ['InstanceTier', 'InstanceType', 'Region', 
'AvailabilityZone', 'DesiredCount'] - feature_cols = ['OndemandPrice', 'SpotPrice', 'IF', 'Score', 'SPS_Update_Time', 'PriceEviction_Update_Time'] + feature_cols = ['OndemandPrice', 'SpotPrice', 'IF', 'Score', 'SPS_Update_Time'] changed_df = None if price_eviction_sps_zone_previous_df is not None and not price_eviction_sps_zone_previous_df.empty: diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py index 8368c152..f66b8369 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/compare_data.py @@ -14,8 +14,7 @@ def compare_sps(previous_df, current_df, workload_cols, feature_cols): 'DesiredCount': -1, 'AvailabilityZone': 'N/A', 'Score': 'N/A', - 'SPS_Update_Time': 'N/A', - 'PriceEviction_Update_Time': 'N/A', + 'SPS_Update_Time': 'N/A' } previous_df = previous_df.fillna(fill_values) current_df = current_df.fillna(fill_values) diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py b/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py index 75a5818b..4f6a0894 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py @@ -19,7 +19,7 @@ def merge_price_eviction_sps_df(price_eviction_df, sps_df, availability_zones=Tr join_df.rename(columns={'time_x': 'PriceEviction_Update_Time', 'time_y': 'SPS_Update_Time'}, inplace=True) join_df.drop(columns=['id', 'InstanceTypeSPS', 'RegionCodeSPS'], inplace=True) - columns = ["InstanceTier", "InstanceType", "Region", "OndemandPrice", "SpotPrice", "Savings", "IF", "PriceEviction_Update_Time", + columns = ["InstanceTier", "InstanceType", "Region", "OndemandPrice", "SpotPrice", "Savings", "IF", "DesiredCount", "Score", "SPS_Update_Time"] if availability_zones: @@ -38,8 +38,7 @@ def merge_price_eviction_sps_df(price_eviction_df, sps_df, availability_zones=Tr "DesiredCount": -1, "Score": "N/A", "AvailabilityZone": "N/A", - "SPS_Update_Time": "N/A", - "PriceEviction_Update_Time": "N/A" + "SPS_Update_Time": "N/A" }, inplace=True) return join_df \ No newline at end of file diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py index bc6e6b9e..37267e13 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py @@ -138,7 +138,7 @@ def upload_timestream(data, time_datetime): try: data = data.copy() data = data[["InstanceTier", "InstanceType", "Region", "OndemandPrice", "SpotPrice", "Savings", "IF", - "DesiredCount", "AvailabilityZone", "Score", "SPS_Update_Time", "PriceEviction_Update_Time"]] + "DesiredCount", "AvailabilityZone", "Score", "SPS_Update_Time"]] fill_values = { "InstanceTier": 'N/A', @@ -151,8 +151,7 @@ def upload_timestream(data, time_datetime): 'DesiredCount': -1, 'AvailabilityZone': 'N/A', 'Score': 'N/A', - 'SPS_Update_Time': 'N/A', - "PriceEviction_Update_Time": 'N/A' + 'SPS_Update_Time': 'N/A' } data = data.fillna(fill_values) @@ -203,21 +202,32 @@ def upload_timestream(data, time_datetime): return False -def update_latest_sps(dataframe, availability_zones=True): +def update_latest_sps(full_dataframe, availability_zones=True): try: - dataframe['id'] = dataframe.index + 1 - 
json_data = dataframe.to_dict(orient="records") + full_dataframe['id'] = full_dataframe.index + 1 + + dataframe_desired_count_1_df = full_dataframe[full_dataframe["DesiredCount"].isin([1, -1])].copy() + dataframe_desired_count_1_df['id'] = dataframe_desired_count_1_df.index + 1 + + full_json_data = full_dataframe.to_dict(orient="records") + desired_count_1_json_data = dataframe_desired_count_1_df.to_dict(orient="records") if availability_zones: - json_path = f"{AZURE_CONST.LATEST_SPS_FILENAME}" + full_json_path = f"{AZURE_CONST.LATEST_SPS_FILENAME}" + desired_count_1_json_path = f"{AZURE_CONST.LATEST_SPS_DESIRED_COUNT_1_FILENAME}" pkl_gzip_path = f"{AZURE_CONST.LATEST_SPS_AVAILABILITY_ZONE_TRUE_PKL_GZIP_FILENAME}" - S3.upload_file(json_data, json_path, "json", set_public_read=True) - S3.upload_file(dataframe, pkl_gzip_path, "pkl.gz", set_public_read=True) + + # FE 노출용 json, ["DesiredCount"].isin([1, -1]) + S3.upload_file(desired_count_1_json_data, desired_count_1_json_path, "json", set_public_read=True) + # Full data json + S3.upload_file(full_json_data, full_json_path, "json", set_public_read=True) + # Full data pkl.gz, data 비교용 + S3.upload_file(full_dataframe, pkl_gzip_path, "pkl.gz", set_public_read=True) else: json_path = f"{AZURE_CONST.LATEST_SPS_AVAILABILITY_ZONE_FALSE_FILENAME}" - S3.upload_file(json_data, json_path, "json", set_public_read=True) + S3.upload_file(full_json_data, json_path, "json", set_public_read=True) return True except Exception as e: @@ -225,7 +235,7 @@ def update_latest_sps(dataframe, availability_zones=True): return False -def save_raw_sps(dataframe, time_utc, availability_zones=True): +def save_raw_sps(full_dataframe, time_utc, availability_zones=True): try: s3_dir_name = time_utc.strftime("%Y/%m/%d") s3_obj_name = time_utc.strftime("%H-%M-%S") @@ -235,7 +245,8 @@ def save_raw_sps(dataframe, time_utc, availability_zones=True): else: path = f"{AZURE_CONST.LATEST_SPS_RAW_DATA_PATH}/availability-zones-false/{s3_dir_name}/{s3_obj_name}.csv.gz" - S3.upload_file(dataframe, path, "df_to_csv.gz", set_public_read=True) + # Full data df_to_csv.gz, data 분석용 + S3.upload_file(full_dataframe, path, "df_to_csv.gz", set_public_read=True) return True except Exception as e: diff --git a/const_config.py b/const_config.py index d35dbae4..c6a180b0 100644 --- a/const_config.py +++ b/const_config.py @@ -158,6 +158,10 @@ def DF_TO_USE_TODAY_PKL_FILENAME(): def LATEST_SPS_FILENAME(): return "sps-collector/azure/result/latest_azure_sps_zone_true.json" + @constant + def LATEST_SPS_DESIRED_COUNT_1_FILENAME(): + return "sps-collector/azure/result/latest_azure_sps_zone_true_desired_count_1.json" + @constant def LATEST_SPS_AVAILABILITY_ZONE_TRUE_PKL_GZIP_FILENAME(): return "sps-collector/azure/result/latest_azure_sps_zone_true.pkl.gz" From 0d33cf4813de7d6724647937e7588d372791f910 Mon Sep 17 00:00:00 2001 From: Taeyang-Kim Date: Tue, 25 Feb 2025 20:24:49 +0900 Subject: [PATCH 11/14] =?UTF-8?q?=ED=8C=8C=EC=9D=BC=20=EA=B2=BD=EB=A1=9C?= =?UTF-8?q?=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../current_collector/lambda_function.py | 20 ++--- .../current_collector/lambda_function_sps.py | 34 ++++---- .../current_collector/utils/merge_df.py | 8 +- .../current_collector/utils/upload_data.py | 81 ++++++++++--------- const_config.py | 37 ++++----- 5 files changed, 93 insertions(+), 87 deletions(-) diff --git a/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py 
b/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py index c7d12930..f8d11300 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py +++ b/collector/spot-dataset/azure/lambda/current_collector/lambda_function.py @@ -2,8 +2,8 @@ from datetime import datetime from load_if import load_if from load_price import collect_price_with_multithreading -from utils.merge_df import merge_price_eviction_df -from utils.upload_data import update_latest, save_raw +from utils.merge_df import merge_price_saving_if_df +from utils.upload_data import update_latest_price_saving_if, save_raw_price_saving_if from utils.pub_service import send_slack_message def lambda_handler(event, _): @@ -15,7 +15,7 @@ def lambda_handler(event, _): # collect azure price data with multithreading try: - current_df = collect_price_with_multithreading() + price_saving_df = collect_price_with_multithreading() except Exception as e: result_msg = """AZURE PRICE MODULE EXCEPTION!\n %s""" % (e) data = {'text': result_msg} @@ -23,7 +23,7 @@ def lambda_handler(event, _): is_price_fetch_success = False try: - eviction_df = load_if() + if_df = load_if() except Exception as e: result_msg = """AZURE IF MODULE EXCEPTION!\n %s""" % (e) data = {'text': result_msg} @@ -31,12 +31,12 @@ def lambda_handler(event, _): is_if_fetch_success = False if is_price_fetch_success and is_if_fetch_success: - join_df = merge_price_eviction_df(current_df, eviction_df) + join_df = merge_price_saving_if_df(price_saving_df, if_df) elif not is_price_fetch_success and is_if_fetch_success: - join_df = eviction_df + join_df = if_df elif is_price_fetch_success and not is_if_fetch_success: - current_df['IF'] = -1.0 - current_df = current_df[ + price_saving_df['IF'] = -1.0 + current_df = price_saving_df[ ['InstanceTier', 'InstanceType', 'Region', 'OndemandPrice', 'SpotPrice', 'Savings', 'IF']] join_df = current_df else: @@ -47,8 +47,8 @@ def lambda_handler(event, _): try: # upload latest azure price to s3 - update_latest(join_df, event_time_utc_datetime) - save_raw(join_df, event_time_utc_datetime) + update_latest_price_saving_if(join_df, event_time_utc_datetime) + save_raw_price_saving_if(join_df, event_time_utc_datetime) except Exception as e: result_msg = """AZURE UPLOAD MODULE EXCEPTION!\n %s""" % (e) diff --git a/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py b/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py index 904cd4dc..a6c2af80 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py +++ b/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py @@ -3,8 +3,8 @@ import traceback from datetime import datetime from sps_module import sps_shared_resources -from utils.merge_df import merge_price_eviction_sps_df -from utils.upload_data import update_latest_sps, save_raw_sps, upload_timestream, query_selector, upload_cloudwatch +from utils.merge_df import merge_if_saving_price_sps_df +from utils.upload_data import update_latest, save_raw, upload_timestream, query_selector, upload_cloudwatch from utils.compare_data import compare_sps from utils.pub_service import send_slack_message, Logger, S3, AZURE_CONST @@ -64,12 +64,12 @@ def handle_res_df(sps_res_true_df, sps_res_false_df, time_datetime): sps_res_true_df['AvailabilityZone'] = sps_res_true_df['AvailabilityZone'].where(pd.notna(sps_res_true_df['AvailabilityZone']), None) - price_if_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH, 'pkl.gz') - if 
price_if_df is None: + price_saving_if_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_SAVING_IF_GZIP_SAVE_PATH, 'pkl.gz') + if price_saving_if_df is None: raise ValueError("price_if_df is None") - success_availability_zone_true = process_zone_data(price_if_df, sps_res_true_df, time_datetime, True) - success_availability_zone_false = process_zone_data(price_if_df, sps_res_false_df, time_datetime, False) + success_availability_zone_true = process_zone_data(price_saving_if_df, sps_res_true_df, time_datetime, True) + success_availability_zone_false = process_zone_data(price_saving_if_df, sps_res_false_df, time_datetime, False) if success_availability_zone_true and success_availability_zone_false: Logger.info("Successfully merged the price/if/sps df, process_zone_data!") @@ -83,23 +83,23 @@ def handle_res_df(sps_res_true_df, sps_res_false_df, time_datetime): return False -def process_zone_data(price_if_df, sps_res_df, time_datetime, is_true_zone): +def process_zone_data(price_saving_if_df, sps_res_df, time_datetime, is_true_zone): try: - price_eviction_sps_zone_df = merge_price_eviction_sps_df(price_if_df, sps_res_df, is_true_zone) + price_saving_if_sps_zone_df = merge_if_saving_price_sps_df(price_saving_if_df, sps_res_df, is_true_zone) if is_true_zone: - price_eviction_sps_zone_previous_df = S3.read_file(f"{AZURE_CONST.LATEST_SPS_AVAILABILITY_ZONE_TRUE_PKL_GZIP_FILENAME}", 'pkl.gz') - price_eviction_sps_zone_previous_df.drop(columns=['id'], inplace=True) + availability_zone_true_all_data_prev_df = S3.read_file(f"{AZURE_CONST.S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_TRUE_PKL_GZIP_SAVE_PATH}", 'pkl.gz') + availability_zone_true_all_data_prev_df.drop(columns=['id'], inplace=True) workload_cols = ['InstanceTier', 'InstanceType', 'Region', 'AvailabilityZone', 'DesiredCount'] feature_cols = ['OndemandPrice', 'SpotPrice', 'IF', 'Score', 'SPS_Update_Time'] changed_df = None - if price_eviction_sps_zone_previous_df is not None and not price_eviction_sps_zone_previous_df.empty: - changed_df = compare_sps(price_eviction_sps_zone_previous_df, price_eviction_sps_zone_df, workload_cols, feature_cols) + if availability_zone_true_all_data_prev_df is not None and not availability_zone_true_all_data_prev_df.empty: + changed_df = compare_sps(availability_zone_true_all_data_prev_df, price_saving_if_sps_zone_df, workload_cols, feature_cols) - update_success = update_latest_sps(price_eviction_sps_zone_df, is_true_zone) - save_success = save_raw_sps(price_eviction_sps_zone_df, time_datetime, is_true_zone) - cloudwatch_success = upload_cloudwatch(price_eviction_sps_zone_df, time_datetime) + update_success = update_latest(price_saving_if_sps_zone_df, is_true_zone) + save_success = save_raw(price_saving_if_sps_zone_df, time_datetime, is_true_zone) + cloudwatch_success = upload_cloudwatch(price_saving_if_sps_zone_df, time_datetime) if changed_df is not None and not changed_df.empty: query_success = query_selector(changed_df) @@ -115,8 +115,8 @@ def process_zone_data(price_if_df, sps_res_df, time_datetime, is_true_zone): f"query: {query_success}, timestream: {timestream_success}" ) else: - update_success = update_latest_sps(price_eviction_sps_zone_df, is_true_zone) - save_success = save_raw_sps(price_eviction_sps_zone_df, time_datetime, is_true_zone) + update_success = update_latest(price_saving_if_sps_zone_df, is_true_zone) + save_success = save_raw(price_saving_if_sps_zone_df, time_datetime, is_true_zone) success = update_success and save_success log_details = f"update: {update_success}, save: {save_success}" diff --git 
a/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py b/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py index 4f6a0894..4bb3f983 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/merge_df.py @@ -1,8 +1,8 @@ import pandas as pd import numpy as np -def merge_price_eviction_df(price_df, eviction_df): - join_df = pd.merge(price_df, eviction_df, +def merge_price_saving_if_df(price_df, if_df): + join_df = pd.merge(price_df, if_df, left_on=['InstanceType', 'InstanceTier', 'armRegionName'], right_on=['InstanceType', 'InstanceTier', 'Region'], how='outer') @@ -14,8 +14,8 @@ def merge_price_eviction_df(price_df, eviction_df): return join_df -def merge_price_eviction_sps_df(price_eviction_df, sps_df, availability_zones=True): - join_df = pd.merge(price_eviction_df, sps_df, on=['InstanceTier', 'InstanceType', 'Region'], how='outer') +def merge_if_saving_price_sps_df(price_saving_if_df, sps_df, availability_zones=True): + join_df = pd.merge(price_saving_if_df, sps_df, on=['InstanceTier', 'InstanceType', 'Region'], how='outer') join_df.rename(columns={'time_x': 'PriceEviction_Update_Time', 'time_y': 'SPS_Update_Time'}, inplace=True) join_df.drop(columns=['id', 'InstanceTypeSPS', 'RegionCodeSPS'], inplace=True) diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py index 37267e13..0dabaa6e 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py @@ -10,9 +10,8 @@ session = boto3.session.Session(region_name='us-west-2') - # Update latest_azure.json in S3 -def update_latest(data, time_datetime): +def update_latest_price_saving_if(data, time_datetime): data['id'] = data.index + 1 data = data[['id', 'InstanceTier', 'InstanceType', 'Region', 'OndemandPrice', 'SpotPrice', 'Savings', 'IF']] data = data.copy() @@ -23,30 +22,30 @@ def update_latest(data, time_datetime): data['time'] = datetime.strftime(time_datetime, '%Y-%m-%d %H:%M:%S') - data.to_json(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_FILENAME}", orient='records') - data.to_pickle(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_PRICE_IF_PKL_GZIP_FILENAME}", compression="gzip") + data.to_json(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_PRICE_SAVING_IF_FILENAME}", orient='records') + data.to_pickle(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_PRICE_SAVING_IF_PKL_GZIP_FILENAME}", compression="gzip") session = boto3.Session() s3 = session.client('s3') - with open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_FILENAME}", 'rb') as f: - s3.upload_fileobj(f, STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_DATA_SAVE_PATH) + with open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_PRICE_SAVING_IF_FILENAME}", 'rb') as f: + s3.upload_fileobj(f, STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_PRICE_SAVING_IF_DATA_SAVE_PATH) - with open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_PRICE_IF_PKL_GZIP_FILENAME}", 'rb') as f: - s3.upload_fileobj(f, STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH) + with open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_PRICE_SAVING_IF_PKL_GZIP_FILENAME}", 'rb') as f: + s3.upload_fileobj(f, STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_PRICE_SAVING_IF_GZIP_SAVE_PATH) s3 = boto3.resource('s3') - 
object_acl = s3.ObjectAcl(STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_DATA_SAVE_PATH) + object_acl = s3.ObjectAcl(STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_PRICE_SAVING_IF_DATA_SAVE_PATH) response = object_acl.put(ACL='public-read') - object_acl = s3.ObjectAcl(STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH) + object_acl = s3.ObjectAcl(STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_PRICE_SAVING_IF_GZIP_SAVE_PATH) response = object_acl.put(ACL='public-read') pickle.dump(data, open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.SERVER_SAVE_FILENAME}", "wb")) # Save raw data in S3 -def save_raw(data, time_datetime): +def save_raw_price_saving_if(data, time_datetime): data['Time'] = time_datetime.strftime("%Y-%m-%d %H:%M:%S") time_str = datetime.strftime(time_datetime, '%Y-%m-%d_%H-%M-%S') data = data[['Time','InstanceTier','InstanceType', 'Region', 'OndemandPrice','SpotPrice', 'IF', 'Savings']] @@ -60,7 +59,7 @@ def save_raw(data, time_datetime): s3_obj_name = time_datetime.strftime("%H-%M-%S") with open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{time_str}.csv.gz", 'rb') as f: - s3.upload_fileobj(f, STORAGE_CONST.BUCKET_NAME, f"""rawdata/azure/{s3_dir_name}/{s3_obj_name}.csv.gz""") + s3.upload_fileobj(f, STORAGE_CONST.BUCKET_NAME, f"""{AZURE_CONST.S3_RAW_DATA_PATH}/if_saving_price/{s3_dir_name}/{s3_obj_name}.csv.gz""") os.remove(f"{AZURE_CONST.SERVER_SAVE_DIR}/{time_str}.csv.gz") def upload_cloudwatch(data, time_datetime): @@ -83,7 +82,7 @@ def upload_cloudwatch(data, time_datetime): return True except Exception as e: - print(f"upload_cloudwatch_sps failed. error: {e}") + print(f"upload_cloudwatch failed. error: {e}") return False def query_selector(data): @@ -92,11 +91,11 @@ def query_selector(data): if prev_query_selector_data: prev_selector_df = pd.DataFrame(prev_query_selector_data) selector_df = pd.concat([ - prev_selector_df[['InstanceTier', 'InstanceType', 'Region', 'DesiredCount']], - data[['InstanceTier', 'InstanceType', 'Region', 'DesiredCount']] + prev_selector_df[['InstanceTier', 'InstanceType', 'Region']], + data[['InstanceTier', 'InstanceType', 'Region']] ], ignore_index=True).dropna().drop_duplicates().reset_index(drop=True) else: - selector_df = data[['InstanceTier', 'InstanceType', 'Region', 'DesiredCount']].dropna().drop_duplicates().reset_index(drop=True) + selector_df = data[['InstanceTier', 'InstanceType', 'Region']].dropna().drop_duplicates().reset_index(drop=True) S3.upload_file( selector_df.to_dict(orient="records"), @@ -107,7 +106,7 @@ def query_selector(data): return True except Exception as e: - print(f"query_selector_sps failed. error: {e}") + print(f"query_selector failed. error: {e}") return False # Submit Batch To Timestream @@ -117,8 +116,8 @@ def submit_batch(records, counter, recursive): try: common_attrs = {'MeasureName': 'azure_values','MeasureValueType': 'MULTI'} TimestreamWrite.client.write_records( - DatabaseName='spotlake-test', - TableName='azure-sps-test', + DatabaseName='spotlake', + TableName='azure', Records=records, CommonAttributes=common_attrs ) @@ -198,57 +197,63 @@ def upload_timestream(data, time_datetime): return True except Exception as e: - print(f"upload_timestream_sps failed. error: {e}") + print(f"upload_timestream failed. 
error: {e}") return False -def update_latest_sps(full_dataframe, availability_zones=True): +def update_latest(all_data_dataframe, availability_zones=True): try: - full_dataframe['id'] = full_dataframe.index + 1 + all_data_dataframe['id'] = all_data_dataframe.index + 1 - dataframe_desired_count_1_df = full_dataframe[full_dataframe["DesiredCount"].isin([1, -1])].copy() + dataframe_desired_count_1_df = all_data_dataframe[all_data_dataframe["DesiredCount"].isin([1, -1])].copy() dataframe_desired_count_1_df['id'] = dataframe_desired_count_1_df.index + 1 - full_json_data = full_dataframe.to_dict(orient="records") + full_json_data = all_data_dataframe.to_dict(orient="records") desired_count_1_json_data = dataframe_desired_count_1_df.to_dict(orient="records") if availability_zones: - full_json_path = f"{AZURE_CONST.LATEST_SPS_FILENAME}" - desired_count_1_json_path = f"{AZURE_CONST.LATEST_SPS_DESIRED_COUNT_1_FILENAME}" - pkl_gzip_path = f"{AZURE_CONST.LATEST_SPS_AVAILABILITY_ZONE_TRUE_PKL_GZIP_FILENAME}" + full_json_path = f"{AZURE_CONST.S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_TRUE_SAVE_PATH}" + desired_count_1_json_path = f"{AZURE_CONST.S3_LATEST_DESIRED_COUNT_1_DATA_AVAILABILITYZONE_TRUE_SAVE_PATH}" + pkl_gzip_path = f"{AZURE_CONST.S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_TRUE_PKL_GZIP_SAVE_PATH}" - # FE 노출용 json, ["DesiredCount"].isin([1, -1]) - S3.upload_file(desired_count_1_json_data, desired_count_1_json_path, "json", set_public_read=True) # Full data json S3.upload_file(full_json_data, full_json_path, "json", set_public_read=True) + # FE 노출용 json, ["DesiredCount"].isin([1, -1]) + S3.upload_file(desired_count_1_json_data, desired_count_1_json_path, "json", set_public_read=True) # Full data pkl.gz, data 비교용 - S3.upload_file(full_dataframe, pkl_gzip_path, "pkl.gz", set_public_read=True) + S3.upload_file(all_data_dataframe, pkl_gzip_path, "pkl.gz", set_public_read=True) else: - json_path = f"{AZURE_CONST.LATEST_SPS_AVAILABILITY_ZONE_FALSE_FILENAME}" + json_path = f"{AZURE_CONST.S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_FALSE_SAVE_PATH}" S3.upload_file(full_json_data, json_path, "json", set_public_read=True) return True except Exception as e: - print(f"update_latest_sps failed. error: {e}") + print(f"update_latest failed. 
error: {e}") return False -def save_raw_sps(full_dataframe, time_utc, availability_zones=True): +def save_raw(all_data_dataframe, time_utc, availability_zones=True): try: + dataframe_desired_count_1_df = all_data_dataframe[all_data_dataframe["DesiredCount"].isin([1, -1])].copy() + s3_dir_name = time_utc.strftime("%Y/%m/%d") s3_obj_name = time_utc.strftime("%H-%M-%S") if availability_zones: - path = f"{AZURE_CONST.LATEST_SPS_RAW_DATA_PATH}/availability-zones-true/{s3_dir_name}/{s3_obj_name}.csv.gz" + all_data_path = f"{AZURE_CONST.S3_RAW_DATA_PATH}/all_data/availability-zones-true/{s3_dir_name}/{s3_obj_name}.csv.gz" + # 기존 경로 유지 + desired_count_1_path = f"{AZURE_CONST.S3_RAW_DATA_PATH}/{s3_dir_name}/{s3_obj_name}.csv.gz" else: - path = f"{AZURE_CONST.LATEST_SPS_RAW_DATA_PATH}/availability-zones-false/{s3_dir_name}/{s3_obj_name}.csv.gz" + all_data_path = f"{AZURE_CONST.S3_RAW_DATA_PATH}/all_data/availability-zones-false/{s3_dir_name}/{s3_obj_name}.csv.gz" + desired_count_1_path = f"{AZURE_CONST.S3_RAW_DATA_PATH}/availability-zones-false/{s3_dir_name}/{s3_obj_name}.csv.gz" - # Full data df_to_csv.gz, data 분석용 - S3.upload_file(full_dataframe, path, "df_to_csv.gz", set_public_read=True) + # data 분석용 + S3.upload_file(all_data_dataframe, all_data_path, "df_to_csv.gz", set_public_read=True) + S3.upload_file(dataframe_desired_count_1_df, desired_count_1_path, "df_to_csv.gz", set_public_read=True) return True except Exception as e: - print(f"save_raw_sps failed. error: {e}") + print(f"save_raw failed. error: {e}") return False \ No newline at end of file diff --git a/const_config.py b/const_config.py index c6a180b0..9507c910 100644 --- a/const_config.py +++ b/const_config.py @@ -75,20 +75,21 @@ def SPEC_RESOURCE_SETS_LIMIT(): return 2000 @constant - def LATEST_FILENAME(): - return "latest_azure.json" + def LATEST_PRICE_SAVING_IF_FILENAME(): + return "latest_price_saving_if.json" @constant - def LATEST_PRICE_IF_PKL_GZIP_FILENAME(): - return "latest_price_if_azure.pkl.gz" + def LATEST_PRICE_SAVING_IF_PKL_GZIP_FILENAME(): + return "latest_price_saving_if.pkl.gz" @constant - def S3_LATEST_PRICE_IF_GZIP_SAVE_PATH(): - return "latest_data/latest_price_if_azure.pkl.gz" + def S3_LATEST_PRICE_SAVING_IF_DATA_SAVE_PATH(): + return "latest_data/latest_price_saving_if.json" @constant - def S3_LATEST_DATA_SAVE_PATH(): - return "latest_data/latest_azure.json" + def S3_LATEST_PRICE_SAVING_IF_GZIP_SAVE_PATH(): + return "latest_data/latest_price_saving_if.pkl.gz" + @constant def QUERY_SELECTOR_FILENAME(): @@ -155,24 +156,24 @@ def DF_TO_USE_TODAY_PKL_FILENAME(): return "sps-collector/azure/df_to_use_today.pkl" @constant - def LATEST_SPS_FILENAME(): - return "sps-collector/azure/result/latest_azure_sps_zone_true.json" + def S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_TRUE_SAVE_PATH(): + return "latest_data/latest_all_data_zone_true.json" @constant - def LATEST_SPS_DESIRED_COUNT_1_FILENAME(): - return "sps-collector/azure/result/latest_azure_sps_zone_true_desired_count_1.json" + def S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_FALSE_SAVE_PATH(): + return "latest_data/latest_all_data_zone_false.json" @constant - def LATEST_SPS_AVAILABILITY_ZONE_TRUE_PKL_GZIP_FILENAME(): - return "sps-collector/azure/result/latest_azure_sps_zone_true.pkl.gz" + def S3_LATEST_DESIRED_COUNT_1_DATA_AVAILABILITYZONE_TRUE_SAVE_PATH(): + return "latest_data/latest_azure.json" @constant - def LATEST_SPS_AVAILABILITY_ZONE_FALSE_FILENAME(): - return "sps-collector/azure/result/latest_azure_sps_zone_false.json" + def 
S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_TRUE_PKL_GZIP_SAVE_PATH(): + return "latest_data/latest_sps_zone_true_azure.pkl.gz" @constant - def LATEST_SPS_RAW_DATA_PATH(): - return "sps-collector/azure/result/rawdata" + def S3_RAW_DATA_PATH(): + return "rawdata/azure" class GcpCollector(object): @constant From 82e0ac59f771cb74120fb32082b790debe91b6bc Mon Sep 17 00:00:00 2001 From: Taeyang-Kim Date: Wed, 26 Feb 2025 01:26:01 +0900 Subject: [PATCH 12/14] =?UTF-8?q?=EB=A1=9C=EA=B9=85=20=EB=82=B4=EC=9A=A9?= =?UTF-8?q?=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../azure/lambda/current_collector/load_sps.py | 12 +++++++----- .../sps_module/sps_location_manager.py | 1 - 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/collector/spot-dataset/azure/lambda/current_collector/load_sps.py b/collector/spot-dataset/azure/lambda/current_collector/load_sps.py index 6a2b84b1..641ad789 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/load_sps.py +++ b/collector/spot-dataset/azure/lambda/current_collector/load_sps.py @@ -1,6 +1,7 @@ import re import random import requests +import traceback import concurrent.futures import time import load_price @@ -191,18 +192,20 @@ def execute_spot_placement_score_task_by_parameter_pool_df(api_calls_df, availab elif result == "NO_AVAILABLE_LOCATIONS": # NO_AVAILABLE_LOCATIONS인 경우 나머지 작업 취소 + print("No available locations found. Cancelling remaining tasks.") + print(f"SS_Resources.locations_call_history_tmp: {SS_Resources.locations_call_history_tmp}") for f in futures: if not f.done(): f.cancel() + executor.shutdown(wait=False) except JSONDecodeError as e: - print( - f"execute_spot_placement_score_task_by_parameter_pool_df func. JSON decoding error: {str(e)}") + print(f"execute_spot_placement_score_task_by_parameter_pool_df func. JSON decoding error: {str(e)}") raise except Exception as e: - print( - f"execute_spot_placement_score_task_by_parameter_pool_df func. An unexpected error occurred: {e}") + print(f"execute_spot_placement_score_task_by_parameter_pool_df func. 
An unexpected error occurred: {e}") + print(traceback.format_exc()) raise finally: save_tmp_files_to_s3() @@ -245,7 +248,6 @@ def execute_spot_placement_score_api(region_chunk, instance_type_chunk, availabi with SS_Resources.location_lock: res = SL_Manager.get_next_available_location() if res is None: - print("No available locations with remaining calls.") return "NO_AVAILABLE_LOCATIONS" subscription_id, location, history, over_limit_locations = res SL_Manager.update_call_history(subscription_id, location, history) diff --git a/collector/spot-dataset/azure/lambda/current_collector/sps_module/sps_location_manager.py b/collector/spot-dataset/azure/lambda/current_collector/sps_module/sps_location_manager.py index 2e21dda2..a432597a 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/sps_module/sps_location_manager.py +++ b/collector/spot-dataset/azure/lambda/current_collector/sps_module/sps_location_manager.py @@ -106,7 +106,6 @@ def get_next_available_location(): location = locations[location_index] if validation_can_call(location, current_history, current_over_limit_locations): - SS_Resources.last_subscription_id_and_location_tmp['last_subscription_id'] = subscription_id SS_Resources.last_subscription_id_and_location_tmp['last_location'] = location return subscription_id, location, current_history, current_over_limit_locations From 83add151db1f6cddddac85fdd8d7ec40b0e98ea1 Mon Sep 17 00:00:00 2001 From: Taeyang-Kim Date: Wed, 26 Feb 2025 14:40:17 +0900 Subject: [PATCH 13/14] =?UTF-8?q?=EA=B2=BD=EB=A1=9C=20=EB=B3=80=EA=B2=BD?= =?UTF-8?q?=EA=B3=BC=20=EB=B6=88=ED=95=84=EC=9A=94=20=ED=8C=8C=EC=9D=BC=20?= =?UTF-8?q?=EC=A0=80=EC=9E=A5=EC=9D=84=20=EC=82=AD=EC=A0=9C.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../current_collector/lambda_function_sps.py | 20 +++++++++---------- .../current_collector/utils/upload_data.py | 13 ++---------- const_config.py | 9 --------- 3 files changed, 12 insertions(+), 30 deletions(-) diff --git a/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py b/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py index a6c2af80..7d781b91 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py +++ b/collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py @@ -85,21 +85,21 @@ def handle_res_df(sps_res_true_df, sps_res_false_df, time_datetime): def process_zone_data(price_saving_if_df, sps_res_df, time_datetime, is_true_zone): try: - price_saving_if_sps_zone_df = merge_if_saving_price_sps_df(price_saving_if_df, sps_res_df, is_true_zone) + all_data_zone_true_df = merge_if_saving_price_sps_df(price_saving_if_df, sps_res_df, is_true_zone) if is_true_zone: - availability_zone_true_all_data_prev_df = S3.read_file(f"{AZURE_CONST.S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_TRUE_PKL_GZIP_SAVE_PATH}", 'pkl.gz') - availability_zone_true_all_data_prev_df.drop(columns=['id'], inplace=True) + prev_availability_zone_true_all_data_df = S3.read_file(f"{AZURE_CONST.S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_TRUE_PKL_GZIP_SAVE_PATH}", 'pkl.gz') + prev_availability_zone_true_all_data_df.drop(columns=['id'], inplace=True) workload_cols = ['InstanceTier', 'InstanceType', 'Region', 'AvailabilityZone', 'DesiredCount'] feature_cols = ['OndemandPrice', 'SpotPrice', 'IF', 'Score', 'SPS_Update_Time'] changed_df = None - if availability_zone_true_all_data_prev_df is not None and not availability_zone_true_all_data_prev_df.empty: - changed_df = 
compare_sps(availability_zone_true_all_data_prev_df, price_saving_if_sps_zone_df, workload_cols, feature_cols) + if prev_availability_zone_true_all_data_df is not None and not prev_availability_zone_true_all_data_df.empty: + changed_df = compare_sps(prev_availability_zone_true_all_data_df, all_data_zone_true_df, workload_cols, feature_cols) - update_success = update_latest(price_saving_if_sps_zone_df, is_true_zone) - save_success = save_raw(price_saving_if_sps_zone_df, time_datetime, is_true_zone) - cloudwatch_success = upload_cloudwatch(price_saving_if_sps_zone_df, time_datetime) + update_success = update_latest(all_data_zone_true_df, is_true_zone) + save_success = save_raw(all_data_zone_true_df, time_datetime, is_true_zone) + cloudwatch_success = upload_cloudwatch(all_data_zone_true_df, time_datetime) if changed_df is not None and not changed_df.empty: query_success = query_selector(changed_df) @@ -115,8 +115,8 @@ def process_zone_data(price_saving_if_df, sps_res_df, time_datetime, is_true_zon f"query: {query_success}, timestream: {timestream_success}" ) else: - update_success = update_latest(price_saving_if_sps_zone_df, is_true_zone) - save_success = save_raw(price_saving_if_sps_zone_df, time_datetime, is_true_zone) + update_success = update_latest(all_data_zone_true_df, is_true_zone) + save_success = save_raw(all_data_zone_true_df, time_datetime, is_true_zone) success = update_success and save_success log_details = f"update: {update_success}, save: {save_success}" diff --git a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py index 0dabaa6e..e5eb054d 100644 --- a/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py +++ b/collector/spot-dataset/azure/lambda/current_collector/utils/upload_data.py @@ -207,26 +207,17 @@ def update_latest(all_data_dataframe, availability_zones=True): dataframe_desired_count_1_df = all_data_dataframe[all_data_dataframe["DesiredCount"].isin([1, -1])].copy() dataframe_desired_count_1_df['id'] = dataframe_desired_count_1_df.index + 1 - - full_json_data = all_data_dataframe.to_dict(orient="records") desired_count_1_json_data = dataframe_desired_count_1_df.to_dict(orient="records") if availability_zones: - full_json_path = f"{AZURE_CONST.S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_TRUE_SAVE_PATH}" desired_count_1_json_path = f"{AZURE_CONST.S3_LATEST_DESIRED_COUNT_1_DATA_AVAILABILITYZONE_TRUE_SAVE_PATH}" pkl_gzip_path = f"{AZURE_CONST.S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_TRUE_PKL_GZIP_SAVE_PATH}" - - # Full data json - S3.upload_file(full_json_data, full_json_path, "json", set_public_read=True) # FE 노출용 json, ["DesiredCount"].isin([1, -1]) S3.upload_file(desired_count_1_json_data, desired_count_1_json_path, "json", set_public_read=True) # Full data pkl.gz, data 비교용 S3.upload_file(all_data_dataframe, pkl_gzip_path, "pkl.gz", set_public_read=True) - else: - json_path = f"{AZURE_CONST.S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_FALSE_SAVE_PATH}" - S3.upload_file(full_json_data, json_path, "json", set_public_read=True) return True except Exception as e: @@ -245,13 +236,13 @@ def save_raw(all_data_dataframe, time_utc, availability_zones=True): all_data_path = f"{AZURE_CONST.S3_RAW_DATA_PATH}/all_data/availability-zones-true/{s3_dir_name}/{s3_obj_name}.csv.gz" # 기존 경로 유지 desired_count_1_path = f"{AZURE_CONST.S3_RAW_DATA_PATH}/{s3_dir_name}/{s3_obj_name}.csv.gz" + S3.upload_file(dataframe_desired_count_1_df, desired_count_1_path, "df_to_csv.gz", 
set_public_read=True) else: all_data_path = f"{AZURE_CONST.S3_RAW_DATA_PATH}/all_data/availability-zones-false/{s3_dir_name}/{s3_obj_name}.csv.gz" - desired_count_1_path = f"{AZURE_CONST.S3_RAW_DATA_PATH}/availability-zones-false/{s3_dir_name}/{s3_obj_name}.csv.gz" # data 분석용 S3.upload_file(all_data_dataframe, all_data_path, "df_to_csv.gz", set_public_read=True) - S3.upload_file(dataframe_desired_count_1_df, desired_count_1_path, "df_to_csv.gz", set_public_read=True) + return True except Exception as e: diff --git a/const_config.py b/const_config.py index 9507c910..b0611fe7 100644 --- a/const_config.py +++ b/const_config.py @@ -90,7 +90,6 @@ def S3_LATEST_PRICE_SAVING_IF_DATA_SAVE_PATH(): def S3_LATEST_PRICE_SAVING_IF_GZIP_SAVE_PATH(): return "latest_data/latest_price_saving_if.pkl.gz" - @constant def QUERY_SELECTOR_FILENAME(): return "query-selector-azure.json" @@ -155,14 +154,6 @@ def REGION_MAP_AND_INSTANCE_MAP_JSON_FILENAME(): def DF_TO_USE_TODAY_PKL_FILENAME(): return "sps-collector/azure/df_to_use_today.pkl" - @constant - def S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_TRUE_SAVE_PATH(): - return "latest_data/latest_all_data_zone_true.json" - - @constant - def S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_FALSE_SAVE_PATH(): - return "latest_data/latest_all_data_zone_false.json" - @constant def S3_LATEST_DESIRED_COUNT_1_DATA_AVAILABILITYZONE_TRUE_SAVE_PATH(): return "latest_data/latest_azure.json" From ab54b267c4dd8aefc50a7dbfd1442d4872e47ae7 Mon Sep 17 00:00:00 2001 From: Taeyang-Kim Date: Wed, 26 Feb 2025 16:33:59 +0900 Subject: [PATCH 14/14] =?UTF-8?q?FE=20=EC=BD=94=EB=93=9C=20push?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/build/asset-manifest.json | 6 +- frontend/build/index.html | 2 +- .../build/static/css/main.95bea462.css.map | 2 +- frontend/build/static/js/main.0f55bfd1.js | 436 ++++++++++++++++++ ...CENSE.txt => main.0f55bfd1.js.LICENSE.txt} | 14 +- frontend/build/static/js/main.0f55bfd1.js.map | 1 + frontend/build/static/js/main.38a93151.js | 3 - frontend/build/static/js/main.38a93151.js.map | 1 - .../src/components/DataTable/ColumnData.jsx | 131 +++--- .../src/components/QuerySection/Query.jsx | 93 ++-- 10 files changed, 571 insertions(+), 118 deletions(-) create mode 100644 frontend/build/static/js/main.0f55bfd1.js rename frontend/build/static/js/{main.38a93151.js.LICENSE.txt => main.0f55bfd1.js.LICENSE.txt} (86%) create mode 100644 frontend/build/static/js/main.0f55bfd1.js.map delete mode 100644 frontend/build/static/js/main.38a93151.js delete mode 100644 frontend/build/static/js/main.38a93151.js.map diff --git a/frontend/build/asset-manifest.json b/frontend/build/asset-manifest.json index 1eff386e..f2e3a5f0 100644 --- a/frontend/build/asset-manifest.json +++ b/frontend/build/asset-manifest.json @@ -1,15 +1,15 @@ { "files": { "main.css": "/static/css/main.95bea462.css", - "main.js": "/static/js/main.38a93151.js", + "main.js": "/static/js/main.0f55bfd1.js", "static/js/488.a88b8761.chunk.js": "/static/js/488.a88b8761.chunk.js", "index.html": "/index.html", "main.95bea462.css.map": "/static/css/main.95bea462.css.map", - "main.38a93151.js.map": "/static/js/main.38a93151.js.map", + "main.0f55bfd1.js.map": "/static/js/main.0f55bfd1.js.map", "488.a88b8761.chunk.js.map": "/static/js/488.a88b8761.chunk.js.map" }, "entrypoints": [ "static/css/main.95bea462.css", - "static/js/main.38a93151.js" + "static/js/main.0f55bfd1.js" ] } \ No newline at end of file diff --git a/frontend/build/index.html b/frontend/build/index.html index 
f9592645..edb1f783 100644
--- a/frontend/build/index.html
+++ b/frontend/build/index.html
@@ -1 +1 @@
-SpotLake
\ No newline at end of file
+SpotLake
\ No newline at end of file diff --git a/frontend/build/static/css/main.95bea462.css.map b/frontend/build/static/css/main.95bea462.css.map index 74346593..9f66f22d 100644 --- a/frontend/build/static/css/main.95bea462.css.map +++ b/frontend/build/static/css/main.95bea462.css.map @@ -1 +1 @@ -{"version":3,"file":"static/css/main.95bea462.css","mappings":"AAAA,KAQE,kCAAmC,CACnC,iCAAkC,CAClC,eAAgB,CALhB,mIAEY,CANZ,kBAAoB,CACpB,mBASF,CACA,oBAEE,QAAS,CADT,SAEF,CACA,KACE,uEAEF,CAEA,KACE,eACF,CACA,OACE,cACF,CACA,yBACE,mBACF,CACA,sBACE,2BACF,CACA,yBACE,yBACF,CACA,YACE,iBACF,CACA,YACE,WAAY,CACZ,mBACF,CC1CA,KACE,iBACF,CAEA,UACE,aAAc,CACd,mBACF,CAEA,8CACE,UACE,2CACF,CACF,CAEA,YAKE,kBAAmB,CAJnB,wBAAyB,CAOzB,UAAY,CALZ,YAAa,CACb,qBAAsB,CAGtB,4BAA6B,CAD7B,sBAAuB,CAJvB,gBAOF,CAEA,UACE,aACF,CAEA,yBACE,GACE,sBACF,CACA,GACE,uBACF,CACF","sources":["index.css","App.css"],"sourcesContent":["body {\n margin: 0 !important;\n padding: 0 !important;\n /*overflow: unset !important;*/\n /*overflow-x: hidden !important;*/\n font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', 'Oxygen',\n 'Ubuntu', 'Cantarell', 'Fira Sans', 'Droid Sans', 'Helvetica Neue',\n sans-serif;\n -webkit-font-smoothing: antialiased;\n -moz-osx-font-smoothing: grayscale;\n background: #fff;\n}\nh1, h2, h3, h4, h5, h6, p {\n padding: 0;\n margin: 0;\n}\ncode {\n font-family: source-code-pro, Menlo, Monaco, Consolas, 'Courier New',\n monospace;\n}\n\n.App {\n margin-top: 87px;\n}\nbutton {\n cursor: pointer;\n}\n.recharts-legend-wrapper {\n left: 80px !important;\n}\n.recharts-legend-item {\n display: list-item !important;\n}\n.recharts-default-legend {\n text-align: left !important;\n}\n.firstChart {\n padding-top: 100px;\n}\n.date-input {\n height: 100%;\n justify-content: end;\n}",".App {\n text-align: center;\n}\n\n.App-logo {\n height: 40vmin;\n pointer-events: none;\n}\n\n@media (prefers-reduced-motion: no-preference) {\n .App-logo {\n animation: App-logo-spin infinite 20s linear;\n }\n}\n\n.App-header {\n background-color: #282c34;\n min-height: 100vh;\n display: flex;\n flex-direction: column;\n align-items: center;\n justify-content: center;\n font-size: calc(10px + 2vmin);\n color: white;\n}\n\n.App-link {\n color: #61dafb;\n}\n\n@keyframes App-logo-spin {\n from {\n transform: rotate(0deg);\n }\n to {\n transform: rotate(360deg);\n }\n}\n"],"names":[],"sourceRoot":""} \ No newline at end of file +{"version":3,"file":"static/css/main.95bea462.css","mappings":"AAAA,KAQE,kCAAmC,CACnC,iCAAkC,CAClC,eAAgB,CALhB,mIAEY,CANZ,kBAAoB,CACpB,mBASF,CACA,oBAEE,QAAS,CADT,SAEF,CACA,KACE,uEAEF,CAEA,KACE,eACF,CACA,OACE,cACF,CACA,yBACE,mBACF,CACA,sBACE,2BACF,CACA,yBACE,yBACF,CACA,YACE,iBACF,CACA,YACE,WAAY,CACZ,mBACF,CC1CA,KACE,iBACF,CAEA,UACE,aAAc,CACd,mBACF,CAEA,8CACE,UACE,2CACF,CACF,CAEA,YAKE,kBAAmB,CAJnB,wBAAyB,CAOzB,UAAY,CALZ,YAAa,CACb,qBAAsB,CAGtB,4BAA6B,CAD7B,sBAAuB,CAJvB,gBAOF,CAEA,UACE,aACF,CAEA,yBACE,GACE,sBACF,CACA,GACE,uBACF,CACF","sources":["index.css","App.css"],"sourcesContent":["body {\r\n margin: 0 !important;\r\n padding: 0 !important;\r\n /*overflow: unset !important;*/\r\n /*overflow-x: hidden !important;*/\r\n font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', 'Oxygen',\r\n 'Ubuntu', 'Cantarell', 'Fira Sans', 'Droid Sans', 'Helvetica Neue',\r\n sans-serif;\r\n -webkit-font-smoothing: antialiased;\r\n -moz-osx-font-smoothing: grayscale;\r\n background: #fff;\r\n}\r\nh1, h2, h3, h4, h5, h6, p {\r\n padding: 0;\r\n margin: 0;\r\n}\r\ncode {\r\n font-family: source-code-pro, Menlo, Monaco, Consolas, 
'Courier New',\r\n monospace;\r\n}\r\n\r\n.App {\r\n margin-top: 87px;\r\n}\r\nbutton {\r\n cursor: pointer;\r\n}\r\n.recharts-legend-wrapper {\r\n left: 80px !important;\r\n}\r\n.recharts-legend-item {\r\n display: list-item !important;\r\n}\r\n.recharts-default-legend {\r\n text-align: left !important;\r\n}\r\n.firstChart {\r\n padding-top: 100px;\r\n}\r\n.date-input {\r\n height: 100%;\r\n justify-content: end;\r\n}",".App {\r\n text-align: center;\r\n}\r\n\r\n.App-logo {\r\n height: 40vmin;\r\n pointer-events: none;\r\n}\r\n\r\n@media (prefers-reduced-motion: no-preference) {\r\n .App-logo {\r\n animation: App-logo-spin infinite 20s linear;\r\n }\r\n}\r\n\r\n.App-header {\r\n background-color: #282c34;\r\n min-height: 100vh;\r\n display: flex;\r\n flex-direction: column;\r\n align-items: center;\r\n justify-content: center;\r\n font-size: calc(10px + 2vmin);\r\n color: white;\r\n}\r\n\r\n.App-link {\r\n color: #61dafb;\r\n}\r\n\r\n@keyframes App-logo-spin {\r\n from {\r\n transform: rotate(0deg);\r\n }\r\n to {\r\n transform: rotate(360deg);\r\n }\r\n}\r\n"],"names":[],"sourceRoot":""} \ No newline at end of file diff --git a/frontend/build/static/js/main.0f55bfd1.js b/frontend/build/static/js/main.0f55bfd1.js new file mode 100644 index 00000000..a2798bfc --- /dev/null +++ b/frontend/build/static/js/main.0f55bfd1.js @@ -0,0 +1,436 @@ +/*! For license information please see main.0f55bfd1.js.LICENSE.txt */ +(()=>{var e={191:(e,t)=>{"use strict";var n=Symbol.for("react.transitional.element"),r=Symbol.for("react.portal"),o=Symbol.for("react.fragment"),l=Symbol.for("react.strict_mode"),a=Symbol.for("react.profiler");Symbol.for("react.provider");var i=Symbol.for("react.consumer"),s=Symbol.for("react.context"),u=Symbol.for("react.forward_ref"),c=Symbol.for("react.suspense"),d=Symbol.for("react.suspense_list"),p=Symbol.for("react.memo"),f=Symbol.for("react.lazy"),h=Symbol.for("react.offscreen"),m=Symbol.for("react.client.reference");function g(e){if("object"===typeof e&&null!==e){var t=e.$$typeof;switch(t){case n:switch(e=e.type){case o:case a:case l:case c:case d:return e;default:switch(e=e&&e.$$typeof){case s:case u:case f:case p:case i:return e;default:return t}}case r:return t}}}t.vM=u,t.lD=p},219:(e,t,n)=>{"use strict";var r=n(3763),o={childContextTypes:!0,contextType:!0,contextTypes:!0,defaultProps:!0,displayName:!0,getDefaultProps:!0,getDerivedStateFromError:!0,getDerivedStateFromProps:!0,mixins:!0,propTypes:!0,type:!0},l={name:!0,length:!0,prototype:!0,caller:!0,callee:!0,arguments:!0,arity:!0},a={$$typeof:!0,compare:!0,defaultProps:!0,displayName:!0,propTypes:!0,type:!0},i={};function s(e){return r.isMemo(e)?a:i[e.$$typeof]||o}i[r.ForwardRef]={$$typeof:!0,render:!0,defaultProps:!0,displayName:!0,propTypes:!0},i[r.Memo]=a;var u=Object.defineProperty,c=Object.getOwnPropertyNames,d=Object.getOwnPropertySymbols,p=Object.getOwnPropertyDescriptor,f=Object.getPrototypeOf,h=Object.prototype;e.exports=function e(t,n,r){if("string"!==typeof n){if(h){var o=f(n);o&&o!==h&&e(t,o,r)}var a=c(n);d&&(a=a.concat(d(n)));for(var i=s(t),m=s(n),g=0;g{"use strict";n.r(t),n.d(t,{default:()=>r.A});var r=n(7868)},579:(e,t,n)=>{"use strict";e.exports=n(1153)},869:(e,t,n)=>{"use strict";n.d(t,{A:()=>l});n(5043);var r=n(3290),o=n(579);function l(e){const{styles:t,defaultTheme:n={}}=e,l="function"===typeof t?e=>{return t(void 0===(r=e)||null===r||0===Object.keys(r).length?n:e);var r}:t;return(0,o.jsx)(r.mL,{styles:l})}},918:(e,t,n)=>{"use strict";function r(e){var t=Object.create(null);return function(n){return 
void 0===t[n]&&(t[n]=e(n)),t[n]}}n.d(t,{A:()=>r})},1068:(e,t,n)=>{"use strict";n.d(t,{A:()=>l});var r=n(918),o=/^((children|dangerouslySetInnerHTML|key|ref|autoFocus|defaultValue|defaultChecked|innerHTML|suppressContentEditableWarning|suppressHydrationWarning|valueLink|abbr|accept|acceptCharset|accessKey|action|allow|allowUserMedia|allowPaymentRequest|allowFullScreen|allowTransparency|alt|async|autoComplete|autoPlay|capture|cellPadding|cellSpacing|challenge|charSet|checked|cite|classID|className|cols|colSpan|content|contentEditable|contextMenu|controls|controlsList|coords|crossOrigin|data|dateTime|decoding|default|defer|dir|disabled|disablePictureInPicture|disableRemotePlayback|download|draggable|encType|enterKeyHint|fetchpriority|fetchPriority|form|formAction|formEncType|formMethod|formNoValidate|formTarget|frameBorder|headers|height|hidden|high|href|hrefLang|htmlFor|httpEquiv|id|inputMode|integrity|is|keyParams|keyType|kind|label|lang|list|loading|loop|low|marginHeight|marginWidth|max|maxLength|media|mediaGroup|method|min|minLength|multiple|muted|name|nonce|noValidate|open|optimum|pattern|placeholder|playsInline|poster|preload|profile|radioGroup|readOnly|referrerPolicy|rel|required|reversed|role|rows|rowSpan|sandbox|scope|scoped|scrolling|seamless|selected|shape|size|sizes|slot|span|spellCheck|src|srcDoc|srcLang|srcSet|start|step|style|summary|tabIndex|target|title|translate|type|useMap|value|width|wmode|wrap|about|datatype|inlist|prefix|property|resource|typeof|vocab|autoCapitalize|autoCorrect|autoSave|color|incremental|fallback|inert|itemProp|itemScope|itemType|itemID|itemRef|on|option|results|security|unselectable|accentHeight|accumulate|additive|alignmentBaseline|allowReorder|alphabetic|amplitude|arabicForm|ascent|attributeName|attributeType|autoReverse|azimuth|baseFrequency|baselineShift|baseProfile|bbox|begin|bias|by|calcMode|capHeight|clip|clipPathUnits|clipPath|clipRule|colorInterpolation|colorInterpolationFilters|colorProfile|colorRendering|contentScriptType|contentStyleType|cursor|cx|cy|d|decelerate|descent|diffuseConstant|direction|display|divisor|dominantBaseline|dur|dx|dy|edgeMode|elevation|enableBackground|end|exponent|externalResourcesRequired|fill|fillOpacity|fillRule|filter|filterRes|filterUnits|floodColor|floodOpacity|focusable|fontFamily|fontSize|fontSizeAdjust|fontStretch|fontStyle|fontVariant|fontWeight|format|from|fr|fx|fy|g1|g2|glyphName|glyphOrientationHorizontal|glyphOrientationVertical|glyphRef|gradientTransform|gradientUnits|hanging|horizAdvX|horizOriginX|ideographic|imageRendering|in|in2|intercept|k|k1|k2|k3|k4|kernelMatrix|kernelUnitLength|kerning|keyPoints|keySplines|keyTimes|lengthAdjust|letterSpacing|lightingColor|limitingConeAngle|local|markerEnd|markerMid|markerStart|markerHeight|markerUnits|markerWidth|mask|maskContentUnits|maskUnits|mathematical|mode|numOctaves|offset|opacity|operator|order|orient|orientation|origin|overflow|overlinePosition|overlineThickness|panose1|paintOrder|pathLength|patternContentUnits|patternTransform|patternUnits|pointerEvents|points|pointsAtX|pointsAtY|pointsAtZ|preserveAlpha|preserveAspectRatio|primitiveUnits|r|radius|refX|refY|renderingIntent|repeatCount|repeatDur|requiredExtensions|requiredFeatures|restart|result|rotate|rx|ry|scale|seed|shapeRendering|slope|spacing|specularConstant|specularExponent|speed|spreadMethod|startOffset|stdDeviation|stemh|stemv|stitchTiles|stopColor|stopOpacity|strikethroughPosition|strikethroughThickness|string|stroke|strokeDasharray|strokeDashoffset|strokeLinecap|strokeLinejoin|strokeMiterlimit|st
rokeOpacity|strokeWidth|surfaceScale|systemLanguage|tableValues|targetX|targetY|textAnchor|textDecoration|textRendering|textLength|to|transform|u1|u2|underlinePosition|underlineThickness|unicode|unicodeBidi|unicodeRange|unitsPerEm|vAlphabetic|vHanging|vIdeographic|vMathematical|values|vectorEffect|version|vertAdvY|vertOriginX|vertOriginY|viewBox|viewTarget|visibility|widths|wordSpacing|writingMode|x|xHeight|x1|x2|xChannelSelector|xlinkActuate|xlinkArcrole|xlinkHref|xlinkRole|xlinkShow|xlinkTitle|xlinkType|xmlBase|xmlns|xmlnsXlink|xmlLang|xmlSpace|y|y1|y2|yChannelSelector|z|zoomAndPan|for|class|autofocus)|(([Dd][Aa][Tt][Aa]|[Aa][Rr][Ii][Aa]|x)-.*))$/,l=(0,r.A)((function(e){return o.test(e)||111===e.charCodeAt(0)&&110===e.charCodeAt(1)&&e.charCodeAt(2)<91}))},1153:(e,t,n)=>{"use strict";n(2123);var r=n(5043),o=60103;if(t.Fragment=60107,"function"===typeof Symbol&&Symbol.for){var l=Symbol.for;o=l("react.element"),t.Fragment=l("react.fragment")}var a=r.__SECRET_INTERNALS_DO_NOT_USE_OR_YOU_WILL_BE_FIRED.ReactCurrentOwner,i=Object.prototype.hasOwnProperty,s={key:!0,ref:!0,__self:!0,__source:!0};function u(e,t,n){var r,l={},u=null,c=null;for(r in void 0!==n&&(u=""+n),void 0!==t.key&&(u=""+t.key),void 0!==t.ref&&(c=t.ref),t)i.call(t,r)&&!s.hasOwnProperty(r)&&(l[r]=t[r]);if(e&&e.defaultProps)for(r in t=e.defaultProps)void 0===l[r]&&(l[r]=t[r]);return{$$typeof:o,type:e,key:u,ref:c,props:l,_owner:a.current}}t.jsx=u,t.jsxs=u},1188:(e,t,n)=>{"use strict";n.d(t,{A:()=>r});const r=function(e){let t=arguments.length>1&&void 0!==arguments[1]?arguments[1]:Number.MIN_SAFE_INTEGER,n=arguments.length>2&&void 0!==arguments[2]?arguments[2]:Number.MAX_SAFE_INTEGER;return Math.max(t,Math.min(e,n))}},1497:(e,t,n)=>{"use strict";var r=n(3218);function o(){}function l(){}l.resetWarningCache=o,e.exports=function(){function e(e,t,n,o,l,a){if(a!==r){var i=new Error("Calling PropTypes validators directly is not supported by the `prop-types` package. Use PropTypes.checkPropTypes() to call them. 
Read more at http://fb.me/use-check-prop-types");throw i.name="Invariant Violation",i}}function t(){return e}e.isRequired=e;var n={array:e,bigint:e,bool:e,func:e,number:e,object:e,string:e,symbol:e,any:e,arrayOf:t,element:e,elementType:e,instanceOf:t,node:e,objectOf:t,oneOf:t,oneOfType:t,shape:t,exact:t,checkPropTypes:l,resetWarningCache:o};return n.PropTypes=n,n}},1722:(e,t,n)=>{"use strict";n.d(t,{Rk:()=>r,SF:()=>o,sk:()=>l});function r(e,t,n){var r="";return n.split(" ").forEach((function(n){void 0!==e[n]?t.push(e[n]+";"):n&&(r+=n+" ")})),r}var o=function(e,t,n){var r=e.key+"-"+t.name;!1===n&&void 0===e.registered[r]&&(e.registered[r]=t.styles)},l=function(e,t,n){o(e,t,n);var r=e.key+"-"+t.name;if(void 0===e.inserted[t.name]){var l=t;do{e.insert(t===l?"."+r:"",l,e.sheet,!0),l=l.next}while(void 0!==l)}}},2086:(e,t,n)=>{"use strict";e.exports=n(5082)},2123:e=>{"use strict";var t=Object.getOwnPropertySymbols,n=Object.prototype.hasOwnProperty,r=Object.prototype.propertyIsEnumerable;e.exports=function(){try{if(!Object.assign)return!1;var e=new String("abc");if(e[5]="de","5"===Object.getOwnPropertyNames(e)[0])return!1;for(var t={},n=0;n<10;n++)t["_"+String.fromCharCode(n)]=n;if("0123456789"!==Object.getOwnPropertyNames(t).map((function(e){return t[e]})).join(""))return!1;var r={};return"abcdefghijklmnopqrst".split("").forEach((function(e){r[e]=e})),"abcdefghijklmnopqrst"===Object.keys(Object.assign({},r)).join("")}catch(o){return!1}}()?Object.assign:function(e,o){for(var l,a,i=function(e){if(null===e||void 0===e)throw new TypeError("Object.assign cannot be called with null or undefined");return Object(e)}(e),s=1;s{"use strict";t.A=void 0;var r=function(e,t){if(!t&&e&&e.__esModule)return e;if(null===e||"object"!=typeof e&&"function"!=typeof e)return{default:e};var n=l(t);if(n&&n.has(e))return n.get(e);var r={__proto__:null},o=Object.defineProperty&&Object.getOwnPropertyDescriptor;for(var a in e)if("default"!==a&&Object.prototype.hasOwnProperty.call(e,a)){var i=o?Object.getOwnPropertyDescriptor(e,a):null;i&&(i.get||i.set)?Object.defineProperty(r,a,i):r[a]=e[a]}return r.default=e,n&&n.set(e,r),r}(n(5043)),o=n(7688);function l(e){if("function"!=typeof WeakMap)return null;var t=new WeakMap,n=new WeakMap;return(l=function(e){return e?n:t})(e)}t.A=function(){let e=arguments.length>0&&void 0!==arguments[0]?arguments[0]:null;const t=r.useContext(o.ThemeContext);return t&&(n=t,0!==Object.keys(n).length)?t:e;var n}},2730:(e,t,n)=>{"use strict";var r=n(5043),o=n(2123),l=n(8853);function a(e){for(var t="https://reactjs.org/docs/error-decoder.html?invariant="+e,n=1;n