from fastapi import FastAPI, HTTPException from pydantic import BaseModel from fastapi.middleware.cors import CORSMiddleware from calls import * from db import get_db_connection import requests from openskill.models import PlackettLuce model = PlackettLuce() app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class RegisterRequest(BaseModel): email: str display_name: str password: str class JoinMatch2v2Request(BaseModel): token: str match_id: int slot: int class LoginRequest(BaseModel): email: str password: str class ResetPasswordRequest(BaseModel): uid: int email: str new_password: str class FriendRequest(BaseModel): token: str friend_uid: int class getFriendList(BaseModel): token: str class CreateMatchRequest(BaseModel): token: str class JoinMatchRequest(BaseModel): token: str match_id: int class ProfileRequest(BaseModel): token: str class EndMatchRequest(BaseModel): match_id: int player1_score: int player2_score: int class EndFourMatch(BaseModel): match_id: int player1_team1_score: int player2_team1_score: int player1_team2_score: int player2_team2_score: int @app.post("/register") def register(request: RegisterRequest): try: uid = register_user(request.email, request.display_name, request.password) return {"message": "User registered successfully", "uid": uid} except Exception as e: raise HTTPException(status_code=400, detail=str(e)) @app.post("/login") def login(request: LoginRequest): sessiontoken = authenticate_user(request.email, request.password) if sessiontoken: return {"message": "Login successful", "uid": sessiontoken} else: raise HTTPException(status_code=401, detail="Invalid credentials") @app.post("/reset-password") def reset_user_password(request: ResetPasswordRequest): try: reset_password(request.uid, request.email, request.new_password) return {"message": "Password reset successfully"} except ValueError as ve: raise HTTPException(status_code=404, detail=str(ve)) except Exception as e: raise HTTPException(status_code=400, detail=str(e)) @app.post("/add_friend") def add_friend_endpoint(request: FriendRequest): try: success = add_friend(request.token, request.friend_uid) return {"message": "Friend added successfully"} if success else HTTPException(400, "Failed to add friend") except Exception as e: raise HTTPException(status_code=400, detail=str(e)) @app.get("/leaderboards") def leaderboard(): try: leaderboard = get_leaderboard() return leaderboard except Exception as e: raise HTTPException(status_code=400, detail=str(e)) def get_friend_details(friend_uid): conn = get_db_connection() cursor = conn.cursor() try: cursor.execute("SELECT uid, display_name FROM users WHERE uid = %s;", (friend_uid,)) result = cursor.fetchone() if result: return {"uid": result["uid"], "name": result["display_name"]} return None finally: cursor.close() conn.close() def get_uid_by_token(token): conn = get_db_connection() cursor = conn.cursor() try: cursor.execute("SELECT uid FROM users WHERE session_token = %s;", (token,)) result = cursor.fetchone() if not result: raise ValueError("Invalid token") return result["uid"] finally: cursor.close() conn.close() @app.post("/get_friends") def get_friends_list(request: getFriendList): token = request.token conn = get_db_connection() cursor = conn.cursor() try: cursor.execute("SELECT friend_list FROM users WHERE session_token = %s;", (token,)) result = cursor.fetchone() if not result or not result["friend_list"]: raise HTTPException(status_code=404, detail="No friends found.") friends = result["friend_list"] friends_details = [] for key, friend_uid in friends.items(): friend_details = get_friend_details(friend_uid) if friend_details: friends_details.append(friend_details) return {"friends": friends_details} except Exception as e: raise HTTPException(status_code=400, detail=str(e)) finally: cursor.close() conn.close() @app.post("/creatematch") def create_match(request: CreateMatchRequest): conn = get_db_connection() cursor = conn.cursor() try: player1_uid = get_uid_by_token(request.token) cursor.execute( """ INSERT INTO matches (player1_uid) VALUES (%s) RETURNING match_id; """, (player1_uid,) ) match_id = cursor.fetchone()["match_id"] conn.commit() return {"match_id": match_id} except Exception as e: conn.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: cursor.close() conn.close() @app.post("/creatematch_2v2") def create_match_2v2(request: CreateMatchRequest): conn = get_db_connection() cursor = conn.cursor() try: player1_uid = get_uid_by_token(request.token) cursor.execute( """ INSERT INTO matches_2v2 (player1_team1_uid) VALUES (%s) RETURNING match_id; """, (player1_uid,) ) match_id = cursor.fetchone()["match_id"] conn.commit() return {"match_id": match_id} except Exception as e: conn.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: cursor.close() conn.close() @app.post("/joinmatch") def join_match(request: JoinMatchRequest): conn = get_db_connection() cursor = conn.cursor() try: player2_uid = get_uid_by_token(request.token) cursor.execute( """ SELECT player1_uid, player2_uid FROM matches WHERE match_id = %s; """, (request.match_id,) ) match = cursor.fetchone() if not match: raise HTTPException(status_code=404, detail="Match not found") if match["player1_uid"] == player2_uid: raise HTTPException(status_code=503, detail="You cannot join a match you created") if match["player2_uid"] is not None: raise HTTPException(status_code=400, detail="Match is already full") cursor.execute( """ UPDATE matches SET player2_uid = %s WHERE match_id = %s; """, (player2_uid, request.match_id) ) conn.commit() cursor.execute( """ SELECT m.match_id, m.player1_uid, m.player2_uid, u1.display_name AS player1_name, u2.display_name AS player2_name FROM matches m LEFT JOIN users u1 ON m.player1_uid = u1.uid LEFT JOIN users u2 ON m.player2_uid = u2.uid WHERE m.match_id = %s; """, (request.match_id,) ) updated_match = cursor.fetchone() return { "message": "Joined match successfully", "match_id": updated_match["match_id"], "player1_uid": updated_match["player1_uid"], "player2_uid": updated_match["player2_uid"], "player1_name": updated_match["player1_name"], "player2_name": updated_match["player2_name"], } except Exception as e: conn.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: cursor.close() conn.close() def calculate_elo(player1_elo, player2_elo, player1_score, player2_score): k_factor = 32 expected1 = 1 / (1 + 10 ** ((player2_elo - player1_elo) / 400)) expected2 = 1 / (1 + 10 ** ((player1_elo - player2_elo) / 400)) actual1 = 1 if player1_score > player2_score else 0 if player1_score < player2_score else 0.5 actual2 = 1 - actual1 elo_change1 = round(k_factor * (actual1 - expected1)) elo_change2 = round(k_factor * (actual2 - expected2)) return elo_change1, elo_change2 @app.post("/endmatch") def end_match(request: EndMatchRequest): conn = get_db_connection() cursor = conn.cursor() try: cursor.execute( """ SELECT player1_uid, player2_uid FROM matches WHERE match_id = %s; """, (request.match_id,) ) match = cursor.fetchone() if not match: raise HTTPException(status_code=404, detail="Match not found") player1_uid, player2_uid = match["player1_uid"], match["player2_uid"] cursor.execute( """ SELECT uid, current_elo FROM users WHERE uid IN (%s, %s); """, (player1_uid, player2_uid) ) players = cursor.fetchall() player1_elo = next(p["current_elo"] for p in players if p["uid"] == player1_uid) player2_elo = next(p["current_elo"] for p in players if p["uid"] == player2_uid) elo_change1, elo_change2 = calculate_elo( player1_elo, player2_elo, request.player1_score, request.player2_score ) cursor.execute( """ UPDATE matches SET player1_score = %s, player2_score = %s, player1_elo_change = %s, player2_elo_change = %s WHERE match_id = %s; """, (request.player1_score, request.player2_score, elo_change1, elo_change2, request.match_id) ) new_player1_elo = player1_elo + elo_change1 new_player2_elo = player2_elo + elo_change2 cursor.execute( """ UPDATE users SET current_elo = CASE WHEN uid = %s THEN %s WHEN uid = %s THEN %s END WHERE uid IN (%s, %s); """, (player1_uid, new_player1_elo, player2_uid, new_player2_elo, player1_uid, player2_uid) ) conn.commit() return {"message": "Match ended successfully"} except Exception as e: conn.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: cursor.close() conn.close() @app.post("/getprofile") def get_profile(request: ProfileRequest): conn = get_db_connection() cursor = conn.cursor() try: cursor.execute("SELECT uid, display_name, current_elo, openskill_mu, openskill_sigma FROM users WHERE session_token = %s;", (request.token,)) user = cursor.fetchone() if not user: raise HTTPException(status_code=404, detail="User not found") uid, display_name, current_elo, oskmu, osksig = user["uid"], user["display_name"], user["current_elo"], user["openskill_mu"], user["openskill_sigma"] cursor.execute( """ SELECT m.match_id, u.display_name AS opponent_name, CASE WHEN m.player1_uid = %s AND m.player1_score > m.player2_score THEN 'Win' WHEN m.player2_uid = %s AND m.player2_score > m.player1_score THEN 'Win' ELSE 'Loss' END AS result, CASE WHEN m.player1_uid = %s THEN m.player1_elo_change WHEN m.player2_uid = %s THEN m.player2_elo_change END AS elo_change FROM matches m JOIN users u ON u.uid = CASE WHEN m.player1_uid = %s THEN m.player2_uid WHEN m.player2_uid = %s THEN m.player1_uid END WHERE %s IN (m.player1_uid, m.player2_uid) ORDER BY m.match_date DESC LIMIT 10; """, (uid, uid, uid, uid, uid, uid, uid) ) matches = cursor.fetchall() return { "name": display_name, "uid": uid, "elo": current_elo, "osk_mu": oskmu, "osk_sig": osksig, "matches": [ { "match_id": match["match_id"], "opponent_name": match["opponent_name"], "result": match["result"], "elo_change": match["elo_change"] } for match in matches ] } except Exception as e: raise HTTPException(status_code=400, detail=str(e)) finally: cursor.close() conn.close() @app.get("/version") def get_latest_commit_hashes(): try: backend_response = requests.get("https://git.mercurio.moe/api/v1/repos/Mercury/dth-pingpong-backend/commits") frontend_response = requests.get("https://git.mercurio.moe/api/v1/repos/Mercury/dth-pingpong-mobileapp/commits") if backend_response.status_code == 200 and frontend_response.status_code == 200: backend_hash = backend_response.json()[0]["sha"] frontend_hash = frontend_response.json()[0]["sha"] return {"backend": backend_hash, "frontend": frontend_hash} return {"error": "Failed to fetch commit hashes from Forgejo"} except Exception as e: return {"error": str(e)} @app.post("/joinmatch_2v2") def join_match_2v2(request: JoinMatch2v2Request): conn = get_db_connection() cursor = conn.cursor() # Use RealDictCursor try: player_uid = get_uid_by_token(request.token) valid_slots = { 2: "player2_team1_uid", 3: "player1_team2_uid", 4: "player2_team2_uid", } if request.slot not in valid_slots: raise HTTPException(status_code=400, detail=f"Invalid slot: {request.slot}") column_name = valid_slots[request.slot] # Fetch current match data cursor.execute( """ SELECT player1_team1_uid, player2_team1_uid, player1_team2_uid, player2_team2_uid FROM matches_2v2 WHERE match_id = %s; """, (request.match_id,) ) match = cursor.fetchone() # RealDictCursor returns a dictionary if not match: raise HTTPException(status_code=404, detail="Match not found") # Access player slots using dictionary keys player1_team1_uid = match['player1_team1_uid'] player2_team1_uid = match['player2_team1_uid'] player1_team2_uid = match['player1_team2_uid'] player2_team2_uid = match['player2_team2_uid'] # Check if the slot is already occupied if match[column_name] is not None: raise HTTPException(status_code=400, detail=f"Slot '{request.slot}' is already occupied") # Update the match with the new player's UID cursor.execute( f""" UPDATE matches_2v2 SET {column_name} = %s WHERE match_id = %s; """, (player_uid, request.match_id) ) conn.commit() # Build the list of player UIDs player_uids = [ uid for uid in [player1_team1_uid, player2_team1_uid, player1_team2_uid, player2_team2_uid] if uid is not None ] + [player_uid] # Fix the WHERE IN clause for PostgreSQL placeholders = ', '.join(['%s'] * len(player_uids)) query = f""" SELECT uid, display_name FROM users WHERE uid IN ({placeholders}); """ cursor.execute(query, player_uids) user_data = cursor.fetchall() user_map = {user['uid']: user['display_name'] for user in user_data} # Check if all other slots are filled can_edit = all( match[slot] is not None for slot in valid_slots.values() if slot != column_name ) return { "message": f"Player successfully joined as {request.slot}", "match_id": request.match_id, "slot": request.slot, "players": [ { "uid": uid, "name": user_map.get(uid, "Unknown") } for uid in player_uids ], "canEdit": can_edit } except Exception as e: conn.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: cursor.close() conn.close() @app.post("/endfour") async def end_four_match_openskill(request: EndFourMatch): conn = get_db_connection() cursor = conn.cursor() try: # Fetch player IDs and their current ratings cursor.execute( """ SELECT player1_team1_uid, player2_team1_uid, player1_team2_uid, player2_team2_uid FROM matches_2v2 WHERE match_id = %s; """, (request.match_id,) ) match = cursor.fetchone() print(match) if not match: raise HTTPException(status_code=404, detail="Match not found") player1_team1_uid = match['player1_team1_uid'] player2_team1_uid = match['player2_team1_uid'] player1_team2_uid = match['player1_team2_uid'] player2_team2_uid = match['player2_team2_uid'] cursor.execute( """ SELECT uid, openskill_mu AS mu, openskill_sigma AS sigma FROM users WHERE uid IN (%s, %s, %s, %s); """, (player1_team1_uid, player2_team1_uid, player1_team2_uid, player2_team2_uid) ) players = { row["uid"]: model.rating(mu=row["mu"], sigma=row["sigma"]) for row in cursor.fetchall() } # Default ratings for any missing players default_rating = model.rating(mu=25, sigma=8.333) players = { player1_team1_uid: players.get(player1_team1_uid, default_rating), player2_team1_uid: players.get(player2_team1_uid, default_rating), player1_team2_uid: players.get(player1_team2_uid, default_rating), player2_team2_uid: players.get(player2_team2_uid, default_rating), } # Calculate team scores team1_score = request.player1_team1_score + request.player2_team1_score team2_score = request.player1_team2_score + request.player2_team2_score # Assign ranks based on scores ranks = [0, 1] if team1_score > team2_score else [1, 0] # Update ratings using OpenSkill updated_ratings = model.rate( [[players[player1_team1_uid], players[player2_team1_uid]], [players[player1_team2_uid], players[player2_team2_uid]]], ranks=ranks ) # Flatten the results for database updates updates = [ (uid, rating.mu, rating.sigma) for uid, rating in zip( [player1_team1_uid, player2_team1_uid, player1_team2_uid, player2_team2_uid], [updated_ratings[0][0], updated_ratings[0][1], updated_ratings[1][0], updated_ratings[1][1]] ) ] # Update user ratings in the database for uid, new_mu, new_sigma in updates: cursor.execute( """ UPDATE users SET openskill_mu = %s, openskill_sigma = %s WHERE uid = %s; """, (new_mu, new_sigma, uid) ) # Update match results cursor.execute( """ UPDATE matches_2v2 SET player1_team1_score = %s, player2_team1_score = %s, player1_team2_score = %s, player2_team2_score = %s, team1_score = %s, team2_score = %s, winner_team = %s WHERE match_id = %s; """, (request.player1_team1_score, request.player2_team1_score, request.player1_team2_score, request.player2_team2_score, team1_score, team2_score, 1 if team1_score > team2_score else 2, # Determine the winner team request.match_id) ) conn.commit() return {"message": "Match ended and OpenSkill ratings updated successfully"} except Exception as e: conn.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: cursor.close() conn.close()