# This must run first: load the .env file before any imports that read environment variables.
from dotenv import load_dotenv

load_dotenv(".env")

import sys
import os
import pickle
import time
from typing import Optional, Tuple

from transformers import PreTrainedModel, PreTrainedTokenizer

from scripts.utils import MAIN_RESULTS_DIR, main_experiment_results_dir
from core.data.task_helpers import get_all_tasks, get_task_by_name
from core.models.llm_loading import load_model_and_tokenizer
from core.models.utils.inference import hidden_to_logits
from core.analysis.utils import logits_top_tokens
from core.analysis.evaluation import calculate_accuracy_on_datasets
from core.task_vectors import run_icl, run_task_vector
from core.utils.misc import limit_gpus, seed_everything
from core.experiments_config import MODELS_TO_EVALUATE, TASKS_TO_EVALUATE


def get_results_file_path(model_type: str, model_variant: str, experiment_id: str = "") -> str:
return os.path.join(main_experiment_results_dir(experiment_id), f"{model_type}_{model_variant}.pkl")
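
# For illustration (hypothetical names): with experiment_id="3", model_type="llama",
# model_variant="13B", this returns something like "<results_dir>/3/llama_13B.pkl",
# where the directory itself comes from main_experiment_results_dir.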


def evaluate_task(
    model: PreTrainedModel, tokenizer: PreTrainedTokenizer, task_name: str, num_examples: int
) -> Tuple[dict, dict]:
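    """Evaluate one task under three protocols: zero-shot baseline, ICL, and task vector.

    Returns (accuracies, tv_ordered_tokens_by_layer): the accuracy metrics, and
    the top tokens per layer decoded from the mean task hidden state.
    """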
seed_everything(41)
accuracies = {}
task = get_task_by_name(tokenizer=tokenizer, task_name=task_name)

    # Evaluate baseline
baseline_datasets = task.create_datasets(num_datasets=100, num_examples=0)
predictions = run_icl(model, tokenizer, task, baseline_datasets, include_train=False)
accuracies["baseline"] = calculate_accuracy_on_datasets(task, predictions, baseline_datasets)

    # Evaluate ICL and Task Vector
    # TODO: Change back to 400, 100
    # num_test_datasets, num_dev_datasets = 400, 100
    num_test_datasets, num_dev_datasets = 50, 50
test_datasets = task.create_datasets(num_datasets=num_test_datasets, num_examples=num_examples)
dev_datasets = task.create_datasets(num_datasets=num_dev_datasets, num_examples=num_examples)
icl_predictions = run_icl(model, tokenizer, task, test_datasets)
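
    # run_task_vector returns the test predictions made with the injected task
    # vector, the dev-set accuracy per candidate layer, and the task hidden states.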
tv_predictions, tv_dev_accuracy_by_layer, task_hiddens = run_task_vector(
model,
tokenizer,
task,
test_datasets,
dev_datasets,
)
accuracies["tv_dev_by_layer"] = tv_dev_accuracy_by_layer
accuracies["icl"] = calculate_accuracy_on_datasets(task, icl_predictions, test_datasets)
accuracies["tv"] = calculate_accuracy_on_datasets(task, tv_predictions, test_datasets)
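
    # Decode the mean task hidden state at each layer into vocabulary logits and
    # record the top-100 tokens, to inspect what the task vector encodes.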
tv_ordered_tokens_by_layer = {}
try:
for layer_num in tv_dev_accuracy_by_layer.keys():
task_hidden = task_hiddens.mean(axis=0)[layer_num]
logits = hidden_to_logits(model, task_hidden)
tv_ordered_tokens_by_layer[layer_num] = logits_top_tokens(logits, tokenizer, k=100)
except Exception as e:
        print("Failed to compute top tokens per layer:", e)
return accuracies, tv_ordered_tokens_by_layer


def run_main_experiment(
model_type: str,
model_variant: str,
experiment_id: str = "",
model: Optional[PreTrainedModel] = None,
tokenizer: Optional[PreTrainedTokenizer] = None,
) -> None:
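    """Evaluate one model on every task in TASKS_TO_EVALUATE, caching per-task
    results in a pickle file so interrupted runs can resume."""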
print("Evaluating model:", model_type, model_variant)
results_file = get_results_file_path(model_type, model_variant, experiment_id=experiment_id)
os.makedirs(os.path.dirname(results_file), exist_ok=True)
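
    # Resume support: load any previously saved results so completed tasks are skipped below.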
if os.path.exists(results_file):
with open(results_file, "rb") as f:
results = pickle.load(f)
else:
results = {}
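
    # Limit the process to at most 8 GPUs (devices 0-7); limit_gpus is a project helper.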
limit_gpus(range(0, 8))
    if model is None or tokenizer is None:
        print("Loading model and tokenizer...")
        model, tokenizer = load_model_and_tokenizer(model_type, model_variant)
        print("Loaded model and tokenizer.")
tasks = get_all_tasks(tokenizer=tokenizer)
num_examples = 5
for i, task_name in enumerate(TASKS_TO_EVALUATE):
task = tasks[task_name]
if task_name in results:
            print(f"Skipping task {i+1}/{len(TASKS_TO_EVALUATE)}: {task_name}")
continue
results[task_name] = {}

        print("\n" + "=" * 50)
        print(f"Running task {i+1}/{len(TASKS_TO_EVALUATE)}: {task_name}")
tic = time.time()
accuracies, tv_ordered_tokens_by_layer = evaluate_task(model, tokenizer, task_name, num_examples)
print(f"Baseline Accuracy: {accuracies['baseline']:.2f}")
print(f"ICL Accuracy: {accuracies['icl']:.2f}")
print(f"Task Vector Accuracy: {accuracies['tv']:.2f}")
        print("Dev Accuracy by layer: ", end="")
        for layer, accuracy in accuracies["tv_dev_by_layer"].items():
            print(f"{layer}: {accuracy:.2f}, ", end="")
        print()
        print(f"Time: {time.time() - tic:.1f}s")

        results[task_name] = {
            "baseline_accuracy": accuracies["baseline"],
            "num_examples": num_examples,
            "icl_accuracy": accuracies["icl"],
            "tv_accuracy": accuracies["tv"],
            "tv_dev_accuracy_by_layer": accuracies["tv_dev_by_layer"],
            "tv_ordered_tokens_by_layer": tv_ordered_tokens_by_layer,
        }
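
        # Save after every task so progress survives interruptions.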
with open(results_file, "wb") as f:
pickle.dump(results, f)


def get_new_experiment_id() -> str:
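    """Return the next numeric experiment id: max existing results dir + 1 (or 1 if none)."""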
return str(
max([int(results_dir) for results_dir in os.listdir(MAIN_RESULTS_DIR) if results_dir.isdigit()] + [0]) + 1
)


def main():
if len(sys.argv) == 1:
# Run all models
# Calculate the experiment_id as the max experiment_id + 1
experiment_id = get_new_experiment_id()
for model_type, model_variant in MODELS_TO_EVALUATE:
run_main_experiment(model_type, model_variant, experiment_id=experiment_id)
    elif len(sys.argv) == 2:
        # A single argument selects a model by its index in MODELS_TO_EVALUATE
        model_num = int(sys.argv[1])
        model_type, model_variant = MODELS_TO_EVALUATE[model_num]
        run_main_experiment(model_type, model_variant)
    elif len(sys.argv) == 3:
        # Two arguments specify the model directly: <model_type> <model_variant>
        model_type, model_variant = sys.argv[1:]
        run_main_experiment(model_type, model_variant)
    else:
        sys.exit("Usage: main.py [model_num | model_type model_variant]")


if __name__ == "__main__":
    main()
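
# Example invocations (a sketch; the index and model names below are illustrative,
# actual values come from MODELS_TO_EVALUATE in core.experiments_config):
#   python main.py              # run every configured model under a fresh experiment id
#   python main.py 0            # run only MODELS_TO_EVALUATE[0]
#   python main.py llama 13B    # run a specific model type and variant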