Skip to content

Commit

Permalink
Argon2id hashed passwords in userdb
Browse files Browse the repository at this point in the history
  • Loading branch information
Onur Zungur committed Jul 3, 2024
1 parent 688e9ce commit ab2608a
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 62 deletions.
114 changes: 62 additions & 52 deletions server/fishtest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True):
self.run_lock = threading.Lock()
self.active_runs = {}

# Keep some data about the workers
self.worker_runs = {}
self.worker_runs_lock = threading.Lock()

self.request_task_lock = threading.Lock()
self.scheduler = None

Expand All @@ -122,38 +126,44 @@ def validate_data_structures(self):
flush=True,
)
try:
validate(
cache_schema,
self.run_cache,
name="run_cache",
subs={"runs_schema": dict},
)
validate(
wtt_map_schema,
self.wtt_map,
name="wtt_map",
subs={"runs_schema": dict},
)
validate(
connections_counter_schema,
self.connections_counter,
name="connections_counter",
)
validate(
unfinished_runs_schema,
self.unfinished_runs,
name="unfinished_runs",
)
validate(
active_runs_schema,
self.active_runs,
name="active_runs",
)
validate(
worker_runs_schema,
self.worker_runs,
name="worker_runs",
)
with self.run_cache_lock:
validate(
cache_schema,
self.run_cache,
name="run_cache",
subs={"runs_schema": dict},
)
with self.wtt_lock:
validate(
wtt_map_schema,
self.wtt_map,
name="wtt_map",
subs={"runs_schema": dict},
)
with self.connections_lock:
validate(
connections_counter_schema,
self.connections_counter,
name="connections_counter",
)
with self.unfinished_runs_lock:
validate(
unfinished_runs_schema,
self.unfinished_runs,
name="unfinished_runs",
)
with self.run_lock:
validate(
active_runs_schema,
self.active_runs,
name="active_runs",
)
with self.worker_runs_lock:
validate(
worker_runs_schema,
self.worker_runs,
name="worker_runs",
)
except ValidationError as e:
message = f"Validation of internal data structures failed: {str(e)}"
print(message, flush=True)
Expand Down Expand Up @@ -565,10 +575,10 @@ def new_run(
raise Exception(message)

# We cannot use self.buffer since new_run does not have an id yet.
run_id = self.runs.insert_one(new_run).inserted_id
run_id = str(self.runs.insert_one(new_run).inserted_id)

with self.unfinished_runs_lock:
self.unfinished_runs.add(str(run_id))
self.unfinished_runs.add(run_id)

return run_id

Expand Down Expand Up @@ -659,7 +669,6 @@ def get_nns(self, user="", network_name="", master_only=False, limit=0, skip=0):
# Cache runs
run_cache = {}
run_cache_lock = threading.Lock()
run_cache_write_lock = threading.Lock()

# handle termination
def exit_run(self, signum, frame):
Expand Down Expand Up @@ -702,17 +711,15 @@ def buffer(self, run, flush):
flush=True,
)
return
r_id = str(run["_id"])
with self.run_cache_lock:
r_id = str(run["_id"])
if flush:
self.run_cache[r_id] = {
"is_changed": False,
"last_access_time": time.time(),
"last_sync_time": time.time(),
"run": run,
}
with self.run_cache_write_lock:
self.runs.replace_one({"_id": ObjectId(r_id)}, run)
else:
if r_id in self.run_cache:
last_sync_time = self.run_cache[r_id]["last_sync_time"]
Expand All @@ -724,6 +731,9 @@ def buffer(self, run, flush):
"last_sync_time": last_sync_time,
"run": run,
}
if flush:
with self.active_run_lock(r_id):
self.runs.replace_one({"_id": ObjectId(r_id)}, run)

def stop(self):
self.flush_all()
Expand Down Expand Up @@ -754,10 +764,13 @@ def flush_buffers(self):
oldest_entry = cache_entry
if oldest_entry is not None:
oldest_run = oldest_entry["run"]
oldest_run_id = oldest_run["_id"]
oldest_entry["is_changed"] = False
oldest_entry["last_sync_time"] = time.time()
with self.run_cache_write_lock:
self.runs.replace_one({"_id": oldest_run["_id"]}, oldest_run)

if oldest_entry is not None:
with self.active_run_lock(str(oldest_run_id)):
self.runs.replace_one({"_id": oldest_run_id}, oldest_run)

def clean_cache(self):
now = time.time()
Expand Down Expand Up @@ -806,11 +819,10 @@ def scavenge_dead_tasks(self):
self.buffer(run, False)

def get_unfinished_runs_id(self):
with self.run_cache_write_lock:
unfinished_runs = self.runs.find(
{"finished": False}, {"_id": 1}, sort=[("last_updated", DESCENDING)]
)
return unfinished_runs
unfinished_runs = self.runs.find(
{"finished": False}, {"_id": 1}, sort=[("last_updated", DESCENDING)]
)
return unfinished_runs

def get_unfinished_runs(self, username=None):
# Note: the result can be only used once.
Expand Down Expand Up @@ -971,8 +983,6 @@ def calc_itp(self, run, count):

task_semaphore = threading.Semaphore(2)

worker_runs = {}

def worker_cap(self, run, worker_info):
# Estimate how many games a worker will be able to run
# during the time interval determined by "self.task_duration".
Expand Down Expand Up @@ -1256,11 +1266,11 @@ def priority(run): # lower is better
# Cache some data. Currently we record the id's
# the worker has seen, as well as the last id that was seen.
# Note that "worker_runs" is empty after a server restart.

if unique_key not in self.worker_runs:
self.worker_runs[unique_key] = {}
self.worker_runs[unique_key][run_id] = True
self.worker_runs[unique_key]["last_run"] = run_id
with self.worker_runs_lock:
if unique_key not in self.worker_runs:
self.worker_runs[unique_key] = {}
self.worker_runs[unique_key][run_id] = True
self.worker_runs[unique_key]["last_run"] = run_id

return {"run": run, "task_id": task_id}

Expand Down
54 changes: 48 additions & 6 deletions server/fishtest/userdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@
import threading
import time
from datetime import datetime, timezone

from functools import lru_cache

from argon2 import PasswordHasher
from argon2.exceptions import (
HashingError,
InvalidHash,
VerificationError,
VerifyMismatchError,
)
from fishtest.schemas import user_schema
from pymongo import ASCENDING
from vtjson import ValidationError, validate
Expand Down Expand Up @@ -47,18 +55,52 @@ def clear_cache(self):
with self.user_lock:
self.cache.clear()

def hash_password(
self,
password,
time_cost: int = 3,
memory_cost: int = 12288,
parallelism: int = 1,
):
return PasswordHasher(time_cost, memory_cost, parallelism).hash(password)

@lru_cache(maxsize=128)
def check_password(self, hashed_password, password):
try:
return PasswordHasher().verify(hashed_password, password)
except InvalidHash as e:
print("InvalidHash:", e, sep="\n")
except VerifyMismatchError as e:
print("VerifyMismatchError:", e, sep="\n")
except HashingError as e:
print("HashingError:", e, sep="\n")
except VerificationError as e:
print("VerificationError:", e, sep="\n")
except Exception as e:
print("Exception:", e, sep="\n")
return False

def authenticate(self, username, password):
user = self.get_user(username)
if not user or user["password"] != password:
sys.stderr.write("Invalid login: '{}' '{}'\n".format(username, password))
return {"error": "Invalid password for user: {}".format(username)}
if not user:
sys.stderr.write("Invalid username: '{}'\n".format(username))
return {"error": "Invalid username: {}".format(username)}
if user["password"] != password:
sys.stderr.write("Invalid login (plaintext): '{}'\n".format(username))
if not self.check_password(user["password"], password):
sys.stderr.write("Invalid login (hashed): '{}'\n".format(username))
return {"error": "Invalid password for user: {}".format(username)}
if "blocked" in user and user["blocked"]:
sys.stderr.write("Blocked account: '{}' '{}'\n".format(username, password))
sys.stderr.write("Blocked account: '{}'\n".format(username))
return {"error": "Account blocked for user: {}".format(username)}
if "pending" in user and user["pending"]:
sys.stderr.write("Pending account: '{}' '{}'\n".format(username, password))
sys.stderr.write("Pending account: '{}'\n".format(username))
return {"error": "Account pending for user: {}".format(username)}

# temp: remove after all the passwords in userdb are hashed
if user["password"] == password:
user["password"] = self.hash_password(user["password"])
self.save_user(user)
return {"username": username, "authenticated": True}

def get_users(self):
Expand Down
6 changes: 4 additions & 2 deletions server/fishtest/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ def signup(request):

result = request.userdb.create_user(
username=signup_username,
password=signup_password,
password=request.userdb.hash_password(signup_password),
email=validated_email,
tests_repo=tests_repo,
)
Expand Down Expand Up @@ -635,7 +635,9 @@ def user(request):
(new_email if len(new_email) > 0 else None),
)
if strong_password:
user_data["password"] = new_password
user_data["password"] = request.userdb.hash_password(
new_password
)
request.session.flash("Success! Password updated")
else:
request.session.flash(
Expand Down
1 change: 1 addition & 0 deletions server/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"awscli",
"zxcvbn",
"email_validator",
"argon2-cffi",
"vtjson",
]

Expand Down
Loading

0 comments on commit ab2608a

Please sign in to comment.