Commit

Merge pull request #511 from ddps-lab/azure-sps-merge-if-price
Implement the price / eviction / SPS merge logic
krtaiyang authored Feb 9, 2025
2 parents 00909e1 + 3dc053b commit 1a5d6bb
Showing 7 changed files with 95 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -5,13 +5,11 @@
from datetime import datetime, timezone
from load_if import load_if
from load_price import collect_price_with_multithreading
from utils.merge_df import merge_df
from utils.merge_df import merge_price_eviction_df
from utils.compare_data import compare
from utils.upload_data import upload_timestream, update_latest, save_raw, query_selector, upload_cloudwatch
from utils.pub_service import send_slack_message, AZURE_CONST, STORAGE_CONST

BUCKET_NAME = os.environ.get('BUCKET_NAME')
KEY = os.environ.get('S3_LATEST_DATA_SAVE_PATH')
WORKLOAD_COLS = ['InstanceTier', 'InstanceType', 'Region']
FEATURE_COLS = ['OndemandPrice', 'SpotPrice', 'IF']
str_datetime = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M")
@@ -40,7 +38,7 @@ def azure_collector(timestamp):
is_if_fetch_success = False

if is_price_fetch_success and is_if_fetch_success:
join_df = merge_df(current_df, eviction_df)
join_df = merge_price_eviction_df(current_df, eviction_df)
elif not is_price_fetch_success and is_if_fetch_success:
join_df = eviction_df
elif is_price_fetch_success and not is_if_fetch_success:
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import load_sps
import pandas as pd
from datetime import datetime
from sps_module import sps_shared_resources
from utils.merge_df import merge_price_eviction_sps_df
from utils.upload_data import update_latest_sps, save_raw_sps
from utils.pub_service import send_slack_message, logger
from utils.pub_service import send_slack_message, logger, S3, AZURE_CONST

FIRST_TIME_ACTION = "First_Time" # Action for the first run
EVERY_10MIN_ACTION = "Every_10Min" # Action run every 10 minutes
@@ -33,11 +35,17 @@ def lambda_handler(event, _):
else:
raise ValueError(f"Invalid lambda action.")

# Process SPS data
if sps_res_df is None:
raise ValueError("sps_res_df is None")
if not handle_res_df(sps_res_df, event_time_utc):
raise RuntimeError("Failed to update or save SPS data")

# price_if_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH, 'pkl.gz')
price_if_df = pd.DataFrame(S3.read_file(AZURE_CONST.LATEST_FILENAME, 'json'))
price_eviction_sps_df = merge_price_eviction_sps_df(price_if_df, sps_res_df)

if sps_res_df is None: raise ValueError("sps_res_df is None")
if price_if_df is None: raise ValueError("price_if_df is None")
if price_eviction_sps_df is None: raise ValueError("price_eviction_sps_df is None")

if not update_and_save_res_df(price_eviction_sps_df, event_time_utc):
raise RuntimeError("Failed to update or save price_eviction_sps_df data")

return handle_response(200, "Executed Successfully!", action, event_time_utc)

@@ -48,10 +56,10 @@ def lambda_handler(event, _):
return handle_response(500, "Execute Failed!", action, event_time_utc, str(e))


def handle_res_df(sps_res_df, event_time_utc):
def update_and_save_res_df(price_eviction_sps_df, event_time_utc):
try:
update_result = update_latest_sps(sps_res_df, event_time_utc)
save_result = save_raw_sps(sps_res_df, event_time_utc)
update_result = update_latest_sps(price_eviction_sps_df, event_time_utc)
save_result = save_raw_sps(price_eviction_sps_df, event_time_utc)
return update_result and save_result

except Exception as e:
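Taken together, the handler now joins the fresh SPS scores with the latest price/eviction snapshot before persisting anything. Below is a condensed sketch of that flow; the wrapper function name is invented for illustration and error handling is simplified, while every other identifier follows the diff above.

# Illustrative condensation of the new handler flow (not part of the commit).
import pandas as pd
from utils.merge_df import merge_price_eviction_sps_df
from utils.pub_service import S3, AZURE_CONST
from utils.upload_data import update_latest_sps, save_raw_sps

def merge_and_store(sps_res_df, event_time_utc):
    if sps_res_df is None:
        raise ValueError("sps_res_df is None")

    # Latest price + eviction (IF) snapshot previously written by the price collector.
    price_if_df = pd.DataFrame(S3.read_file(AZURE_CONST.LATEST_FILENAME, 'json'))
    if price_if_df.empty:
        raise ValueError("price_if_df is empty")

    # Outer-join the SPS scores onto the price/eviction table (see utils/merge_df.py below).
    price_eviction_sps_df = merge_price_eviction_sps_df(price_if_df, sps_res_df)

    # Persist both the "latest" JSON and the raw CSV snapshot.
    return update_latest_sps(price_eviction_sps_df, event_time_utc) and \
           save_raw_sps(price_eviction_sps_df, event_time_utc)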
46 changes: 23 additions & 23 deletions collector/spot-dataset/azure/lambda/current_collector/load_sps.py
@@ -64,8 +64,8 @@ def collect_spot_placement_score_first_time(desired_count):
start_time = time.time()
res_price_api = collect_regions_and_instance_types_df_by_priceapi()
regions_and_instance_types_df, SS_Resources.region_map_and_instance_map_tmp['region_map'], SS_Resources.region_map_and_instance_map_tmp['instance_map'] = res_price_api
S3.upload_file(SS_Resources.region_map_and_instance_map_tmp,
AZURE_CONST.REGION_MAP_AND_INSTANCE_MAP_JSON_FILENAME, "json")

S3.upload_file(SS_Resources.region_map_and_instance_map_tmp, f"{AZURE_CONST.REGION_MAP_AND_INSTANCE_MAP_JSON_FILENAME}", "json")

end_time = time.time()
elapsed = end_time - start_time
@@ -101,7 +101,7 @@ def collect_spot_placement_score_first_time(desired_count):
df_greedy_clustering_filtered = sps_prepare_parameters.greedy_clustering_to_create_optimized_request_list(
regions_and_instance_types_filtered_df)

S3.upload_file(df_greedy_clustering_filtered, AZURE_CONST.DF_TO_USE_TODAY_PKL_FILENAME, "pkl")
S3.upload_file(df_greedy_clustering_filtered, f"{AZURE_CONST.DF_TO_USE_TODAY_PKL_FILENAME}", "pkl")

end_time = time.time()
elapsed = end_time - start_time
@@ -122,7 +122,7 @@ def collect_spot_placement_score(desired_count):
assert get_variable_from_s3()
initialize_sps_shared_resources()

df_greedy_clustering_filtered = S3.read_file(AZURE_CONST.DF_TO_USE_TODAY_PKL_FILENAME, 'pkl')
df_greedy_clustering_filtered = S3.read_file(f"{AZURE_CONST.DF_TO_USE_TODAY_PKL_FILENAME}", 'pkl')

sps_res_df = execute_spot_placement_score_task_by_parameter_pool_df(df_greedy_clustering_filtered, True, desired_count)
print(f'Time_out_retry_count: {SS_Resources.time_out_retry_count}')
@@ -162,10 +162,10 @@ def execute_spot_placement_score_task_by_parameter_pool_df(api_calls_df, availab
score_data = {
"DesiredCount": desired_count,
"AvailabilityZone": score.get("availabilityZone", None),
"RegionCodeSPS": score.get("region", None),
# "RegionCodeSPS": score.get("region", None),
"Region": SS_Resources.region_map_and_instance_map_tmp['region_map'].get(
score.get("region", ""), ""),
"InstanceTypeSPS": score.get("sku", None),
# "InstanceTypeSPS": score.get("sku", None),
"InstanceTier": SS_Resources.region_map_and_instance_map_tmp['instance_map'].get(
score.get("sku", ""), {}).get("InstanceTier", None),
"InstanceType": SS_Resources.region_map_and_instance_map_tmp['instance_map'].get(
@@ -323,12 +323,12 @@ def extract_invalid_values(error_message):
def initialize_files_in_s3():
try:
files_to_initialize = {
AZURE_CONST.INVALID_REGIONS_JSON_FILENAME: [],
AZURE_CONST.INVALID_INSTANCE_TYPES_JSON_FILENAME: []
f"{AZURE_CONST.INVALID_REGIONS_JSON_FILENAME}": [],
f"{AZURE_CONST.INVALID_INSTANCE_TYPES_JSON_FILENAME}": []
}

for file_name, data in files_to_initialize.items():
S3.upload_file(data, file_name, "json")
for file_path, data in files_to_initialize.items():
S3.upload_file(data, file_path, "json")

print("Successfully initialized files in S3.")
return True
@@ -401,28 +401,28 @@ def initialize_sps_shared_resources():

def save_tmp_files_to_s3():
files_to_upload = {
AZURE_CONST.INVALID_REGIONS_JSON_FILENAME: SS_Resources.invalid_regions_tmp,
AZURE_CONST.INVALID_INSTANCE_TYPES_JSON_FILENAME: SS_Resources.invalid_instance_types_tmp,
AZURE_CONST.LOCATIONS_CALL_HISTORY_JSON_FILENAME: SS_Resources.locations_call_history_tmp,
AZURE_CONST.LOCATIONS_OVER_LIMIT_JSON_FILENAME: SS_Resources.locations_over_limit_tmp,
AZURE_CONST.LAST_SUBSCRIPTION_ID_AND_LOCATION_JSON_FILENAME: {
f"{AZURE_CONST.INVALID_REGIONS_JSON_FILENAME}": SS_Resources.invalid_regions_tmp,
f"{AZURE_CONST.INVALID_INSTANCE_TYPES_JSON_FILENAME}": SS_Resources.invalid_instance_types_tmp,
f"{AZURE_CONST.LOCATIONS_CALL_HISTORY_JSON_FILENAME}": SS_Resources.locations_call_history_tmp,
f"{AZURE_CONST.LOCATIONS_OVER_LIMIT_JSON_FILENAME}": SS_Resources.locations_over_limit_tmp,
f"{AZURE_CONST.LAST_SUBSCRIPTION_ID_AND_LOCATION_JSON_FILENAME}": {
"last_subscription_id": SS_Resources.last_subscription_id_and_location_tmp['last_subscription_id'],
"last_location": SS_Resources.last_subscription_id_and_location_tmp['last_location']
},
}

for file_name, file_data in files_to_upload.items():
for file_path, file_data in files_to_upload.items():
if file_data:
S3.upload_file(file_data, file_name, "json")
S3.upload_file(file_data, file_path, "json")

def get_variable_from_s3():
try:
invalid_regions_data = S3.read_file(AZURE_CONST.INVALID_REGIONS_JSON_FILENAME, 'json')
instance_types_data = S3.read_file(AZURE_CONST.INVALID_INSTANCE_TYPES_JSON_FILENAME, 'json')
call_history_data = S3.read_file(AZURE_CONST.LOCATIONS_CALL_HISTORY_JSON_FILENAME, 'json')
over_limit_data = S3.read_file(AZURE_CONST.LOCATIONS_OVER_LIMIT_JSON_FILENAME, 'json')
last_location_index_data = S3.read_file(AZURE_CONST.LAST_SUBSCRIPTION_ID_AND_LOCATION_JSON_FILENAME, 'json')
region_map_and_instance_map = S3.read_file(AZURE_CONST.REGION_MAP_AND_INSTANCE_MAP_JSON_FILENAME, 'json')
invalid_regions_data = S3.read_file(f"{AZURE_CONST.INVALID_REGIONS_JSON_FILENAME}", 'json')
instance_types_data = S3.read_file(f"{AZURE_CONST.INVALID_INSTANCE_TYPES_JSON_FILENAME}", 'json')
call_history_data = S3.read_file(f"{AZURE_CONST.LOCATIONS_CALL_HISTORY_JSON_FILENAME}", 'json')
over_limit_data = S3.read_file(f"{AZURE_CONST.LOCATIONS_OVER_LIMIT_JSON_FILENAME}", 'json')
last_location_index_data = S3.read_file(f"{AZURE_CONST.LAST_SUBSCRIPTION_ID_AND_LOCATION_JSON_FILENAME}", 'json')
region_map_and_instance_map = S3.read_file(f"{AZURE_CONST.REGION_MAP_AND_INSTANCE_MAP_JSON_FILENAME}", 'json')

SS_Resources.invalid_regions_tmp = invalid_regions_data
SS_Resources.invalid_instance_types_tmp = instance_types_data
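With RegionCodeSPS and InstanceTypeSPS commented out, each SPS record now carries only the mapped region and instance names. A hypothetical record, with made-up values (the InstanceType and Score keys are assumed from the parts of this function that the diff collapses):

score_data = {
    "DesiredCount": 1,           # requested capacity for the placement-score query
    "AvailabilityZone": "1",     # may be None when zonal scores are not returned
    "Region": "eastus",          # mapped via region_map, not the raw SPS region code
    "InstanceTier": "Standard",  # mapped via instance_map
    "InstanceType": "D2s_v3",    # mapped via instance_map (assumed key)
    "Score": "High",             # assumed key, matching the Score column merged later
}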
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pandas as pd
import numpy as np

def merge_df(price_df, eviction_df):
def merge_price_eviction_df(price_df, eviction_df):
join_df = pd.merge(price_df, eviction_df,
left_on=['InstanceType', 'InstanceTier', 'armRegionName'],
right_on=['InstanceType', 'InstanceTier', 'Region'],
@@ -12,3 +12,13 @@ def merge_df(price_df, eviction_df):
join_df.rename(columns={'Region_x' : 'Region', 'OndemandPrice_x' : 'OndemandPrice', 'SpotPrice_x' : 'SpotPrice', 'Savings_x' : 'Savings'}, inplace=True)

return join_df


def merge_price_eviction_sps_df(price_eviction_df, sps_df):
join_df = pd.merge(price_eviction_df, sps_df, on=['InstanceTier', 'InstanceType', 'Region'], how='outer')
join_df.rename(columns={'time_x': 'PriceEviction_Update_Time', 'time_y': 'SPS_Update_Time'}, inplace=True)
join_df.drop(columns=['id_x', 'id_y'], inplace=True)
join_df = join_df[["InstanceTier", "InstanceType", "Region", "OndemandPrice", "SpotPrice", "Savings", "IF",
"PriceEviction_Update_Time", "DesiredCount", "AvailabilityZone", "Score", "SPS_Update_Time"]]

return join_df
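
A toy, made-up example of what merge_price_eviction_sps_df produces; only the column names come from the code above, while the single-row frames and their values are invented:

import pandas as pd
from utils.merge_df import merge_price_eviction_sps_df

price_eviction_df = pd.DataFrame([{
    "id": 1, "InstanceTier": "Standard", "InstanceType": "D2s_v3", "Region": "eastus",
    "OndemandPrice": 0.096, "SpotPrice": 0.012, "Savings": 87.5, "IF": 0.05,
    "time": "2025-02-09 12:00:00",
}])

sps_df = pd.DataFrame([{
    "id": 1, "InstanceTier": "Standard", "InstanceType": "D2s_v3", "Region": "eastus",
    "DesiredCount": 1, "AvailabilityZone": "1", "Score": "High",
    "time": "2025-02-09 12:10:00",
}])

merged = merge_price_eviction_sps_df(price_eviction_df, sps_df)
# Resulting columns, in order:
# InstanceTier, InstanceType, Region, OndemandPrice, SpotPrice, Savings, IF,
# PriceEviction_Update_Time, DesiredCount, AvailabilityZone, Score, SPS_Update_Time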
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@
import inspect
import os
import logging
import pandas as pd
from const_config import AzureCollector, Storage

AZURE_CONST = AzureCollector()
@@ -44,7 +45,7 @@ def __init__(self):
self.client = s3_client
self.resource = s3_resource

def upload_file(self, data, file_name, file_type="json", set_public_read = False):
def upload_file(self, data, file_path, file_type="json", set_public_read = False):
try:
if file_type not in ["json", "pkl", "df_to_csv.gz"]:
raise ValueError("Unsupported file type. Use 'json' or 'pkl'.")
@@ -68,40 +69,41 @@ def upload_file(self, data, file_name, file_type="json", set_public_read = False
data.to_csv(file, index=False, compression="gzip")
file.seek(0)

file_path = f"{AZURE_CONST.SPS_FILE_PATH}/{file_name}"
self.client.upload_fileobj(file, STORAGE_CONST.BUCKET_NAME, file_path)

if set_public_read:
object_acl = self.resource.ObjectAcl(STORAGE_CONST.BUCKET_NAME, file_path)
object_acl.put(ACL='public-read')

print(f"[S3]: Succeed to upload. Filename: [{file_name}]")
print(f"[S3]: Succeed to upload. Filename: [{file_path}]")

except ValueError as ve:
print(f"Validation error for {file_name}: {ve}")
print(f"Validation error for {file_path}: {ve}")
except Exception as e:
print(f"Upload failed for {file_name}: {e}")
print(f"Upload failed for {file_path}: {e}")

def read_file(self, file_name, file_type="json"):
def read_file(self, file_path, file_type="json"):
try:
file_path = f"{AZURE_CONST.SPS_FILE_PATH}/{file_name}"
response = self.client.get_object(Bucket=STORAGE_CONST.BUCKET_NAME, Key=file_path)
file = io.BytesIO(response['Body'].read())

if file_type == "json":
return json.load(file)

elif file_type == "pkl":
return pickle.load(file)
return pd.read_pickle(file)

elif file_type == "pkl.gz":
return pd.read_pickle(file, compression="gzip")

else:
raise ValueError("Unsupported file type. Use 'json' or 'pkl'.")
raise ValueError("Unsupported file type. Use 'json' or 'pkl' or 'pkl.gz'.")

except json.JSONDecodeError:
print(f"Warning: {file_name} is not a valid JSON file.")
print(f"Warning: {file_path} is not a valid JSON file.")
return None
except Exception as e:
print(f"Error reading {file_name} from S3: {e}")
print(f"Error reading {file_path} from S3: {e}")
return None

class LoggerConfig(logging.Logger):
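Since upload_file and read_file now take the full S3 key rather than a bare file name (the SPS_FILE_PATH prefix is no longer prepended inside the class), callers pass the prefixed constants directly. A rough usage sketch, assuming the S3 helper and constants exported by utils.pub_service; the pkl.gz read mirrors the commented-out line in the SPS lambda above:

from utils.pub_service import S3, AZURE_CONST

# Upload an empty list under its full key, e.g. "sps-collector/azure/invalid_regions.json",
# as initialize_files_in_s3 does in load_sps.py.
S3.upload_file([], AZURE_CONST.INVALID_REGIONS_JSON_FILENAME, "json")

# Read it back; read_file now also understands gzip-compressed pickles ("pkl.gz").
invalid_regions = S3.read_file(AZURE_CONST.INVALID_REGIONS_JSON_FILENAME, "json")
price_if_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH, "pkl.gz")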
Original file line number Diff line number Diff line change
@@ -87,18 +87,25 @@ def update_latest(data, timestamp):

data['time'] = datetime.strftime(timestamp, '%Y-%m-%d %H:%M:%S')

result = data.to_json(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_FILENAME }", orient='records')
data.to_json(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_FILENAME}", orient='records')
data.to_pickle(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_PRICE_IF_PKL_GZIP_FILENAME}", compression="gzip")

session = boto3.Session()
s3 = session.client('s3')

with open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_FILENAME }", 'rb') as f:
with open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_FILENAME}", 'rb') as f:
s3.upload_fileobj(f, STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_DATA_SAVE_PATH)

with open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.LATEST_PRICE_IF_PKL_GZIP_FILENAME}", 'rb') as f:
s3.upload_fileobj(f, STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH)

s3 = boto3.resource('s3')
object_acl = s3.ObjectAcl(STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_DATA_SAVE_PATH)
response = object_acl.put(ACL='public-read')

object_acl = s3.ObjectAcl(STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH)
response = object_acl.put(ACL='public-read')

pickle.dump(data, open(f"{AZURE_CONST.SERVER_SAVE_DIR}/{AZURE_CONST.SERVER_SAVE_FILENAME}", "wb"))


@@ -164,7 +171,8 @@ def update_latest_sps(dataframe, time_utc):
dataframe['AvailabilityZone'] = dataframe['AvailabilityZone'].where(pd.notna(dataframe['AvailabilityZone']), None)

json_data = dataframe.to_dict(orient="records")
S3.upload_file(json_data, f"result/{AZURE_CONST.LATEST_SPS_FILENAME}", "json", set_public_read=True)

S3.upload_file(json_data, f"{AZURE_CONST.LATEST_SPS_FILENAME}", "json", set_public_read=True)
return True

except Exception as e:
@@ -179,8 +187,7 @@ def save_raw_sps(dataframe, time_utc):

s3_dir_name = time_utc.strftime("%Y/%m/%d")
s3_obj_name = time_utc.strftime("%H-%M-%S")

S3.upload_file(dataframe, f"result/rawdata/{s3_dir_name}/{s3_obj_name}.csv.gz", "df_to_csv.gz", set_public_read=True)
S3.upload_file(dataframe, f"sps-collector/azure/result/rawdata/{s3_dir_name}/{s3_obj_name}.csv.gz", "df_to_csv.gz", set_public_read=True)
return True

except Exception as e:
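update_latest now mirrors the latest price/IF data both as public JSON and as a public gzip-compressed pickle. A consumer-side sketch for loading that pickle straight from S3; the bucket name is a placeholder, and only the key comes from S3_LATEST_PRICE_IF_GZIP_SAVE_PATH in const_config.py below:

import io
import boto3
import pandas as pd

s3 = boto3.client("s3")
# Placeholder bucket; the real name lives in STORAGE_CONST.BUCKET_NAME.
obj = s3.get_object(Bucket="<bucket-name>", Key="latest_data/latest_price_if_azure.pkl.gz")
price_if_df = pd.read_pickle(io.BytesIO(obj["Body"].read()), compression="gzip")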
30 changes: 17 additions & 13 deletions const_config.py
@@ -79,8 +79,12 @@ def LATEST_FILENAME():
return "latest_azure.json"

@constant
def LATEST_SPS_FILENAME():
return "latest_azure_sps.json"
def LATEST_PRICE_IF_PKL_GZIP_FILENAME():
return "latest_price_if_azure.pkl.gz"

@constant
def S3_LATEST_PRICE_IF_GZIP_SAVE_PATH():
return "latest_data/latest_price_if_azure.pkl.gz"

@constant
def S3_LATEST_DATA_SAVE_PATH():
@@ -130,37 +134,37 @@ def SPOT_DATA_COLLECTION_LOG_GROUP_NAME():
def LOG_STREAM_NAME():
return "Azure-Count"

@constant
def SPS_FILE_PATH():
return "sps-collector/azure"

@constant
def LOCATIONS_CALL_HISTORY_JSON_FILENAME():
return "locations_call_history.json"
return "sps-collector/azure/locations_call_history.json"

@constant
def LOCATIONS_OVER_LIMIT_JSON_FILENAME():
return "locations_over_limit.json"
return "sps-collector/azure/locations_over_limit.json"

@constant
def INVALID_REGIONS_JSON_FILENAME():
return "invalid_regions.json"
return "sps-collector/azure/invalid_regions.json"

@constant
def INVALID_INSTANCE_TYPES_JSON_FILENAME():
return "invalid_instance_types.json"
return "sps-collector/azure/invalid_instance_types.json"

@constant
def LAST_SUBSCRIPTION_ID_AND_LOCATION_JSON_FILENAME():
return "last_subscription_id_and_location.json"
return "sps-collector/azure/last_subscription_id_and_location.json"

@constant
def REGION_MAP_AND_INSTANCE_MAP_JSON_FILENAME():
return "region_map_and_instance_map.json"
return "sps-collector/azure/region_map_and_instance_map.json"

@constant
def DF_TO_USE_TODAY_PKL_FILENAME():
return "df_to_use_today.pkl"
return "sps-collector/azure/df_to_use_today.pkl"

@constant
def LATEST_SPS_FILENAME():
return "sps-collector/azure/result/latest_azure_sps.json"

class GcpCollector(object):
@constant
