# dth-pingpong-backend/main.py
#
# 396 lines
# 12 KiB
# Python

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
from calls import *
from db import get_db_connection
# Application instance; the route decorators below register handlers on it.
app = FastAPI()
# CORS is fully open: every origin, method and header is allowed, and
# credentials are enabled.
# NOTE(review): FastAPI's CORS documentation says the origin/method/header
# lists should be explicit (not ["*"]) when allow_credentials=True — confirm
# whether credentialed cross-origin requests are needed and, if so, list the
# real front-end origins here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class RegisterRequest(BaseModel):
    """Request body for ``POST /register``."""

    # All three fields are forwarded to register_user() in this order.
    email: str
    display_name: str
    password: str
class LoginRequest(BaseModel):
    """Request body for ``POST /login``."""

    # Both fields are forwarded to authenticate_user().
    email: str
    password: str
class FriendRequest(BaseModel):
    """Request body for ``POST /add_friend``."""

    # Both fields are forwarded unchanged to add_friend().
    token: str
    friend_uid: int
class getFriendList(BaseModel):
    """Request body for ``POST /get_friends``."""

    # Matched against users.session_token to find the caller's friend_list.
    token: str
class CreateMatchRequest(BaseModel):
    """Request body for ``POST /creatematch``."""

    # Resolved to the creating player's uid via get_uid_by_token().
    token: str
class JoinMatchRequest(BaseModel):
    """Request body for ``POST /joinmatch``."""

    # Resolved to the joining player's uid via get_uid_by_token().
    token: str
    # Primary key of the row in ``matches`` to join.
    match_id: int
class ProfileRequest(BaseModel):
    """Request body for ``POST /getprofile``."""

    # Used to look up the requesting user's row in ``users``.
    token: str
class EndMatchRequest(BaseModel):
    """Request body for ``POST /endmatch``."""

    # NOTE(review): unlike the other match requests this one carries no
    # token, so the endpoint cannot tell who is reporting the result —
    # confirm whether that is intended.
    match_id: int
    # Final scores, stored on the match row and used to decide the Elo outcome.
    player1_score: int
    player2_score: int
@app.post("/register")
def register(request: RegisterRequest):
    """Create a user account from the submitted registration data.

    Delegates to ``register_user``. Any exception it raises is reported to
    the client as HTTP 400 with the exception text as the detail.

    Returns:
        A dict with a confirmation message and the new user's ``uid``.
    """
    try:
        new_uid = register_user(
            request.email,
            request.display_name,
            request.password,
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    else:
        return {"message": "User registered successfully", "uid": new_uid}
@app.post("/login")
def login(request: LoginRequest):
    """Authenticate a user by email and password.

    Returns:
        A dict with a confirmation message and, under the key ``"uid"``,
        whatever truthy value ``authenticate_user`` returned.

    Raises:
        HTTPException: 401 when ``authenticate_user`` returns a falsy value.
    """
    auth_result = authenticate_user(request.email, request.password)
    if not auth_result:
        raise HTTPException(status_code=401, detail="Invalid credentials")
    # NOTE(review): the value looks like a session token but is published
    # under the key "uid" — clients presumably rely on that key name.
    return {"message": "Login successful", "uid": auth_result}
@app.post("/add_friend")
def add_friend_endpoint(request: FriendRequest):
    """Add ``friend_uid`` to the friend list of the user owning ``token``.

    Returns:
        A dict with a confirmation message when ``add_friend`` reports success.

    Raises:
        HTTPException: 400 when ``add_friend`` raises (detail is the
            exception text) or returns a falsy value.
    """
    try:
        success = add_friend(request.token, request.friend_uid)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    if not success:
        # The failure must be raised: returning the exception object would
        # hand it to the response serializer instead of producing a 400.
        raise HTTPException(status_code=400, detail="Failed to add friend")
    return {"message": "Friend added successfully"}
@app.get("/leaderboards")
def leaderboard():
    """Return the leaderboard exactly as produced by ``get_leaderboard``.

    Raises:
        HTTPException: 400 with the exception text when ``get_leaderboard``
            fails.
    """
    try:
        return get_leaderboard()
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
def get_friend_details(friend_uid):
    """Look up the public details of one user.

    Opens its own database connection and closes it before returning.
    Rows are read by column name.

    Args:
        friend_uid: Value matched against ``users.uid``.

    Returns:
        ``{"uid": ..., "name": ...}`` for the matching user, or ``None`` when
        no row matches.
    """
    connection = get_db_connection()
    cur = connection.cursor()
    try:
        cur.execute("SELECT uid, display_name FROM users WHERE uid = %s;", (friend_uid,))
        row = cur.fetchone()
        return {"uid": row["uid"], "name": row["display_name"]} if row else None
    finally:
        cur.close()
        connection.close()
def get_uid_by_token(token):
    """Resolve a session token to the uid of the user that owns it.

    Opens its own database connection and closes it before returning.

    Args:
        token: Value matched against ``users.session_token``.

    Returns:
        The ``uid`` column of the matching row.

    Raises:
        ValueError: When no user has this session token.
    """
    connection = get_db_connection()
    cur = connection.cursor()
    try:
        cur.execute("SELECT uid FROM users WHERE session_token = %s;", (token,))
        row = cur.fetchone()
        if row:
            return row["uid"]
        raise ValueError("Invalid token")
    finally:
        cur.close()
        connection.close()
@app.post("/get_friends")
def get_friends_list(request: getFriendList):
    """Return uid and display name for every friend of the token's owner.

    The ``friend_list`` column is read as a mapping whose values are friend
    uids. Each uid is resolved with ``get_friend_details``, and uids that no
    longer resolve to a user are skipped.

    Returns:
        ``{"friends": [{"uid": ..., "name": ...}, ...]}``.

    Raises:
        HTTPException: 404 when the token matches no user or the user's
            friend list is empty; 400 for any other failure.
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT friend_list FROM users WHERE session_token = %s;", (request.token,))
        result = cursor.fetchone()
        if not result or not result["friend_list"]:
            raise HTTPException(status_code=404, detail="No friends found.")
        # Only the mapping's values (the uids) are needed; the keys are unused.
        resolved = (
            get_friend_details(friend_uid)
            for friend_uid in result["friend_list"].values()
        )
        return {"friends": [details for details in resolved if details]}
    except HTTPException:
        # Let deliberate HTTP errors (the 404 above) through unchanged;
        # the generic handler below would otherwise rewrite them as 400.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    finally:
        cursor.close()
        conn.close()
@app.post("/creatematch")
def create_match(request: CreateMatchRequest):
    """Create a new match with the token's owner as player 1.

    Returns:
        ``{"match_id": ...}`` for the inserted row.

    Raises:
        HTTPException: 400 with the exception text on any failure, including
            an invalid token; the transaction is rolled back first.
    """
    insert_sql = (
        "\n"
        "INSERT INTO matches (player1_uid)\n"
        "VALUES (%s)\n"
        "RETURNING match_id;\n"
    )
    connection = get_db_connection()
    cur = connection.cursor()
    try:
        creator_uid = get_uid_by_token(request.token)
        cur.execute(insert_sql, (creator_uid,))
        # Read the generated id before committing, so a missing row aborts
        # the transaction instead of committing it.
        response = {"match_id": cur.fetchone()["match_id"]}
        connection.commit()
        return response
    except Exception as e:
        connection.rollback()
        raise HTTPException(status_code=400, detail=str(e))
    finally:
        cur.close()
        connection.close()
@app.post("/joinmatch")
def join_match(request: JoinMatchRequest):
    """Register the token's owner as player 2 of an existing match.

    Returns:
        A dict with a confirmation message and the ``match_id``.

    Raises:
        HTTPException: 404 when the match does not exist; 400 when the
            caller created the match, the match already has a second player,
            or any other failure occurs (including an invalid token).
    """
    select_sql = (
        "\n"
        "SELECT player1_uid, player2_uid\n"
        "FROM matches\n"
        "WHERE match_id = %s;\n"
    )
    update_sql = (
        "\n"
        "UPDATE matches\n"
        "SET player2_uid = %s\n"
        "WHERE match_id = %s;\n"
    )
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        player2_uid = get_uid_by_token(request.token)
        cursor.execute(select_sql, (request.match_id,))
        match = cursor.fetchone()
        if not match:
            raise HTTPException(status_code=404, detail="Match not found")
        if match["player1_uid"] == player2_uid:
            # A client mistake, not a server outage: report 400 rather than 503.
            raise HTTPException(status_code=400, detail="You cannot join a match you created")
        if match["player2_uid"] is not None:
            raise HTTPException(status_code=400, detail="Match is already full")
        # NOTE(review): the check above and this update are separate
        # statements; two simultaneous joins could both pass the check.
        cursor.execute(update_sql, (player2_uid, request.match_id))
        conn.commit()
        return {"message": "Joined match successfully", "match_id": request.match_id}
    except HTTPException:
        # Keep the specific status/detail chosen above instead of letting the
        # generic handler rewrite it as a 400 with a stringified exception.
        conn.rollback()
        raise
    except Exception as e:
        conn.rollback()
        raise HTTPException(status_code=400, detail=str(e)) from e
    finally:
        cursor.close()
        conn.close()
def calculate_elo(
    player1_elo: float,
    player2_elo: float,
    player1_score: int,
    player2_score: int,
    *,
    k_factor: float = 32,
) -> tuple:
    """Compute the rounded Elo rating change for both players of one match.

    The outcome is derived from the scores alone: the higher score wins and
    equal scores count as a draw (0.5 each).

    Args:
        player1_elo: Player 1's rating before the match.
        player2_elo: Player 2's rating before the match.
        player1_score: Player 1's final score.
        player2_score: Player 2's final score.
        k_factor: Maximum rating swing per match. Defaults to 32.

    Returns:
        ``(elo_change1, elo_change2)``, each rounded to the nearest integer.
    """
    # Expected score of each player under the logistic Elo model (scale 400).
    expected1 = 1 / (1 + 10 ** ((player2_elo - player1_elo) / 400))
    expected2 = 1 / (1 + 10 ** ((player1_elo - player2_elo) / 400))

    if player1_score > player2_score:
        actual1 = 1
    elif player1_score < player2_score:
        actual1 = 0
    else:
        actual1 = 0.5
    actual2 = 1 - actual1

    elo_change1 = round(k_factor * (actual1 - expected1))
    elo_change2 = round(k_factor * (actual2 - expected2))
    return elo_change1, elo_change2
@app.post("/endmatch")
def end_match(request: EndMatchRequest):
    """Record a match's final scores and apply the Elo changes to both players.

    The scores and per-player Elo changes are stored on the match row, and
    both players' ``current_elo`` values are updated in the same transaction.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 404 when the match or one of its players does not
            exist; 400 when the match has no second player or any other
            failure occurs. The transaction is rolled back in every case.
    """
    # NOTE(review): the request carries no token, and nothing here checks
    # whether the match was already ended, so repeated calls re-apply the
    # Elo change — confirm whether guards are wanted.
    select_match_sql = (
        "\n"
        "SELECT player1_uid, player2_uid\n"
        "FROM matches\n"
        "WHERE match_id = %s;\n"
    )
    select_players_sql = (
        "\n"
        "SELECT uid, current_elo\n"
        "FROM users\n"
        "WHERE uid IN (%s, %s);\n"
    )
    update_match_sql = (
        "\n"
        "UPDATE matches\n"
        "SET player1_score = %s, player2_score = %s,\n"
        "player1_elo_change = %s, player2_elo_change = %s\n"
        "WHERE match_id = %s;\n"
    )
    update_players_sql = (
        "\n"
        "UPDATE users\n"
        "SET current_elo = CASE WHEN uid = %s THEN %s WHEN uid = %s THEN %s END\n"
        "WHERE uid IN (%s, %s);\n"
    )
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute(select_match_sql, (request.match_id,))
        match = cursor.fetchone()
        if not match:
            raise HTTPException(status_code=404, detail="Match not found")
        player1_uid, player2_uid = match["player1_uid"], match["player2_uid"]
        if player2_uid is None:
            # Nobody joined: without this check the rating lookup below
            # fails with an empty, unhelpful error message.
            raise HTTPException(status_code=400, detail="Match has no second player")

        cursor.execute(select_players_sql, (player1_uid, player2_uid))
        elo_by_uid = {row["uid"]: row["current_elo"] for row in cursor.fetchall()}
        if player1_uid not in elo_by_uid or player2_uid not in elo_by_uid:
            raise HTTPException(status_code=404, detail="Player not found")
        player1_elo = elo_by_uid[player1_uid]
        player2_elo = elo_by_uid[player2_uid]

        elo_change1, elo_change2 = calculate_elo(
            player1_elo, player2_elo, request.player1_score, request.player2_score
        )
        cursor.execute(
            update_match_sql,
            (
                request.player1_score,
                request.player2_score,
                elo_change1,
                elo_change2,
                request.match_id,
            ),
        )
        cursor.execute(
            update_players_sql,
            (
                player1_uid,
                player1_elo + elo_change1,
                player2_uid,
                player2_elo + elo_change2,
                player1_uid,
                player2_uid,
            ),
        )
        conn.commit()
        return {"message": "Match ended successfully"}
    except HTTPException:
        # Keep the specific status/detail chosen above instead of letting the
        # generic handler rewrite it as a 400 with a stringified exception.
        conn.rollback()
        raise
    except Exception as e:
        conn.rollback()
        raise HTTPException(status_code=400, detail=str(e)) from e
    finally:
        cursor.close()
        conn.close()
@app.post("/getprofile")
def get_profile(request: ProfileRequest):
    """Return the token owner's profile and their ten most recent matches.

    Only matches with an opponent row are listed, newest first by
    ``match_date``. Each entry reports the opponent's display name, the
    result from the caller's point of view, the caller's Elo change and the
    match date as an ISO-8601 string (``None`` when the date is missing).

    Returns:
        ``{"name": ..., "uid": ..., "elo": ..., "matches": [...]}``.

    Raises:
        HTTPException: 404 when the token matches no user; 400 with the
            exception text for any other failure.
    """
    # NOTE(review): any match that is not a strict win for the caller is
    # labelled 'Loss' — that includes draws and matches whose scores are
    # still NULL. Confirm whether that is intended.
    history_sql = (
        "\n"
        "SELECT\n"
        "m.match_id,\n"
        "u.display_name AS opponent_name,\n"
        "m.match_date,\n"
        "CASE\n"
        "WHEN m.player1_uid = %s AND m.player1_score > m.player2_score THEN 'Win'\n"
        "WHEN m.player2_uid = %s AND m.player2_score > m.player1_score THEN 'Win'\n"
        "ELSE 'Loss'\n"
        "END AS result,\n"
        "CASE\n"
        "WHEN m.player1_uid = %s THEN m.player1_elo_change\n"
        "WHEN m.player2_uid = %s THEN m.player2_elo_change\n"
        "END AS elo_change\n"
        "FROM matches m\n"
        "JOIN users u ON u.uid = CASE\n"
        "WHEN m.player1_uid = %s THEN m.player2_uid\n"
        "WHEN m.player2_uid = %s THEN m.player1_uid\n"
        "END\n"
        "WHERE %s IN (m.player1_uid, m.player2_uid)\n"
        "ORDER BY m.match_date DESC\n"
        "LIMIT 10;\n"
    )
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        # Users are keyed by session_token, as in the other token lookups
        # in this file.
        cursor.execute(
            "SELECT uid, display_name, current_elo FROM users WHERE session_token = %s;",
            (request.token,),
        )
        user = cursor.fetchone()
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        uid = user["uid"]

        # The query has seven placeholders and every one is the caller's uid.
        cursor.execute(history_sql, (uid,) * 7)
        history = []
        for match in cursor.fetchall():
            played_at = match["match_date"]
            history.append(
                {
                    "match_id": match["match_id"],
                    "opponent_name": match["opponent_name"],
                    "result": match["result"],
                    "elo_change": match["elo_change"],
                    "match_date": played_at.isoformat() if played_at is not None else None,
                }
            )
        return {
            "name": user["display_name"],
            "uid": uid,
            "elo": user["current_elo"],
            "matches": history,
        }
    except HTTPException:
        # Let the 404 above through unchanged; the generic handler below
        # would otherwise rewrite it as a 400.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    finally:
        cursor.close()
        conn.close()