dth-pingpong-backend/calls.py

283 lines
8.8 KiB
Python

import bcrypt
from db import get_db_connection
import base64
import secrets
import time
import json
def register_user(email, display_name, password):
hashed_password = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(
"INSERT INTO users (email, display_name, password_hash) VALUES (%s, %s, %s) RETURNING uid;",
(email, display_name, hashed_password)
)
uid = cursor.fetchone()["uid"]
conn.commit()
return uid
except Exception as e:
conn.rollback()
raise e
finally:
cursor.close()
conn.close()
def authenticate_user(email, password):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT uid, password_hash FROM users WHERE email = %s;", (email,))
user = cursor.fetchone()
if user and bcrypt.checkpw(password.encode(), user["password_hash"].encode()):
epoch_timestamp = int(time.time())
random_int = secrets.randbelow(1000000)
token_data = f"{email}:{password}:{epoch_timestamp}:{random_int}"
encoded_token = base64.b64encode(token_data.encode()).decode()
cursor.execute("UPDATE users SET session_token = %s WHERE email = %s;", (encoded_token, email))
conn.commit()
return encoded_token
return None
finally:
cursor.close()
conn.close()
def reauth_user(token):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT email, display_name, current_elo, session_token FROM users WHERE session_token = %s;", (token,))
user = cursor.fetchone()
if user:
user_data = {
"email": user["email"],
"display_name": user["display_name"],
"elo": user["current_elo"]
}
return user_data
return None
finally:
cursor.close()
conn.close()
def add_friend(token, friend_uid):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT friend_list FROM users WHERE session_token = %s;", (token,))
result = cursor.fetchone()
if result:
friends = result["friend_list"] or {}
else:
friends = {}
index = len(friends)
friend_key = f"friend{index}"
if friend_key not in friends:
friends[friend_key] = friend_uid
friends_json = json.dumps(friends)
cursor.execute(
"UPDATE users SET friend_list = %s WHERE session_token = %s;",
(friends_json, token)
)
conn.commit()
else:
return False
return True
except Exception as e:
conn.rollback()
raise e
finally:
cursor.close()
conn.close()
def get_leaderboard():
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT display_name, current_elo, uid FROM users WHERE current_elo IS NOT NULL ORDER BY current_elo DESC;")
players = cursor.fetchall()
player_elo_list = [{"player_name": player["display_name"], "elo_rating": player["current_elo"], "friend_code": player["uid"]} for player in players]
return player_elo_list
finally:
cursor.close()
conn.close()
def send_match_invite(sender_uid, receiver_uid):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(
"INSERT INTO matches (player1_uid, player2_uid) VALUES (%s, %s) RETURNING match_id;",
(sender_uid, receiver_uid)
)
match_id = cursor.fetchone()["match_id"]
conn.commit()
return match_id
except Exception as e:
conn.rollback()
raise e
finally:
cursor.close()
conn.close()
def accept_match_invite(match_id, player2_uid):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(
"UPDATE matches SET match_date = CURRENT_TIMESTAMP WHERE match_id = %s AND player2_uid = %s;",
(match_id, player2_uid)
)
conn.commit()
return True
except Exception as e:
conn.rollback()
raise e
finally:
cursor.close()
conn.close()
def get_all_matches():
conn = get_db_connection()
cursor = conn.cursor()
try:
query = """
SELECT
m.match_id,
u1.display_name AS player1_name,
u2.display_name AS player2_name,
m.player1_score,
m.player2_score,
CASE
WHEN m.player1_score IS NULL AND m.player2_score IS NULL THEN 'upcoming'
WHEN m.player1_score IS NOT NULL AND m.player2_score IS NOT NULL THEN 'completed'
ELSE 'ongoing'
END AS status,
m.match_date
FROM matches m
LEFT JOIN users u1 ON m.player1_uid = u1.uid
LEFT JOIN users u2 ON m.player2_uid = u2.uid
ORDER BY
CASE
WHEN m.player1_score IS NULL AND m.player2_score IS NULL THEN 2 -- Upcoming
WHEN m.player1_score IS NOT NULL AND m.player2_score IS NOT NULL THEN 3 -- Completed
ELSE 1 -- Ongoing
END,
m.match_date;
"""
cursor.execute(query)
matches = cursor.fetchall()
return matches
finally:
cursor.close()
conn.close()
def get_elo(auth_header):
if not auth_header.startswith('Basic '):
raise ValueError("Invalid Authorization header")
encoded_credentials = auth_header.split(' ', 1)[1]
decoded_credentials = base64.b64decode(encoded_credentials).decode()
try:
email, password = decoded_credentials.split(':', 1)
except ValueError:
raise ValueError("Invalid credentials format")
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT email, display_name, password_hash, elo FROM users WHERE email = %s;", (email,))
user = cursor.fetchone()
if not user:
raise ValueError("User not found")
if not bcrypt.checkpw(password.encode(), user["password_hash"].encode()):
raise ValueError("Invalid password")
return {
"email": user["email"],
"elo": user["elo"],
"display_name": user["display_name"]
}
except Exception as e:
raise e
finally:
cursor.close()
conn.close()
def update_elo(player1_display_name, player2_display_name, player1_score, player2_score, winner):
conn = get_db_connection()
cursor = conn.cursor()
K = 32
try:
cursor.execute("""
SELECT display_name, current_elo
FROM users
WHERE display_name IN (%s, %s);
""", (player1_display_name, player2_display_name))
players = cursor.fetchall()
if len(players) != 2:
return {"error": "Both players must exist in the database."}
player1_elo = players[0]["current_elo"]
player2_elo = players[1]["current_elo"]
expected_player1 = 1 / (1 + 10 ** ((player2_elo - player1_elo) / 400))
expected_player2 = 1 / (1 + 10 ** ((player1_elo - player2_elo) / 400))
if winner == player1_display_name:
actual_player1 = 1
actual_player2 = 0
elif winner == player2_display_name:
actual_player1 = 0
actual_player2 = 1
else:
actual_player1 = 0.5
actual_player2 = 0.5
new_player1_elo = player1_elo + K * (actual_player1 - expected_player1)
new_player2_elo = player2_elo + K * (actual_player2 - expected_player2)
cursor.execute("""
UPDATE users
SET current_elo = %s
WHERE display_name = %s;
""", (new_player1_elo, player1_display_name))
cursor.execute("""
UPDATE users
SET current_elo = %s
WHERE display_name = %s;
""", (new_player2_elo, player2_display_name))
conn.commit()
return {
"player1_display_name": player1_display_name,
"new_player1_elo": new_player1_elo,
"player2_display_name": player2_display_name,
"new_player2_elo": new_player2_elo
}
finally:
cursor.close()
conn.close()