Merge pull request #531 from ddps-lab/azure-collector-fix
Integration with the real production environment for providing Azure SPS information
krtaiyang authored Feb 26, 2025
2 parents 620c0e2 + ab54b26 commit 5e313b7
Showing 21 changed files with 949 additions and 411 deletions.
15 changes: 10 additions & 5 deletions .github/workflows/azure-sps-lambda-sync.yml
@@ -27,11 +27,16 @@ jobs:
rm -f azure_sps_lambda.zip
rm -f ./collector/spot-dataset/azure/lambda/current_collector/azure_sps_lambda.zip
zip -j ./collector/spot-dataset/azure/lambda/current_collector/azure_sps_lambda.zip \
./collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py \
./collector/spot-dataset/azure/lambda/current_collector/load_price.py \
./collector/spot-dataset/azure/lambda/current_collector/load_sps.py \
./const_config.py
mkdir -p /tmp/lambda_collector
cp ./collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py /tmp/lambda_collector/lambda_function.py
cp ./collector/spot-dataset/azure/lambda/current_collector/load_price.py /tmp/lambda_collector/
cp ./collector/spot-dataset/azure/lambda/current_collector/load_sps.py /tmp/lambda_collector/
cp ./const_config.py /tmp/lambda_collector/
zip -j ./collector/spot-dataset/azure/lambda/current_collector/azure_sps_lambda.zip /tmp/lambda_collector/*

rm -rf /tmp/lambda_collector

cd ./collector/spot-dataset/azure/lambda/current_collector/
zip -r azure_sps_lambda.zip ./utils/*
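The packaging change above stages the sources in /tmp/lambda_collector so that lambda_function_sps.py lands in the zip under the name lambda_function.py, presumably keeping a handler configured as lambda_function.lambda_handler resolvable. A minimal local sketch of the same packaging idea in Python, assuming the repository layout shown in the workflow:

# Sketch only; paths and the handler name are assumptions taken from the workflow above.
import os
import zipfile

SRC = "./collector/spot-dataset/azure/lambda/current_collector"
flat_files = {
    f"{SRC}/lambda_function_sps.py": "lambda_function.py",  # renamed so lambda_function.lambda_handler resolves
    f"{SRC}/load_price.py": "load_price.py",
    f"{SRC}/load_sps.py": "load_sps.py",
    "./const_config.py": "const_config.py",
}

with zipfile.ZipFile(f"{SRC}/azure_sps_lambda.zip", "w", zipfile.ZIP_DEFLATED) as zf:
    for src_path, arcname in flat_files.items():
        zf.write(src_path, arcname)  # flat entries, equivalent to `zip -j`
    for root, _, names in os.walk(f"{SRC}/utils"):
        for name in names:
            path = os.path.join(root, name)
            zf.write(path, os.path.relpath(path, SRC))  # keeps the utils/ prefix, like `zip -r ./utils/*`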
@@ -1,18 +1,10 @@
import os
import json
import boto3
import pandas as pd
from datetime import datetime, timezone
from datetime import datetime
from load_if import load_if
from load_price import collect_price_with_multithreading
from utils.merge_df import merge_price_eviction_df
from utils.compare_data import compare
from utils.upload_data import upload_timestream, update_latest, save_raw, query_selector, upload_cloudwatch
from utils.pub_service import send_slack_message, AZURE_CONST, STORAGE_CONST

WORKLOAD_COLS = ['InstanceTier', 'InstanceType', 'Region']
FEATURE_COLS = ['OndemandPrice', 'SpotPrice', 'IF']

from utils.merge_df import merge_price_saving_if_df
from utils.upload_data import update_latest_price_saving_if, save_raw_price_saving_if
from utils.pub_service import send_slack_message

def lambda_handler(event, _):
event_time_utc = event.get("time")
@@ -23,28 +15,28 @@ def lambda_handler(event, _):

# collect azure price data with multithreading
try:
current_df = collect_price_with_multithreading()
price_saving_df = collect_price_with_multithreading()
except Exception as e:
result_msg = """AZURE PRICE MODULE EXCEPTION!\n %s""" % (e)
data = {'text': result_msg}
send_slack_message(result_msg)
is_price_fetch_success = False

try:
eviction_df = load_if()
if_df = load_if()
except Exception as e:
result_msg = """AZURE IF MODULE EXCEPTION!\n %s""" % (e)
data = {'text': result_msg}
send_slack_message(result_msg)
is_if_fetch_success = False

if is_price_fetch_success and is_if_fetch_success:
join_df = merge_price_eviction_df(current_df, eviction_df)
join_df = merge_price_saving_if_df(price_saving_df, if_df)
elif not is_price_fetch_success and is_if_fetch_success:
join_df = eviction_df
join_df = if_df
elif is_price_fetch_success and not is_if_fetch_success:
current_df['IF'] = -1.0
current_df = current_df[
price_saving_df['IF'] = -1.0
current_df = price_saving_df[
['InstanceTier', 'InstanceType', 'Region', 'OndemandPrice', 'SpotPrice', 'Savings', 'IF']]
join_df = current_df
else:
@@ -54,25 +46,9 @@ def lambda_handler(event, _):
return

try:
# load previous dataframe
s3 = boto3.resource('s3')
object = s3.Object(STORAGE_CONST.BUCKET_NAME, AZURE_CONST.S3_LATEST_DATA_SAVE_PATH)
response = object.get()
data = json.load(response['Body'])
previous_df = pd.DataFrame(data)

# upload latest azure price to s3
update_latest(join_df, event_time_utc_datetime)
save_raw(join_df, event_time_utc_datetime)

# upload count-log to cloudwatch
upload_cloudwatch(join_df, event_time_utc_datetime)

# compare and upload changed_df to timestream
changed_df = compare(previous_df, join_df, AZURE_CONST.DF_WORKLOAD_COLS, AZURE_CONST.DF_FEATURE_COLS)
if not changed_df.empty:
query_selector(changed_df)
upload_timestream(changed_df, event_time_utc_datetime)
update_latest_price_saving_if(join_df, event_time_utc_datetime)
save_raw_price_saving_if(join_df, event_time_utc_datetime)

except Exception as e:
result_msg = """AZURE UPLOAD MODULE EXCEPTION!\n %s""" % (e)
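The merge helper imported above (merge_price_saving_if_df in utils/merge_df.py) is not part of this diff. As a hedged sketch only, it presumably outer-joins the price/savings frame with the interruption-frequency frame on the workload key columns and falls back to the -1.0 sentinel used in the price-only branch; everything below beyond the column names visible in this diff is an assumption.

# Hypothetical sketch of merge_price_saving_if_df; the actual helper lives in utils/merge_df.py.
import pandas as pd

WORKLOAD_COLS = ['InstanceTier', 'InstanceType', 'Region']

def merge_price_saving_if_df(price_saving_df: pd.DataFrame, if_df: pd.DataFrame) -> pd.DataFrame:
    # Outer join so rows present in only one source are kept.
    merged = price_saving_df.merge(if_df, on=WORKLOAD_COLS, how='outer')
    # Same sentinel as the price-only fallback branch in lambda_handler.
    merged['IF'] = merged['IF'].fillna(-1.0)
    return merged[WORKLOAD_COLS + ['OndemandPrice', 'SpotPrice', 'Savings', 'IF']]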
@@ -1,18 +1,20 @@
import load_sps
import pandas as pd
import traceback
from datetime import datetime
from sps_module import sps_shared_resources
from utils.merge_df import merge_price_eviction_sps_df
from utils.upload_data import update_latest_sps, save_raw_sps
from utils.pub_service import send_slack_message, logger, S3, AZURE_CONST
from utils.merge_df import merge_if_saving_price_sps_df
from utils.upload_data import update_latest, save_raw, upload_timestream, query_selector, upload_cloudwatch
from utils.compare_data import compare_sps
from utils.pub_service import send_slack_message, Logger, S3, AZURE_CONST

FIRST_TIME_ACTION = "First_Time" # first-run action
EVERY_10MIN_ACTION = "Every_10Min" # action run every 10 minutes
UTC_1500_TIME = "15:00" # UTC 15:00 (KST 00:00)

def lambda_handler(event, _):
def lambda_handler(event, context):
action = event.get("action")
event_id = event.get("id")
log_stream_id = context.log_stream_name
event_time_utc = event.get("time")
event_time_utc_datetime = datetime.strptime(event_time_utc, "%Y-%m-%dT%H:%M:%SZ")

@@ -22,15 +24,15 @@ def lambda_handler(event, _):

desired_count = sps_shared_resources.time_desired_count_map.get(event_time_utc_datetime.strftime("%H:%M"), 1)

logger.info(f"Lambda triggered: action: {action}, event_time: {datetime.strftime(event_time_utc_datetime, '%Y-%m-%d %H:%M:%S')}, desired_count: {desired_count}")
Logger.info(f"Lambda triggered: action: {action}, event_time: {datetime.strftime(event_time_utc_datetime, '%Y-%m-%d %H:%M:%S')}, desired_count: {desired_count}")

if action == FIRST_TIME_ACTION:
sps_res_availability_zones_true_df, sps_res_availability_zones_false_df = load_sps.collect_spot_placement_score_first_time(desired_count=desired_count)

elif action == EVERY_10MIN_ACTION:
# Skip execution at UTC 15:00 (KST 00:00)
if event_time_utc_datetime.strftime("%H:%M") == UTC_1500_TIME:
logger.info("Skipping scheduled time (UTC 15:00, KST 00:00)")
Logger.info("Skipping scheduled time (UTC 15:00, KST 00:00)")
return handle_response(200, "Executed successfully. Scheduled time skipped.", action, event_time_utc_datetime)

sps_res_availability_zones_true_df, sps_res_availability_zones_false_df = load_sps.collect_spot_placement_score(desired_count=desired_count)
@@ -49,36 +51,87 @@ def lambda_handler(event, _):

except Exception as e:
error_msg = f"Unexpected error: {e}"
logger.error(error_msg)
send_slack_message(f"AZURE SPS MODULE EXCEPTION!\n{error_msg}\nEvent_id: {event_id}")
Logger.error(error_msg)
send_slack_message(f"AZURE SPS MODULE EXCEPTION!\n{error_msg}\nLog_stream_id: {log_stream_id}")
return handle_response(500, "Execute Failed!", action, event_time_utc_datetime, str(e))


def handle_res_df(sps_res_true_df, sps_res_false_df, time_datetime):
try:
sps_res_true_df['time'] = time_datetime.strftime("%Y-%m-%d %H:%M:%S")
sps_res_false_df['time'] = time_datetime.strftime("%Y-%m-%d %H:%M:%S")
time_str = time_datetime.strftime("%Y-%m-%d %H:%M:%S")
sps_res_true_df['time'] = time_str
sps_res_false_df['time'] = time_str

sps_res_true_df['AvailabilityZone'] = sps_res_true_df['AvailabilityZone'].where(pd.notna(sps_res_true_df['AvailabilityZone']), None)

price_saving_if_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_SAVING_IF_GZIP_SAVE_PATH, 'pkl.gz')
if price_saving_if_df is None:
raise ValueError("price_saving_if_df is None")

price_if_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH, 'pkl.gz')
if price_if_df is None: raise ValueError("price_if_df is None")
success_availability_zone_true = process_zone_data(price_saving_if_df, sps_res_true_df, time_datetime, True)
success_availability_zone_false = process_zone_data(price_saving_if_df, sps_res_false_df, time_datetime, False)

price_eviction_sps_zone_true_df = merge_price_eviction_sps_df(price_if_df, sps_res_true_df, True)
success_availability_zones_true = (update_latest_sps(price_eviction_sps_zone_true_df, True)
and save_raw_sps(price_eviction_sps_zone_true_df, time_datetime, True))
if success_availability_zone_true and success_availability_zone_false:
Logger.info("Successfully merged the price/if/sps df, process_zone_data!")
return True
else:
Logger.info("Failed to merge the price/if/sps df, process_zone_data!")
return False

except Exception as e:
Logger.error(f"Error in handle_res_df function: {e}")
return False

price_eviction_sps_zone_false_df = merge_price_eviction_sps_df(price_if_df, sps_res_false_df, False)
success_availability_zones_false = (update_latest_sps(price_eviction_sps_zone_false_df, False)
and save_raw_sps(price_eviction_sps_zone_false_df, time_datetime, False))

if success_availability_zones_true and success_availability_zones_false:
logger.info(f"Successfully merged the price/if/sps df, updated latest results, and saved raw data!")
def process_zone_data(price_saving_if_df, sps_res_df, time_datetime, is_true_zone):
try:
all_data_zone_true_df = merge_if_saving_price_sps_df(price_saving_if_df, sps_res_df, is_true_zone)

if is_true_zone:
prev_availability_zone_true_all_data_df = S3.read_file(f"{AZURE_CONST.S3_LATEST_ALL_DATA_AVAILABILITY_ZONE_TRUE_PKL_GZIP_SAVE_PATH}", 'pkl.gz')
if prev_availability_zone_true_all_data_df is not None:
    prev_availability_zone_true_all_data_df.drop(columns=['id'], inplace=True)
workload_cols = ['InstanceTier', 'InstanceType', 'Region', 'AvailabilityZone', 'DesiredCount']
feature_cols = ['OndemandPrice', 'SpotPrice', 'IF', 'Score', 'SPS_Update_Time']

changed_df = None
if prev_availability_zone_true_all_data_df is not None and not prev_availability_zone_true_all_data_df.empty:
changed_df = compare_sps(prev_availability_zone_true_all_data_df, all_data_zone_true_df, workload_cols, feature_cols)

update_success = update_latest(all_data_zone_true_df, is_true_zone)
save_success = save_raw(all_data_zone_true_df, time_datetime, is_true_zone)
cloudwatch_success = upload_cloudwatch(all_data_zone_true_df, time_datetime)

if changed_df is not None and not changed_df.empty:
query_success = query_selector(changed_df)
timestream_success = upload_timestream(changed_df, time_datetime)
else:
query_success = True
timestream_success = True

success = all([update_success, save_success, cloudwatch_success, query_success, timestream_success])

log_details = (
f"update: {update_success}, save: {save_success}, cloudwatch: {cloudwatch_success}, "
f"query: {query_success}, timestream: {timestream_success}"
)
else:
update_success = update_latest(all_data_zone_true_df, is_true_zone)
save_success = save_raw(all_data_zone_true_df, time_datetime, is_true_zone)

success = update_success and save_success
log_details = f"update: {update_success}, save: {save_success}"

if not success:
Logger.error(f"Failed: Availability Zone {is_true_zone} Processing.")
Logger.error(log_details)
else:
return True

except Exception as e:
logger.error(f"Error in handle_res_df function: {e}")
return False
Logger.error(f"Error in process_zone_data function: {e}")
Logger.error(traceback.format_exc())
return False
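compare_sps above narrows the frame to rows whose feature values changed since the previous snapshot, so only deltas reach query_selector and upload_timestream. Its implementation is not shown in this diff; the following is a hedged sketch of one common way to compute such a changed-rows frame keyed on workload_cols and compared on feature_cols, not the repository's actual code.

# Hypothetical sketch of a changed-row diff in the spirit of compare_sps (utils/compare_data.py).
import pandas as pd

def compare_sps_sketch(prev_df, curr_df, workload_cols, feature_cols):
    # Align the previous and current snapshots on the workload key.
    merged = curr_df.merge(prev_df[workload_cols + feature_cols], on=workload_cols,
                           how='left', suffixes=('', '_prev'))
    # A row counts as changed if it is new or any feature value differs.
    changed = pd.Series(False, index=merged.index)
    for col in feature_cols:
        changed |= merged[f"{col}_prev"].isna() | (merged[col] != merged[f"{col}_prev"])
    return merged.loc[changed, workload_cols + feature_cols]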


def handle_response(status_code, body, action, time_datetime, error_message=None):
response = {
@@ -90,5 +143,4 @@ def handle_response(status_code, body, action, time_datetime, error_message=None)
if error_message:
response["error_message"] = error_message

logger.info(f"Response: {response}")
return response
@@ -1,6 +1,7 @@
import re
import random
import requests
import traceback
import concurrent.futures
import time
import load_price
@@ -185,24 +186,26 @@ def execute_spot_placement_score_task_by_parameter_pool_df(api_calls_df, availab
"Score": score.get("score")
}
if availability_zones:
score_data["AvailabilityZone"] = score.get("availabilityZone", "SINGLE_ZONE")
score_data["AvailabilityZone"] = score.get("availabilityZone", "Single")

results.append(score_data)

elif result == "NO_AVAILABLE_LOCATIONS":
# Cancel the remaining tasks when NO_AVAILABLE_LOCATIONS is returned
print("No available locations found. Cancelling remaining tasks.")
print(f"SS_Resources.locations_call_history_tmp: {SS_Resources.locations_call_history_tmp}")
for f in futures:
if not f.done():
f.cancel()
executor.shutdown(wait=False)

except JSONDecodeError as e:
print(
f"execute_spot_placement_score_task_by_parameter_pool_df func. JSON decoding error: {str(e)}")
print(f"execute_spot_placement_score_task_by_parameter_pool_df func. JSON decoding error: {str(e)}")
raise

except Exception as e:
print(
f"execute_spot_placement_score_task_by_parameter_pool_df func. An unexpected error occurred: {e}")
print(f"execute_spot_placement_score_task_by_parameter_pool_df func. An unexpected error occurred: {e}")
print(traceback.format_exc())
raise
finally:
save_tmp_files_to_s3()
@@ -245,7 +248,6 @@ def execute_spot_placement_score_api(region_chunk, instance_type_chunk, availabi
with SS_Resources.location_lock:
res = SL_Manager.get_next_available_location()
if res is None:
print("No available locations with remaining calls.")
return "NO_AVAILABLE_LOCATIONS"
subscription_id, location, history, over_limit_locations = res
SL_Manager.update_call_history(subscription_id, location, history)
Expand All @@ -256,7 +258,7 @@ def execute_spot_placement_score_api(region_chunk, instance_type_chunk, availabi
"Content-Type": "application/json",
}
try:
response = requests.post(url, headers=headers, json=request_body, timeout=35)
response = requests.post(url, headers=headers, json=request_body, timeout=40)
response.raise_for_status()
return response.json()

@@ -51,7 +51,7 @@ def validation_can_call(location, history, over_limit_locations):
This method checks whether a call can be made to the specified location.
The decision is based on whether the location has exceeded its request limit and on the size of its call history.
"""
if over_limit_locations is not None:
if over_limit_locations:
if ((location not in over_limit_locations)
and (len(history[location]) < 10)):
return True
@@ -106,7 +106,6 @@ def get_next_available_location():
location = locations[location_index]

if validation_can_call(location, current_history, current_over_limit_locations):

SS_Resources.last_subscription_id_and_location_tmp['last_subscription_id'] = subscription_id
SS_Resources.last_subscription_id_and_location_tmp['last_location'] = location
return subscription_id, location, current_history, current_over_limit_locations
@@ -2,11 +2,11 @@
import requests
import time
from azure.identity import ClientSecretCredential
from utils.pub_service import db_AzureAuth
from utils.pub_service import DB_AzureAuth
from azure.core.exceptions import ClientAuthenticationError

def get_token():
db = db_AzureAuth
db = DB_AzureAuth

now = int(time.time())

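get_token above compares the current epoch time against a stored expiry before deciding whether to reuse a cached token from DB_AzureAuth. The snippet below is a hedged sketch of that caching pattern using azure.identity's ClientSecretCredential; the in-memory cache and its field names are assumptions, not the module's actual storage.

# Hypothetical sketch of token caching with ClientSecretCredential; cache layout is assumed.
import time
from azure.identity import ClientSecretCredential

_token_cache = {"token": None, "expires_on": 0}

def get_cached_token(tenant_id, client_id, client_secret,
                     scope="https://management.azure.com/.default"):
    now = int(time.time())
    # Reuse the cached token while it is still valid (60-second safety margin).
    if _token_cache["token"] and now < _token_cache["expires_on"] - 60:
        return _token_cache["token"]
    credential = ClientSecretCredential(tenant_id, client_id, client_secret)
    access_token = credential.get_token(scope)  # AccessToken(token, expires_on)
    _token_cache["token"] = access_token.token
    _token_cache["expires_on"] = access_token.expires_on
    return _token_cache["token"]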
