Preliminary main pull required for merge preparation #534

Merged
merged 50 commits into from
Feb 25, 2025
Commits
eccf82b
Feat sps collect
ty-kmu Feb 11, 2025
6370705
Feat post processing
ty-kmu Feb 11, 2025
39490f8
Feat workload
ty-kmu Feb 11, 2025
9c530be
Fix boto3 session
ty-kmu Feb 11, 2025
123ad25
Fix path
ty-kmu Feb 11, 2025
f8d6858
Fix: path issue
ty-kmu Feb 11, 2025
a127a6b
Fix: path
ty-kmu Feb 11, 2025
d1a1e99
Temp comment out requests.post in slack_msg_sender
ty-kmu Feb 11, 2025
194abfc
Merge pull request #517 from ddps-lab/azure-collector-fix
krtaiyang Feb 11, 2025
7b27cb0
Fix path
ty-kmu Feb 11, 2025
c18c037
Change test s3
ty-kmu Feb 11, 2025
3c65573
Fix shell script
ty-kmu Feb 11, 2025
be8edd9
Fix tmp break
ty-kmu Feb 12, 2025
b6639e7
Fix enable message sending
ty-kmu Feb 12, 2025
f8c7578
Refactor const
ty-kmu Feb 12, 2025
ac193f2
Format unnecessary whitespace & Update script permissions
ty-kmu Feb 12, 2025
45111f2
Merge pull request #518 from ddps-lab/azure-collector-fix
krtaiyang Feb 12, 2025
2c6c68a
Refactor time handling and del config file logic
ty-kmu Feb 12, 2025
7b4ced4
Merge pull request #519 from ddps-lab/azure-collector-fix
krtaiyang Feb 12, 2025
bcc8f25
Refactor file saving and uploading logic in collect_sps.py
ty-kmu Feb 12, 2025
7a17c0e
Refactor readability and error handling
ty-kmu Feb 12, 2025
ca4f237
Refactor compare_max_instance function to simplify parameters and logic
ty-kmu Feb 12, 2025
70acb2f
Add migration code
ty-kmu Feb 12, 2025
f952ab7
Fix hot fix
ty-kmu Feb 12, 2025
0dcc682
Fix id
ty-kmu Feb 13, 2025
1e09e1f
Fix AWS table name to use environment variable
ty-kmu Feb 13, 2025
84fcea4
Add CI/CD
ty-kmu Feb 13, 2025
e30c7b9
Fix timestamp
ty-kmu Feb 13, 2025
9ca9589
Change AWS Lambda function
ty-kmu Feb 13, 2025
eded5a8
Merge pull request #523 from ddps-lab/aws/new-sps-collector
ty-kmu Feb 13, 2025
d82e292
Refactor folder struct
ty-kmu Feb 13, 2025
bdba06c
Merge pull request #524 from ddps-lab/aws/new-sps-collector
ty-kmu Feb 13, 2025
bfa5389
Fix func name
ty-kmu Feb 13, 2025
b073620
Merge branch 'aws/new-sps-collector'
ty-kmu Feb 13, 2025
f495e86
Fix CI/CD
ty-kmu Feb 13, 2025
db0ceb2
Merge pull request #526 from ddps-lab/azure-collector-fix
krtaiyang Feb 13, 2025
e34694b
Refactor error boundary & delete unused code
ty-kmu Feb 13, 2025
05e30de
Fix typo
ty-kmu Feb 13, 2025
acd45c6
Update build
ty-kmu Feb 13, 2025
8d44b9b
Fix minor path issue
ty-kmu Feb 13, 2025
54c3426
Merge pull request #527 from ddps-lab/frontend
ty-kmu Feb 13, 2025
f97007b
Fix t2, t3 logic
ty-kmu Feb 14, 2025
659a6c0
Merge pull request #528 from ddps-lab/azure-collector-fix
krtaiyang Feb 14, 2025
c837123
Merge pull request #529 from ddps-lab/azure-collector-fix
krtaiyang Feb 14, 2025
bf1e609
Merge pull request #530 from ddps-lab/azure-collector-fix
krtaiyang Feb 15, 2025
5bf07b0
Feat T2, T3 column display
ty-kmu Feb 24, 2025
2f67d1d
Feat integrate snackbar notifications for query validation and errors
ty-kmu Feb 24, 2025
133f2ec
Build apply
ty-kmu Feb 24, 2025
31b02e8
Style del comments
ty-kmu Feb 24, 2025
620c0e2
Merge pull request #533 from ddps-lab/aws/t2t3-front
ty-kmu Feb 24, 2025
57 changes: 57 additions & 0 deletions .github/workflows/aws-lambda-sync.yml
@@ -0,0 +1,57 @@
name: deploy AWS files to lambda
on:
push:
branches:
- "main"
paths:
- "collector/spot-dataset/aws/lambda/**"
- "utility/slack_msg_sender.py"
workflow_dispatch:

env:
AWS_ACCESS_KEY_ID: ${{ secrets.SPOTRANK_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.SPOTRANK_SECRET_ACCESS_KEY }}
AWS_DEFAULT_REGION: "us-west-2"

jobs:
deploy_source:
name: deploy lambda from source
runs-on: ubuntu-latest
steps:
- name: Checkout source code
uses: actions/checkout@v4

- name: Zip lambda function code
run: |
set -e
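# Map each deployment ZIP to the Lambda source directory it packages; the shared utility/slack_msg_sender.py is bundled into every ZIP.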

declare -A LAMBDA_PATHS=(
["ondemand_price_collector_lambda.zip"]="collector/spot-dataset/aws/lambda/ondemand_price"
["spotlake_post_processing_lambda.zip"]="collector/spot-dataset/aws/lambda/post_processing_data"
["spot_if_collector_lambda.zip"]="collector/spot-dataset/aws/lambda/spot_if"
["spot_price_collector_lambda.zip"]="collector/spot-dataset/aws/lambda/spot_price"
["binpacked_workloads_generator_lambda.zip"]="collector/spot-dataset/aws/lambda/workload"
)

for zip_name in "${!LAMBDA_PATHS[@]}"; do
echo "Creating ZIP: $zip_name"
zip -j "$zip_name" utility/slack_msg_sender.py
zip -j "$zip_name" "${LAMBDA_PATHS[$zip_name]}"/*
done

- name: Deploy to AWS Lambda
run: |
set -e
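# Push each ZIP built in the previous step to its Lambda function via update-function-code.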

declare -A LAMBDA_FUNCTIONS=(
["ondemand_price_collector"]="ondemand_price_collector_lambda.zip"
["spotlake_post_processing"]="spotlake_post_processing_lambda.zip"
["spot_if_collector"]="spot_if_collector_lambda.zip"
["spot_price_collector"]="spot_price_collector_lambda.zip"
["binpacked_workloads_generator"]="binpacked_workloads_generator_lambda.zip"
)

for function_name in "${!LAMBDA_FUNCTIONS[@]}"; do
echo "Deploying: $function_name"
aws lambda update-function-code --function-name "$function_name" --zip-file "fileb://${LAMBDA_FUNCTIONS[$function_name]}"
done
235 changes: 235 additions & 0 deletions collector/spot-dataset/aws/ec2/sps/collect_sps.py
@@ -0,0 +1,235 @@
# ------ import module ------
from datetime import datetime, timezone
import boto3.session, botocore
import sys, os, argparse
import pickle, gzip, json
import pandas as pd
from io import StringIO

# ------ import user module ------
# memo: change the path
sys.path.append("/home/ubuntu/spotlake/utility")
from slack_msg_sender import send_slack_message
from sps_query_api import query_sps

def main():
# ------ Setting Constants ------
CURRENT_LOCAL_BASE_PATH = "/home/ubuntu/spotlake/collector/spot-dataset/aws/ec2/sps"
BUCKET_NAME = "spotlake"
WORKLOAD_BASE_PATH = "rawdata/aws/workloads"
SPS_BASE_PATH = "rawdata/aws/sps"
CREDENTIAL_FILE_PATH = "credential/credential_3699.csv"
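# CSV of AWS credentials rotated across SPS queries (one credential row per query).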
LOG_GROUP_NAME = "SPS-Server-Data-Count"
LOG_STREAM_NAME = "aws"

# ------ Setting Client ------
session = boto3.session.Session()
s3 = session.resource("s3")
s3_client = session.client("s3", region_name="us-west-2")

# ------ Create Index Files ------
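# Each index file holds two lines: the initial index and the current (in-progress) index.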
CREDENTIAL_START_INDEX_FILE_NAME = f"{CURRENT_LOCAL_BASE_PATH}/credential_index.txt"
if not os.path.exists(CREDENTIAL_START_INDEX_FILE_NAME):
with open(CREDENTIAL_START_INDEX_FILE_NAME, 'w') as file:
file.write('0\n0')
TARGET_CAPACITY_INDEX_FILE_NAME = f"{CURRENT_LOCAL_BASE_PATH}/target_capacity_index.txt"
if not os.path.exists(TARGET_CAPACITY_INDEX_FILE_NAME):
with open(TARGET_CAPACITY_INDEX_FILE_NAME, 'w') as file:
file.write('0\n0')

# ------ Receive UTC Time Data ------
parser = argparse.ArgumentParser()
parser.add_argument('--timestamp', dest='timestamp', action='store')
args = parser.parse_args()
timestamp_utc = datetime.strptime(args.timestamp, "%Y-%m-%dT%H:%M")
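# e.g. --timestamp 2025-02-25T04:13; the minute value is floored to a 10-minute boundary below.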

print(f"Script start time (UTC) : {timestamp_utc}")

# ------ Modify Date Data Format ------
date = args.timestamp.split("T")[0]
timestamp_utc = timestamp_utc.replace(minute=((timestamp_utc.minute // 10) * 10), second=0)
S3_DIR_NAME = timestamp_utc.strftime("%Y/%m/%d")
S3_OBJECT_PREFIX = timestamp_utc.strftime("%H-%M")
execution_time_start = datetime.now(timezone.utc)

# ------ Save Value of Credential Start Index ------
with open(CREDENTIAL_START_INDEX_FILE_NAME, 'r') as f:
init_credential_index, current_credential_index = map(int, f.readlines())

# ------ Set Target Capacities ------
target_capacities = [1, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50]
with open(TARGET_CAPACITY_INDEX_FILE_NAME, 'r') as f:
init_target_capacity_index, target_capacity_index = map(int, f.readlines())
target_capacity_index = target_capacity_index % len(target_capacities)
target_capacity = target_capacities[target_capacity_index]
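# Each run queries a single target capacity, cycling through the list across runs via the saved index.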

# ------ Load Workload File -------
start_time = datetime.now(timezone.utc)
workload = None
try:
key = f"{WORKLOAD_BASE_PATH}/{S3_DIR_NAME}/binpacked_workloads.pkl.gz"
workload = pickle.load(gzip.open(s3.Object(BUCKET_NAME, key).get()["Body"]))

local_workload_path = f"{CURRENT_LOCAL_BASE_PATH}/{date}_binpacked_workloads.pkl.gz"

# If today's workload file has not been downloaded yet, remove stale copies and download it
if not os.path.exists(local_workload_path):
for filename in os.listdir(f"{CURRENT_LOCAL_BASE_PATH}"):
if "_binpacked_workloads.pkl.gz" in filename:
os.remove(f"{CURRENT_LOCAL_BASE_PATH}/{filename}")

s3_client.download_file(BUCKET_NAME, key, local_workload_path)
# The workload file changed, so switch to the other credential batch
init_credential_index = 1800 if init_credential_index == 0 else 0
with open(CREDENTIAL_START_INDEX_FILE_NAME, 'w') as f:
f.write(f"{str(init_credential_index)}\n{str(init_credential_index)}")
# The workload file changed, so save the current index position
init_target_capacity_index = target_capacity_index
with open(TARGET_CAPACITY_INDEX_FILE_NAME, 'w') as f:
f.write(f"{str(init_target_capacity_index)}\n{str(init_target_capacity_index)}")
except Exception as e:
message = f"bucket : {BUCKET_NAME}, object : {key} could not be fetched.\nFalling back to the local workload file on the server."
send_slack_message(message)
print(message)
is_local = False
for filename in os.listdir(f"{CURRENT_LOCAL_BASE_PATH}"):
if "_binpacked_workloads.pkl.gz" in filename:
print(f"Using local workload file {CURRENT_LOCAL_BASE_PATH}/{filename}")
with open(f"{CURRENT_LOCAL_BASE_PATH}/{filename}", 'rb') as f:
workload = pickle.load(gzip.open(f))
is_local = True
break
if not is_local:
message = "No local workload file exists."
send_slack_message(message)
print(message)
raise Exception("does not exist local workloads file")
print(f"Credential start index : {current_credential_index}")

# ------ Load Credential File ------
credentials = None
try:
csv_content = s3.Object(BUCKET_NAME, CREDENTIAL_FILE_PATH).get()["Body"].read().decode('utf-8')
credentials = pd.read_csv(StringIO(csv_content))
except Exception as e:
send_slack_message(e)
print(e)
raise e

end_time = datetime.now(timezone.utc)
print(f"Load credential and workload time : {(end_time - start_time).total_seconds():.4f} s")

# ------ Start Query Per Target Capacity ------
start_time = datetime.now(timezone.utc)
start_credential_index = current_credential_index

try:
df_list = []
for scenarios in workload:
while True:
try:
args = (credentials.iloc[current_credential_index], scenarios, target_capacity)
current_credential_index += 1
df = query_sps(args)
df_list.append(df)
break
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'MaxConfigLimitExceeded':
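# This credential hit its SPS configuration limit; retry the same scenario with the next credential (the index was already advanced above).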
continue
else:
send_slack_message(e)
print(e)
raise e
except Exception as e:
send_slack_message(e)
print(e)
raise e

sps_df = pd.concat(df_list, axis=0, ignore_index=True)
except Exception as e:
message = f"error at query_sps\nerror : {e}"
send_slack_message(message)
print(message)
raise e

# ------ Update config files ------
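# When the target-capacity cycle wraps back to its starting index, reset the credential pointer to the cycle's initial value; otherwise persist the index reached in this run.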
next_target_capacity_index = (target_capacity_index + 1) % len(target_capacities)
print(f"Next target capacity index : {next_target_capacity_index}")
if next_target_capacity_index == init_target_capacity_index:
with open(CREDENTIAL_START_INDEX_FILE_NAME, "w") as f:
f.write(f"{str(init_credential_index)}\n{str(init_credential_index)}")
else:
with open(CREDENTIAL_START_INDEX_FILE_NAME, "w") as f:
f.write(f"{str(init_credential_index)}\n{str(current_credential_index)}")
with open(TARGET_CAPACITY_INDEX_FILE_NAME, "w") as f:
f.write(f"{str(init_target_capacity_index)}\n{str(next_target_capacity_index)}")

end_time = datetime.now(timezone.utc)
print(f"Target Capacity {target_capacity} query time is {(end_time - start_time).total_seconds() / 60:.2f} min")
print(f"Credential range used : {(start_credential_index, current_credential_index)}")

start_time = datetime.now(timezone.utc)
# ------ Save Dataframe File ------
try:
object_name = f"{S3_OBJECT_PREFIX}_sps_{target_capacity}.pkl"
saved_filename = f"{CURRENT_LOCAL_BASE_PATH}/{object_name}"
gz_filename = f"{saved_filename}.gz"

with open(saved_filename, "wb") as f:
pickle.dump(sps_df, f)

with open(saved_filename, "rb") as f_in, gzip.open(gz_filename, "wb") as f_out:
f_out.writelines(f_in)

with open(gz_filename, "rb") as f:
s3_client.upload_fileobj(f, BUCKET_NAME, f"{SPS_BASE_PATH}/{S3_DIR_NAME}/{S3_OBJECT_PREFIX}_sps_{target_capacity}.pkl.gz")

os.remove(saved_filename)
os.remove(gz_filename)

except Exception as e:
send_slack_message(e)
print(f"Error occurred while saving and uploading file: {e}")
raise e
end_time = datetime.now(timezone.utc)
print(f"Saving time of DF file is {(end_time - start_time).total_seconds() / 60:.2f} min")

# ------ Monitoring for total execution time ------
execution_time_end = datetime.now(timezone.utc)
total_execution_time = (execution_time_end - execution_time_start).total_seconds()
if total_execution_time >= 600:
message = f"SPS query time exceeded 10 minutes : {total_execution_time} s"
message += f"\nExecution start time (UTC) : {timestamp_utc}"
send_slack_message(message)
print(message)

# ------ Upload Collecting Data Number at Cloud Logs ------
log_client = session.client('logs', 'us-west-2')
# memo: change the log group name
try:
message = json.dumps({"NUMBER_ROWS" : sps_df.shape[0]})
timestamp = int(datetime.now(timezone.utc).timestamp() * 1000)
try:
response = log_client.put_log_events(
logGroupName=LOG_GROUP_NAME,
logStreamName=LOG_STREAM_NAME,
logEvents=[
{
'timestamp' : timestamp,
'message' : message
},
],
)
except Exception as e:
print(e)
raise e
except Exception as e:
print(e)
raise e
print(f"Number of collected DataFrame rows : {sps_df.shape[0]}")

if __name__ == "__main__":
start_time = datetime.now(timezone.utc)
main()
end_time = datetime.now(timezone.utc)
print(f"Running time is {(end_time - start_time).total_seconds() / 60:.2f} min")
15 changes: 15 additions & 0 deletions collector/spot-dataset/aws/ec2/sps/requirements.txt
@@ -0,0 +1,15 @@
absl-py
boto3
botocore
immutabledict
jmespath
numpy
ortools
pandas
protobuf
python-dateutil
pytz
s3transfer
six
tzdata
urllib3