dth-pingpong-backend/main.py

466 lines
15 KiB
Python
Raw Normal View History

from fastapi import FastAPI, HTTPException, WebSocket
from typing import Dict, List
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
from calls import *
from db import get_db_connection
2024-12-25 12:06:20 +01:00
import requests
app = FastAPI()
app.add_middleware(
CORSMiddleware,
2024-12-22 21:18:12 +01:00
allow_origins=["*"],
allow_credentials=True,
2024-12-22 21:18:12 +01:00
allow_methods=["*"],
allow_headers=["*"],
)
active_connections: Dict[int, List[WebSocket]] = {}
class RegisterRequest(BaseModel):
email: str
display_name: str
password: str
class LoginRequest(BaseModel):
email: str
password: str
class FriendRequest(BaseModel):
token: str
friend_uid: int
class getFriendList(BaseModel):
token: str
class CreateMatchRequest(BaseModel):
token: str
class JoinMatchRequest(BaseModel):
token: str
match_id: int
2024-12-22 21:18:12 +01:00
class ProfileRequest(BaseModel):
token: str
class EndMatchRequest(BaseModel):
match_id: int
player1_score: int
player2_score: int
@app.post("/register")
def register(request: RegisterRequest):
try:
uid = register_user(request.email, request.display_name, request.password)
return {"message": "User registered successfully", "uid": uid}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/login")
def login(request: LoginRequest):
sessiontoken = authenticate_user(request.email, request.password)
if sessiontoken:
return {"message": "Login successful", "uid": sessiontoken}
else:
raise HTTPException(status_code=401, detail="Invalid credentials")
@app.post("/add_friend")
def add_friend_endpoint(request: FriendRequest):
try:
success = add_friend(request.token, request.friend_uid)
return {"message": "Friend added successfully"} if success else HTTPException(400, "Failed to add friend")
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/leaderboards")
def leaderboard():
try:
leaderboard = get_leaderboard()
return leaderboard
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
def get_friend_details(friend_uid):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT uid, display_name FROM users WHERE uid = %s;", (friend_uid,))
result = cursor.fetchone()
if result:
return {"uid": result["uid"], "name": result["display_name"]}
return None
finally:
cursor.close()
conn.close()
def get_uid_by_token(token):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT uid FROM users WHERE session_token = %s;", (token,))
result = cursor.fetchone()
if not result:
raise ValueError("Invalid token")
return result["uid"]
finally:
cursor.close()
conn.close()
@app.post("/get_friends")
def get_friends_list(request: getFriendList):
token = request.token
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT friend_list FROM users WHERE session_token = %s;", (token,))
result = cursor.fetchone()
if not result or not result["friend_list"]:
raise HTTPException(status_code=404, detail="No friends found.")
friends = result["friend_list"]
friends_details = []
for key, friend_uid in friends.items():
friend_details = get_friend_details(friend_uid)
if friend_details:
friends_details.append(friend_details)
return {"friends": friends_details}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
finally:
cursor.close()
conn.close()
@app.post("/creatematch")
def create_match(request: CreateMatchRequest):
conn = get_db_connection()
cursor = conn.cursor()
try:
player1_uid = get_uid_by_token(request.token)
cursor.execute(
"""
INSERT INTO matches (player1_uid)
VALUES (%s)
RETURNING match_id;
""",
(player1_uid,)
)
match_id = cursor.fetchone()["match_id"]
conn.commit()
active_connections[match_id] = []
return {"match_id": match_id}
except Exception as e:
conn.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
cursor.close()
conn.close()
@app.post("/joinmatch")
def join_match(request: JoinMatchRequest):
conn = get_db_connection()
cursor = conn.cursor()
try:
player2_uid = get_uid_by_token(request.token)
cursor.execute(
"""
SELECT player1_uid, player2_uid
FROM matches
WHERE match_id = %s;
""",
(request.match_id,)
)
match = cursor.fetchone()
if not match:
raise HTTPException(status_code=404, detail="Match not found")
if match["player1_uid"] == player2_uid:
raise HTTPException(status_code=503, detail="You cannot join a match you created")
if match["player2_uid"] is not None:
raise HTTPException(status_code=400, detail="Match is already full")
cursor.execute(
"""
UPDATE matches
SET player2_uid = %s
WHERE match_id = %s;
""",
(player2_uid, request.match_id)
)
conn.commit()
return {"message": "Joined match successfully", "match_id": request.match_id}
except Exception as e:
conn.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
cursor.close()
conn.close()
2024-12-22 21:18:12 +01:00
def calculate_elo(player1_elo, player2_elo, player1_score, player2_score):
k_factor = 32
expected1 = 1 / (1 + 10 ** ((player2_elo - player1_elo) / 400))
expected2 = 1 / (1 + 10 ** ((player1_elo - player2_elo) / 400))
actual1 = 1 if player1_score > player2_score else 0 if player1_score < player2_score else 0.5
actual2 = 1 - actual1
elo_change1 = round(k_factor * (actual1 - expected1))
elo_change2 = round(k_factor * (actual2 - expected2))
return elo_change1, elo_change2
@app.post("/endmatch")
async def end_match(request: EndMatchRequest):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(
"""
SELECT player1_uid, player2_uid
FROM matches
WHERE match_id = %s;
""",
(request.match_id,)
)
match = cursor.fetchone()
if not match:
raise HTTPException(status_code=404, detail="Match not found")
player1_uid, player2_uid = match["player1_uid"], match["player2_uid"]
cursor.execute(
"""
SELECT uid, current_elo
FROM users
WHERE uid IN (%s, %s);
""",
(player1_uid, player2_uid)
)
players = cursor.fetchall()
player1_elo = next(p["current_elo"] for p in players if p["uid"] == player1_uid)
player2_elo = next(p["current_elo"] for p in players if p["uid"] == player2_uid)
elo_change1, elo_change2 = calculate_elo(player1_elo, player2_elo, request.player1_score, request.player2_score)
cursor.execute(
"""
UPDATE matches
SET player1_score = %s, player2_score = %s,
player1_elo_change = %s, player2_elo_change = %s
WHERE match_id = %s;
""",
(request.player1_score, request.player2_score, elo_change1, elo_change2, request.match_id)
)
new_player1_elo = max(0, player1_elo + elo_change1)
new_player2_elo = max(0, player2_elo + elo_change2)
cursor.execute(
"""
UPDATE users
SET current_elo = CASE
WHEN uid = %s THEN %s
WHEN uid = %s THEN %s
END
WHERE uid IN (%s, %s);
""",
(player1_uid, new_player1_elo, player2_uid, new_player2_elo, player1_uid, player2_uid)
)
conn.commit()
if request.match_id in active_connections:
connections = active_connections.pop(request.match_id, [])
for websocket in connections:
await websocket.close()
return {"message": "Match ended successfully"}
except Exception as e:
conn.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
cursor.close()
conn.close()
@app.websocket("/ws/{match_id}")
async def websocket_endpoint(websocket: WebSocket, match_id: int):
await websocket.accept()
if match_id not in active_connections:
raise HTTPException(status_code=404, detail="Match not found")
active_connections[match_id].append(websocket)
try:
while True:
message = await websocket.receive_text()
data = json.loads(message)
if data["type"] == "score_update":
query = """
UPDATE matches
SET player1_score = :player1_score, player2_score = :player2_score
WHERE match_id = :match_id;
"""
await database.execute(query, {
"player1_score": data["player1_score"],
"player2_score": data["player2_score"],
"match_id": match_id
})
for connection in active_connections[match_id]:
if connection != websocket:
await connection.send_text(message)
except WebSocketDisconnect:
active_connections[match_id].remove(websocket)
if not active_connections[match_id]:
del active_connections[match_id]
2024-12-22 21:18:12 +01:00
@app.post("/getprofile")
def get_profile(request: ProfileRequest):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT uid, display_name, current_elo FROM users WHERE session_token = %s;", (request.token,))
user = cursor.fetchone()
if not user:
raise HTTPException(status_code=404, detail="User not found")
uid, display_name, current_elo = user["uid"], user["display_name"], user["current_elo"]
cursor.execute(
"""
SELECT
m.match_id,
u.display_name AS opponent_name,
CASE
WHEN m.player1_uid = %s AND m.player1_score > m.player2_score THEN 'Win'
WHEN m.player2_uid = %s AND m.player2_score > m.player1_score THEN 'Win'
ELSE 'Loss'
END AS result,
CASE
WHEN m.player1_uid = %s THEN m.player1_elo_change
WHEN m.player2_uid = %s THEN m.player2_elo_change
END AS elo_change
FROM matches m
JOIN users u ON u.uid = CASE
WHEN m.player1_uid = %s THEN m.player2_uid
WHEN m.player2_uid = %s THEN m.player1_uid
END
WHERE %s IN (m.player1_uid, m.player2_uid)
ORDER BY m.match_date DESC
LIMIT 10;
""",
(uid, uid, uid, uid, uid, uid, uid)
)
matches = cursor.fetchall()
return {
"name": display_name,
"uid": uid,
"elo": current_elo,
"matches": [
{
"match_id": match["match_id"],
"opponent_name": match["opponent_name"],
"result": match["result"],
"elo_change": match["elo_change"]
}
for match in matches
]
}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
finally:
cursor.close()
conn.close()
@app.post("/getprofile")
def get_profile(request: ProfileRequest):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT uid, display_name, current_elo FROM users WHERE token = %s;", (request.token,))
user = cursor.fetchone()
if not user:
raise HTTPException(status_code=404, detail="User not found")
uid, display_name, current_elo = user["uid"], user["display_name"], user["current_elo"]
cursor.execute(
"""
SELECT
m.match_id,
u.display_name AS opponent_name,
m.match_date,
CASE
WHEN m.player1_uid = %s AND m.player1_score > m.player2_score THEN 'Win'
WHEN m.player2_uid = %s AND m.player2_score > m.player1_score THEN 'Win'
ELSE 'Loss'
END AS result,
CASE
WHEN m.player1_uid = %s THEN m.player1_elo_change
WHEN m.player2_uid = %s THEN m.player2_elo_change
END AS elo_change
FROM matches m
JOIN users u ON u.uid = CASE
WHEN m.player1_uid = %s THEN m.player2_uid
WHEN m.player2_uid = %s THEN m.player1_uid
END
WHERE %s IN (m.player1_uid, m.player2_uid)
ORDER BY m.match_date DESC
LIMIT 10;
""",
(uid, uid, uid, uid, uid, uid, uid)
)
matches = cursor.fetchall()
return {
"name": display_name,
"uid": uid,
"elo": current_elo,
"matches": [
{
"match_id": match["match_id"],
"opponent_name": match["opponent_name"],
"result": match["result"],
"elo_change": match["elo_change"],
"match_date": match["match_date"].isoformat()
}
for match in matches
]
}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
finally:
cursor.close()
2024-12-25 12:06:20 +01:00
conn.close()
@app.get("/version")
def get_latest_commit_hashes():
try:
backend_response = requests.get("https://git.mercurio.moe/api/v1/repos/Mercury/dth-pingpong-backend/commits")
frontend_response = requests.get("https://git.mercurio.moe/api/v1/repos/Mercury/dth-pingpong-mobileapp/commits")
if backend_response.status_code == 200 and frontend_response.status_code == 200:
backend_hash = backend_response.json()[0]["sha"]
frontend_hash = frontend_response.json()[0]["sha"]
return {"backend": backend_hash, "frontend": frontend_hash}
return {"error": "Failed to fetch commit hashes from Forgejo"}
except Exception as e:
return {"error": str(e)}