# dth-pingpong-backend/calls.py
#
# 158 lines
# 5 KiB
# Python

import bcrypt
from db import get_db_connection
import base64
import secrets
import time
import json
# USER MANAGEMENT FUNCTIONS
def register_user(email, display_name, password):
    """Create a new user row and return its generated uid.

    The password is hashed with bcrypt (fresh salt per call) before the
    database is touched; only the resulting hash is stored.

    Args:
        email: Value written to ``users.email``.
        display_name: Value written to ``users.display_name``.
        password: Plain-text password as ``str``; it is encoded and hashed.

    Returns:
        The ``uid`` of the inserted row, as produced by ``RETURNING uid``.

    Raises:
        Exception: Any error raised while inserting or committing is re-raised
            after the transaction has been rolled back.
    """
    salt = bcrypt.gensalt()
    password_hash = bcrypt.hashpw(password.encode(), salt).decode()

    insert_sql = "INSERT INTO users (email, display_name, password_hash) VALUES (%s, %s, %s) RETURNING uid;"
    connection = get_db_connection()
    db_cursor = connection.cursor()
    try:
        db_cursor.execute(insert_sql, (email, display_name, password_hash))
        inserted_row = db_cursor.fetchone()
        new_uid = inserted_row["uid"]
        connection.commit()
    except Exception:
        # Undo the partial insert, then let the caller see the original error.
        connection.rollback()
        raise
    else:
        return new_uid
    finally:
        db_cursor.close()
        connection.close()
def authenticate_user(email, password):
    """Check a user's credentials and start a new session.

    Looks the user up by email, verifies ``password`` against the stored
    bcrypt hash and, on success, stores a freshly generated session token in
    ``users.session_token`` (replacing any previous one).

    The token is an opaque random string from ``secrets.token_urlsafe``. It
    intentionally embeds no credentials or other user data, so a leaked or
    logged token can never reveal the user's password.

    Args:
        email: Email address identifying the account.
        password: Plain-text password to verify, as ``str``.

    Returns:
        The new session token (``str``) on success, or ``None`` when the email
        is unknown or the password does not match.

    Raises:
        Exception: Any database error is re-raised after the transaction has
            been rolled back.
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT uid, password_hash FROM users WHERE email = %s;", (email,))
        user = cursor.fetchone()
        if not user or not bcrypt.checkpw(password.encode(), user["password_hash"].encode()):
            return None

        # 32 random bytes (~43 URL-safe characters): unguessable and free of
        # any secret material.
        session_token = secrets.token_urlsafe(32)
        cursor.execute(
            "UPDATE users SET session_token = %s WHERE email = %s;",
            (session_token, email),
        )
        conn.commit()
        return session_token
    except Exception:
        # Same error handling as the other write paths in this module.
        conn.rollback()
        raise
    finally:
        cursor.close()
        conn.close()
def add_friend(token, friend_uid):
    """Add ``friend_uid`` to the friend list of the user owning ``token``.

    The friend list lives in ``users.friend_list`` as a JSON object mapping
    keys of the form ``"friend<N>"`` to friend uids.

    Args:
        token: Session token identifying the user whose list is modified.
        friend_uid: uid of the user to add as a friend.

    Returns:
        ``True`` if the friend was added and saved. ``False`` if ``token``
        matches no user (nothing is written) or ``friend_uid`` is already in
        the list.

    Raises:
        Exception: Any database or JSON error is re-raised after the
            transaction has been rolled back.
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT friend_list FROM users WHERE session_token = %s;", (token,))
        result = cursor.fetchone()
        if not result:
            # Unknown token: the UPDATE below would match no row, so reporting
            # success would be a lie.
            return False

        friends = result["friend_list"] or {}
        if isinstance(friends, str):
            # NOTE(review): depending on the column type the driver may return
            # raw JSON text instead of a dict -- decode defensively.
            friends = json.loads(friends) or {}

        if friend_uid in friends.values():
            return False

        # Start at len(friends) as before, but skip ahead if that key is taken
        # (keys are not guaranteed to be contiguous).
        index = len(friends)
        while f"friend{index}" in friends:
            index += 1
        friends[f"friend{index}"] = friend_uid

        cursor.execute(
            "UPDATE users SET friend_list = %s WHERE session_token = %s;",
            (json.dumps(friends), token),
        )
        conn.commit()
        return True
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()
        conn.close()
# LEADERBOARD AND ELO RATING FUNCTIONS
def get_leaderboard():
    """Return every rated user, highest Elo first.

    Users whose ``current_elo`` is NULL are excluded by the query.

    Returns:
        A list of dicts ordered by ``current_elo`` descending, each with the
        keys ``"player_name"`` (display name), ``"elo_rating"`` (current Elo)
        and ``"friend_code"`` (the user's uid).
    """
    ranking_sql = "SELECT display_name, current_elo, uid FROM users WHERE current_elo IS NOT NULL ORDER BY current_elo DESC;"
    connection = get_db_connection()
    db_cursor = connection.cursor()
    try:
        db_cursor.execute(ranking_sql)
        leaderboard = []
        for row in db_cursor.fetchall():
            entry = {
                "player_name": row["display_name"],
                "elo_rating": row["current_elo"],
                "friend_code": row["uid"],
            }
            leaderboard.append(entry)
        return leaderboard
    finally:
        db_cursor.close()
        connection.close()
def update_elo(player1_display_name, player2_display_name, player1_score, player2_score, winner):
    """Apply the Elo outcome of one match to both players and persist it.

    Each player's current rating is looked up by display name, the standard
    Elo update with a K-factor of 32 is applied, and both new ratings are
    written back in a single transaction.

    Args:
        player1_display_name: Display name of the first player.
        player2_display_name: Display name of the second player.
        player1_score: Accepted for interface compatibility; not used in the
            rating computation.
        player2_score: Accepted for interface compatibility; not used in the
            rating computation.
        winner: Display name of the winning player. Any value that matches
            neither player is treated as a draw (0.5 points each).

    Returns:
        On success, a dict with the keys ``"player1_display_name"``,
        ``"new_player1_elo"``, ``"player2_display_name"`` and
        ``"new_player2_elo"``. If the lookup does not yield exactly the two
        requested players, or either has no rating yet, a dict with a single
        ``"error"`` key; nothing is written in that case.

    Raises:
        Exception: Any database error is re-raised after the transaction has
            been rolled back.
    """
    K = 32  # K-factor: the largest rating change a single match can cause.
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT display_name, current_elo
            FROM users
            WHERE display_name IN (%s, %s);
        """, (player1_display_name, player2_display_name))
        players = cursor.fetchall()

        # The query has no ORDER BY, so row order is unspecified. Look ratings
        # up by name instead of assuming players[0] is player 1.
        ratings = {row["display_name"]: row["current_elo"] for row in players}
        if (
            len(players) != 2
            or len(ratings) != 2
            or ratings.keys() != {player1_display_name, player2_display_name}
        ):
            return {"error": "Both players must exist in the database."}

        player1_elo = ratings[player1_display_name]
        player2_elo = ratings[player2_display_name]
        if player1_elo is None or player2_elo is None:
            # current_elo is nullable (the leaderboard query filters NULLs);
            # report it instead of failing with a TypeError in the arithmetic.
            return {"error": "Both players must have an Elo rating."}

        expected_player1 = 1 / (1 + 10 ** ((player2_elo - player1_elo) / 400))
        expected_player2 = 1 / (1 + 10 ** ((player1_elo - player2_elo) / 400))

        if winner == player1_display_name:
            actual_player1, actual_player2 = 1, 0
        elif winner == player2_display_name:
            actual_player1, actual_player2 = 0, 1
        else:
            actual_player1, actual_player2 = 0.5, 0.5

        new_player1_elo = player1_elo + K * (actual_player1 - expected_player1)
        new_player2_elo = player2_elo + K * (actual_player2 - expected_player2)

        cursor.execute("""
            UPDATE users
            SET current_elo = %s
            WHERE display_name = %s;
        """, (new_player1_elo, player1_display_name))
        cursor.execute("""
            UPDATE users
            SET current_elo = %s
            WHERE display_name = %s;
        """, (new_player2_elo, player2_display_name))
        conn.commit()

        return {
            "player1_display_name": player1_display_name,
            "new_player1_elo": new_player1_elo,
            "player2_display_name": player2_display_name,
            "new_player2_elo": new_player2_elo,
        }
    except Exception:
        # Never leave one player's rating updated without the other's.
        conn.rollback()
        raise
    finally:
        cursor.close()
        conn.close()