
Commit 5d9a68c

Merge pull request #512 from ddps-lab/azure-sps-for-PR

PR containing all of the SPS collection features - to main branch

krtaiyang authored Feb 11, 2025
2 parents 3ab6bb9 + 32a780a commit 5d9a68c
Showing 21 changed files with 1,408 additions and 117 deletions.
29 changes: 21 additions & 8 deletions .github/workflows/azure-lambda-sync.yml
@@ -4,9 +4,11 @@ on:
    branches:
      - 'main'
    paths:
-     - 'collector/spot-dataset/azure/lambda/current_collector/**'
-     - 'utility/slack_msg_sender.py'
+     - 'collector/spot-dataset/azure/lambda/current_collector/lambda_function.py'
+     - 'collector/spot-dataset/azure/lambda/current_collector/load_price.py'
+     - 'collector/spot-dataset/azure/lambda/current_collector/load_if.py'
      - 'const_config.py'
+     - 'collector/spot-dataset/azure/lambda/current_collector/utils/**'

 env:
   AWS_ACCESS_KEY_ID: ${{ secrets.SPOTRANK_ACCESS_KEY_ID }}
@@ -19,14 +21,25 @@ jobs:
    runs-on: ubuntu-latest
    steps:
      - name: checkout source code
-       uses: actions/checkout@v1
-     - name: Zip lambda_function code
+       uses: actions/checkout@v4
+     - name: Zip Lambda function code
        run: |
-         zip -j ./collector/spot-dataset/azure/lambda/current_collector/azure_lambda.zip ./collector/spot-dataset/azure/lambda/current_collector/* ./const_config.py ./utility/slack_msg_sender.py
+         rm -f azure_lambda.zip
+         rm -f ./collector/spot-dataset/azure/lambda/current_collector/azure_lambda.zip
+         zip -j ./collector/spot-dataset/azure/lambda/current_collector/azure_lambda.zip \
+           ./collector/spot-dataset/azure/lambda/current_collector/lambda_function.py \
+           ./collector/spot-dataset/azure/lambda/current_collector/load_price.py \
+           ./collector/spot-dataset/azure/lambda/current_collector/load_if.py \
+           ./const_config.py
          cd ./collector/spot-dataset/azure/lambda/current_collector/
-         zip -r azure_lambda.zip ./utill/*
+         zip -r azure_lambda.zip ./utils/*
          cd ../../../../../
          mv ./collector/spot-dataset/azure/lambda/current_collector/azure_lambda.zip ./
-     - name: Deploy to lambda
+     - name: Deploy to AWS Lambda
        run: |
-         aws lambda update-function-code --function-name azure-collector --zip-file fileb://azure_lambda.zip
+         aws lambda update-function-code --function-name azure-collector --zip-file fileb://azure_lambda.zip
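
The deploy step calls the AWS CLI; for deploying the same artifact from a local checkout, the boto3 equivalent can be handy. A minimal sketch, assuming the zip has been built as in the workflow and AWS credentials are configured (the helper name here is ours, not the repo's):

```python
# Hypothetical local equivalent of the "Deploy to AWS Lambda" step above.
# Assumes azure_lambda.zip exists and AWS credentials are configured.
import boto3

def deploy_zip(function_name: str, zip_path: str) -> None:
    client = boto3.client("lambda", region_name="us-west-2")
    with open(zip_path, "rb") as f:
        # Same operation the workflow's aws CLI call performs.
        client.update_function_code(FunctionName=function_name, ZipFile=f.read())

if __name__ == "__main__":
    deploy_zip("azure-collector", "azure_lambda.zip")
```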
45 changes: 45 additions & 0 deletions .github/workflows/azure-sps-lambda-sync.yml
@@ -0,0 +1,45 @@
name: deploy azure sps files to lambda
on:
  push:
    branches:
      - 'main'
    paths:
      - 'collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py'
      - 'collector/spot-dataset/azure/lambda/current_collector/load_price.py'
      - 'collector/spot-dataset/azure/lambda/current_collector/load_sps.py'
      - 'const_config.py'
      - 'collector/spot-dataset/azure/lambda/current_collector/utils/**'
      - 'collector/spot-dataset/azure/lambda/current_collector/sps_module/**'
env:
  AWS_ACCESS_KEY_ID: ${{ secrets.SPOTRANK_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.SPOTRANK_SECRET_ACCESS_KEY }}
  AWS_DEFAULT_REGION: 'us-west-2'

jobs:
  deploy_source:
    name: deploy lambda from source
    runs-on: ubuntu-latest
    steps:
      - name: checkout source code
        uses: actions/checkout@v4
      - name: Zip Lambda function code
        run: |
          rm -f azure_sps_lambda.zip
          rm -f ./collector/spot-dataset/azure/lambda/current_collector/azure_sps_lambda.zip
          zip -j ./collector/spot-dataset/azure/lambda/current_collector/azure_sps_lambda.zip \
            ./collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py \
            ./collector/spot-dataset/azure/lambda/current_collector/load_price.py \
            ./collector/spot-dataset/azure/lambda/current_collector/load_sps.py \
            ./const_config.py
          cd ./collector/spot-dataset/azure/lambda/current_collector/
          zip -r azure_sps_lambda.zip ./utils/*
          zip -r azure_sps_lambda.zip ./sps_module/*
          cd ../../../../../
          mv ./collector/spot-dataset/azure/lambda/current_collector/azure_sps_lambda.zip ./
      - name: Deploy to AWS Lambda
        run: |
          aws lambda update-function-code --function-name azure-sps-collector --zip-file fileb://azure_sps_lambda.zip
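
This workflow only deploys the code; the collection itself is driven by scheduled events carrying an `action` and a `time` field (see lambda_function_sps.py further below). A hedged invocation example, assuming an EventBridge-style payload and the function name used above:

```python
# Hypothetical manual invocation of the deployed SPS collector, for testing.
# The event shape mirrors what lambda_function_sps.py expects; the real
# schedule is assumed to be wired up via EventBridge rules.
import json
import boto3

client = boto3.client("lambda", region_name="us-west-2")
event = {"action": "Every_10Min", "time": "2025-02-11T00:10:00Z"}
response = client.invoke(
    FunctionName="azure-sps-collector",
    Payload=json.dumps(event).encode("utf-8"),
)
print(json.loads(response["Payload"].read()))
```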
collector/spot-dataset/azure/lambda/current_collector/lambda_function.py
@@ -1,21 +1,15 @@
 import os
 import json
 import boto3
-import slack_msg_sender
 import pandas as pd
-from const_config import AzureCollector, Storage
 from datetime import datetime, timezone
 from load_if import load_if
-from merge_df import merge_df
 from load_price import collect_price_with_multithreading
-from upload_data import upload_timestream, update_latest, save_raw, query_selector, upload_cloudwatch
-from compare_data import compare
+from utils.merge_df import merge_price_eviction_df
+from utils.compare_data import compare
+from utils.upload_data import upload_timestream, update_latest, save_raw, query_selector, upload_cloudwatch
+from utils.pub_service import send_slack_message, AZURE_CONST, STORAGE_CONST

-STORAGE_CONST = Storage()
-AZURE_CONST = AzureCollector()
-
-BUCKET_NAME = os.environ.get('BUCKET_NAME')
-KEY = os.environ.get('S3_LATEST_DATA_SAVE_PATH')
 WORKLOAD_COLS = ['InstanceTier', 'InstanceType', 'Region']
 FEATURE_COLS = ['OndemandPrice', 'SpotPrice', 'IF']
 str_datetime = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M")
@@ -25,38 +19,39 @@
 def azure_collector(timestamp):
     is_price_fetch_success = True
     is_if_fetch_success = True

     # collect azure price data with multithreading
     try:
         current_df = collect_price_with_multithreading()
     except Exception as e:
         result_msg = """AZURE PRICE MODULE EXCEPTION!\n %s""" % (e)
         data = {'text': result_msg}
-        slack_msg_sender.send_slack_message(result_msg)
+        send_slack_message(result_msg)
         is_price_fetch_success = False

     try:
         eviction_df = load_if()
     except Exception as e:
         result_msg = """AZURE IF MODULE EXCEPTION!\n %s""" % (e)
         data = {'text': result_msg}
-        slack_msg_sender.send_slack_message(result_msg)
+        send_slack_message(result_msg)
         is_if_fetch_success = False

     if is_price_fetch_success and is_if_fetch_success:
-        join_df = merge_df(current_df, eviction_df)
+        join_df = merge_price_eviction_df(current_df, eviction_df)
     elif not is_price_fetch_success and is_if_fetch_success:
         join_df = eviction_df
     elif is_price_fetch_success and not is_if_fetch_success:
         current_df['IF'] = -1.0
-        current_df = current_df[['InstanceTier', 'InstanceType', 'Region', 'OndemandPrice', 'SpotPrice', 'Savings', 'IF']]
+        current_df = current_df[
+            ['InstanceTier', 'InstanceType', 'Region', 'OndemandPrice', 'SpotPrice', 'Savings', 'IF']]
         join_df = current_df
     else:
         result_msg = """AZURE PRICE MODULE AND IF MODULE EXCEPTION!"""
         data = {'text': result_msg}
-        slack_msg_sender.send_slack_message(result_msg)
+        send_slack_message(result_msg)
         return

     try:
         # load previous dataframe
         s3 = boto3.resource('s3')
@@ -81,9 +76,10 @@ def azure_collector(timestamp):
     except Exception as e:
         result_msg = """AZURE UPLOAD MODULE EXCEPTION!\n %s""" % (e)
         data = {'text': result_msg}
-        slack_msg_sender.send_slack_message(result_msg)
+        send_slack_message(result_msg)
         if_exception_flag = False


 def lambda_handler(event, context):
     azure_collector(timestamp)
     return {
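The renamed merge_price_eviction_df helper lives in utils/merge_df.py and is not shown in this diff; a plausible minimal sketch, assuming it left-joins eviction (IF) data onto the price frame over the workload columns defined above (all details here are assumptions):

```python
# Hypothetical sketch of utils/merge_df.py's merge_price_eviction_df;
# the real implementation is not part of this diff.
import pandas as pd

WORKLOAD_COLS = ['InstanceTier', 'InstanceType', 'Region']

def merge_price_eviction_df(price_df: pd.DataFrame, eviction_df: pd.DataFrame) -> pd.DataFrame:
    # Join spot/on-demand prices with eviction-rate (IF) data per workload.
    merged = price_df.merge(
        eviction_df[WORKLOAD_COLS + ['IF']], on=WORKLOAD_COLS, how='left')
    # -1.0 marks "IF unavailable", matching the fallback branch above.
    merged['IF'] = merged['IF'].fillna(-1.0)
    return merged[WORKLOAD_COLS + ['OndemandPrice', 'SpotPrice', 'Savings', 'IF']]
```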
collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py
@@ -0,0 +1,83 @@
import load_sps
import pandas as pd
from datetime import datetime
from sps_module import sps_shared_resources
from utils.merge_df import merge_price_eviction_sps_df
from utils.upload_data import update_latest_sps, save_raw_sps
from utils.pub_service import send_slack_message, logger, S3, AZURE_CONST

FIRST_TIME_ACTION = "First_Time"  # Action for the first run
EVERY_10MIN_ACTION = "Every_10Min"  # Action for the every-10-minutes run
UTC_1500_TIME = "15:00"  # UTC 15:00 (KST 00:00)

def lambda_handler(event, _):
    action = event.get("action")
    event_time_utc = event.get("time")
    try:
        if not action or not event_time_utc:
            raise ValueError("Invalid event info: action or time is missing")

        event_time_utc = datetime.strptime(event_time_utc, "%Y-%m-%dT%H:%M:%SZ")
        desired_count = sps_shared_resources.time_desired_count_map.get(event_time_utc.strftime("%H:%M"), 1)

        logger.info(f"Lambda triggered: action: {action}, event_time: {event_time_utc}, desired_count: {desired_count}")

        if action == FIRST_TIME_ACTION:
            sps_res_df = load_sps.collect_spot_placement_score_first_time(desired_count=desired_count)

        elif action == EVERY_10MIN_ACTION:
            # Skip execution at UTC 15:00 (KST 00:00)
            if event_time_utc.strftime("%H:%M") == UTC_1500_TIME:
                logger.info("Skipping scheduled time (UTC 15:00, KST 00:00)")
                return handle_response(200, "Executed successfully. Scheduled time skipped.", action, event_time_utc)

            sps_res_df = load_sps.collect_spot_placement_score(desired_count=desired_count)

        else:
            raise ValueError(f"Invalid lambda action: {action}")

        if sps_res_df is None:
            raise ValueError("sps_res_df is None")

        if not handle_res_df(sps_res_df, event_time_utc):
            raise RuntimeError("Failed to handle_res_df")

        return handle_response(200, "Executed Successfully!", action, event_time_utc)

    except Exception as e:
        error_msg = f"Unexpected error: {e}"
        logger.error(error_msg)
        send_slack_message(f"AZURE SPS MODULE EXCEPTION!\n{error_msg}")
        return handle_response(500, "Execute Failed!", action, event_time_utc, str(e))


def handle_res_df(sps_res_df, event_time_utc):
    try:
        sps_res_df['time'] = event_time_utc.strftime("%Y-%m-%d %H:%M:%S")
        sps_res_df['AvailabilityZone'] = sps_res_df['AvailabilityZone'].where(pd.notna(sps_res_df['AvailabilityZone']), None)

        # price_if_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_IF_GZIP_SAVE_PATH, 'pkl.gz')
        # if price_if_df is None: raise ValueError("price_if_df is None")
        price_if_df = pd.DataFrame(S3.read_file(AZURE_CONST.S3_LATEST_DATA_SAVE_PATH, 'json'))
        price_eviction_sps_df = merge_price_eviction_sps_df(price_if_df, sps_res_df)

        if update_latest_sps(price_eviction_sps_df) and save_raw_sps(price_eviction_sps_df, event_time_utc):
            logger.info("Successfully merged the price/if/sps dataframes, updated the latest result, and saved the raw data!")
            return True

    except Exception as e:
        logger.error(f"Error in handle_res_df function: {e}")
        return False

def handle_response(status_code, body, action, time, error_message=None):
    response = {
        "statusCode": status_code,
        "body": body,
        "action": action,
        "time": str(time)
    }
    if error_message:
        response["error_message"] = error_message

    logger.info(f"Response: {response}")
    return response
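
For a quick smoke test without any AWS scheduler, the handler can be driven directly with a hand-built event; a sketch, assuming the repo's modules and credentials are available locally:

```python
# Hypothetical local test of lambda_function_sps.lambda_handler.
# Assumes AWS credentials and the repo's modules are importable locally.
from lambda_function_sps import lambda_handler

event = {"action": "Every_10Min", "time": "2025-02-11T00:10:00Z"}
result = lambda_handler(event, None)
print(result)  # e.g. {'statusCode': 200, 'body': 'Executed Successfully!', ...}
```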
21 changes: 10 additions & 11 deletions collector/spot-dataset/azure/lambda/current_collector/load_if.py
@@ -1,9 +1,7 @@
-import json
-import boto3
 import requests
 import pandas as pd
-import slack_msg_sender
-from utill.azure_auth import get_token
+from utils.azure_auth import get_token
+from utils.pub_service import send_slack_message


 def get_data(token, skip_token, retry=3):
@@ -50,26 +48,27 @@ def load_if():
                 break
             skip_token = data["$skipToken"]

-            eviction_df = pd.DataFrame(datas)
+        eviction_df = pd.DataFrame(datas)

         eviction_df['InstanceTier'] = eviction_df['skuName'].str.split('_', n=1, expand=True)[0].str.capitalize()
         eviction_df['InstanceType'] = eviction_df['skuName'].str.split('_', n=1, expand=True)[1].str.capitalize()

         frequency_map = {'0-5': 3.0, '5-10': 2.5, '10-15': 2.0, '15-20': 1.5, '20+': 1.0}
         eviction_df = eviction_df.replace({'evictionRate': frequency_map})

         eviction_df.rename(columns={'evictionRate': 'IF'}, inplace=True)
         eviction_df.rename(columns={'location': 'Region'}, inplace=True)

         eviction_df['OndemandPrice'] = -1.0
         eviction_df['SpotPrice'] = -1.0
         eviction_df['Savings'] = 1.0

-        eviction_df = eviction_df[['InstanceTier', 'InstanceType', 'Region', 'OndemandPrice', 'SpotPrice', 'Savings', 'IF']]
+        eviction_df = eviction_df[
+            ['InstanceTier', 'InstanceType', 'Region', 'OndemandPrice', 'SpotPrice', 'Savings', 'IF']]

         return eviction_df

     except Exception as e:
         result_msg = """AZURE Exception when load_if\n %s""" % (e)
         data = {'text': result_msg}
-        slack_msg_sender.send_slack_message(result_msg)
+        send_slack_message(result_msg)
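
The eviction-rate buckets returned by the Azure API are mapped to numeric IF scores via frequency_map; a small worked example of that transformation on illustrative data:

```python
# Worked example of the eviction-rate bucketing above (illustrative data only).
import pandas as pd

frequency_map = {'0-5': 3.0, '5-10': 2.5, '10-15': 2.0, '15-20': 1.5, '20+': 1.0}
df = pd.DataFrame({'skuName': ['Standard_D2s_v3'],
                   'evictionRate': ['5-10'],
                   'location': ['eastus']})
df['InstanceTier'] = df['skuName'].str.split('_', n=1, expand=True)[0].str.capitalize()
df['InstanceType'] = df['skuName'].str.split('_', n=1, expand=True)[1].str.capitalize()
df = df.replace({'evictionRate': frequency_map}).rename(
    columns={'evictionRate': 'IF', 'location': 'Region'})
print(df)  # InstanceTier='Standard', InstanceType='D2s_v3', IF=2.5, Region='eastus'
```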
collector/spot-dataset/azure/lambda/current_collector/load_price.py
@@ -2,12 +2,8 @@
 import pandas as pd
 import numpy as np
 import threading
-from const_config import AzureCollector
 from concurrent.futures import ThreadPoolExecutor
-import slack_msg_sender
-
-AZURE_CONST = AzureCollector()
-
+from utils.pub_service import send_slack_message, AZURE_CONST

 price_list = []
 response_dict = {}
@@ -133,10 +129,10 @@ def collect_price_with_multithreading():

     if response_dict:
         for i in response_dict:
-            slack_msg_sender.send_slack_message(f"{i} response occurred {response_dict[i]} times")
+            send_slack_message(f"{i} response occurred {response_dict[i]} times")

     price_df = pd.DataFrame(price_list)
     savings_df = preprocessing_price(price_df)
     savings_df = savings_df.drop_duplicates(subset=['InstanceTier', 'InstanceType', 'Region'], keep='first')

-    return savings_df
+    return savings_df
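
preprocessing_price is not part of this diff; a hedged sketch of the Savings computation it presumably performs (the percentage-discount definition and the zero guard are assumptions, not the repo's confirmed logic):

```python
# Hypothetical sketch of preprocessing_price; not part of this diff.
import numpy as np
import pandas as pd

def preprocessing_price(price_df: pd.DataFrame) -> pd.DataFrame:
    # Savings as the percentage discount of spot over on-demand,
    # guarding against division by zero for missing on-demand prices.
    df = price_df.copy()
    df['Savings'] = np.where(
        df['OndemandPrice'] > 0,
        (df['OndemandPrice'] - df['SpotPrice']) / df['OndemandPrice'] * 100,
        0.0)
    return df
```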