415 lines
13 KiB
Python
415 lines
13 KiB
Python
from fastapi import FastAPI, HTTPException
|
|
from pydantic import BaseModel
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from calls import *
|
|
from db import get_db_connection
|
|
import requests
|
|
|
|
|
|
# Application instance that every route decorator below attaches to.
app = FastAPI()

# CORS is left fully open: any origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is a very
# permissive setup. The request models visible in this file carry the session
# token in the body, so credentialed CORS may not be needed - confirm with the
# client before tightening.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
class RegisterRequest(BaseModel):
    """Request body for ``POST /register``."""

    # All three values are forwarded unchanged to ``register_user``.
    email: str
    display_name: str
    password: str
|
|
|
|
class LoginRequest(BaseModel):
    """Request body for ``POST /login``."""

    # Both values are forwarded unchanged to ``authenticate_user``.
    email: str
    password: str
|
|
|
|
|
|
class FriendRequest(BaseModel):
    """Request body for ``POST /add_friend``."""

    # Session token of the caller.
    token: str
    # uid of the user to add as a friend.
    friend_uid: int
|
|
|
|
class getFriendList(BaseModel):
    """Request body for ``POST /get_friends``."""

    # Session token of the caller; matched against ``users.session_token``.
    token: str
|
|
|
|
class CreateMatchRequest(BaseModel):
    """Request body for ``POST /creatematch``."""

    # Session token of the player creating the match (becomes player 1).
    token: str
|
|
|
|
class JoinMatchRequest(BaseModel):
    """Request body for ``POST /joinmatch``."""

    # Session token of the joining player (becomes player 2).
    token: str
    # Identifier of the match to join.
    match_id: int
|
|
|
|
class ProfileRequest(BaseModel):
    """Request body for ``POST /getprofile``."""

    # Session token of the user whose profile is requested.
    token: str
|
|
|
|
class EndMatchRequest(BaseModel):
    """Request body for ``POST /endmatch``."""

    # Identifier of the match being closed.
    match_id: int
    # Final scores; they are stored on the match row and drive the Elo update.
    player1_score: int
    player2_score: int
|
|
|
|
|
|
@app.post("/register")
def register(request: RegisterRequest):
    """Create a user account from the submitted registration data.

    Delegates to ``register_user`` and returns the uid it produces. Any
    exception it raises is reported as HTTP 400 with the exception text as
    the detail.
    """
    try:
        new_uid = register_user(
            request.email,
            request.display_name,
            request.password,
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    return {"message": "User registered successfully", "uid": new_uid}
|
|
|
|
@app.post("/login")
def login(request: LoginRequest):
    """Authenticate a user and hand back the value from ``authenticate_user``.

    Raises:
        HTTPException: 401 when ``authenticate_user`` returns a falsy value.
    """
    session_token = authenticate_user(request.email, request.password)
    if not session_token:
        raise HTTPException(status_code=401, detail="Invalid credentials")
    # The session token is returned under the "uid" key; clients rely on it.
    return {"message": "Login successful", "uid": session_token}
|
|
|
|
|
|
@app.post("/add_friend")
def add_friend_endpoint(request: FriendRequest):
    """Add ``friend_uid`` to the friend list of the user owning ``token``.

    Delegates to ``add_friend``.

    Raises:
        HTTPException: 400 when ``add_friend`` raises (detail is the
            exception text) or returns a falsy value.
    """
    try:
        success = add_friend(request.token, request.friend_uid)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    if not success:
        # Must be raised, not returned: a returned HTTPException is treated
        # as ordinary response data rather than as an error.
        raise HTTPException(status_code=400, detail="Failed to add friend")
    return {"message": "Friend added successfully"}
|
|
|
|
@app.get("/leaderboards")
def leaderboard():
    """Return whatever ``get_leaderboard`` produces.

    Any exception raised while fetching it is reported as HTTP 400 with the
    exception text as the detail.
    """
    try:
        return get_leaderboard()
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
|
|
|
|
|
|
|
|
def get_friend_details(friend_uid):
    """Look up the uid and display name of a single user.

    Opens its own database connection and closes it before returning.

    Returns:
        ``{"uid": ..., "name": ...}`` for the matching user row, or ``None``
        when no row has that uid.
    """
    connection = get_db_connection()
    cur = connection.cursor()
    try:
        cur.execute(
            "SELECT uid, display_name FROM users WHERE uid = %s;",
            (friend_uid,),
        )
        row = cur.fetchone()
        return {"uid": row["uid"], "name": row["display_name"]} if row else None
    finally:
        cur.close()
        connection.close()
|
|
|
|
def get_uid_by_token(token):
    """Resolve a session token to the uid of the user that owns it.

    Opens its own database connection and closes it before returning.

    Raises:
        ValueError: when no user row has this session token.
    """
    connection = get_db_connection()
    cur = connection.cursor()
    try:
        cur.execute(
            "SELECT uid FROM users WHERE session_token = %s;",
            (token,),
        )
        row = cur.fetchone()
        if row:
            return row["uid"]
        raise ValueError("Invalid token")
    finally:
        cur.close()
        connection.close()
|
|
|
|
|
|
@app.post("/get_friends")
def get_friends_list(request: getFriendList):
    """Return uid and display name for every friend of the calling user.

    The caller is identified by ``request.token``. The stored ``friend_list``
    is read as a mapping whose values are friend uids; each uid is resolved
    through ``get_friend_details`` and uids that no longer resolve are
    skipped.

    Raises:
        HTTPException: 404 when the token matches no user or the user has no
            friends; 400 (detail is the exception text) on any other failure.
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute(
            "SELECT friend_list FROM users WHERE session_token = %s;",
            (request.token,),
        )
        result = cursor.fetchone()
        if not result or not result["friend_list"]:
            raise HTTPException(status_code=404, detail="No friends found.")

        # Only the values (friend uids) are needed; the keys are unused.
        resolved = (
            get_friend_details(friend_uid)
            for friend_uid in result["friend_list"].values()
        )
        return {"friends": [details for details in resolved if details]}
    except HTTPException:
        # Let deliberate HTTP errors (the 404 above) through unchanged rather
        # than rewrapping them as a generic 400.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    finally:
        cursor.close()
        conn.close()
|
|
|
|
@app.post("/creatematch")
def create_match(request: CreateMatchRequest):
    """Create a match with the calling user as player 1.

    Returns:
        ``{"match_id": ...}`` with the id returned by the INSERT.

    Raises:
        HTTPException: 400 on any failure, including an unknown token; the
            transaction is rolled back first.
    """
    insert_match_sql = """
        INSERT INTO matches (player1_uid)
        VALUES (%s)
        RETURNING match_id;
    """
    connection = get_db_connection()
    cur = connection.cursor()
    try:
        creator_uid = get_uid_by_token(request.token)
        cur.execute(insert_match_sql, (creator_uid,))
        new_match_id = cur.fetchone()["match_id"]
        connection.commit()
        return {"match_id": new_match_id}
    except Exception as e:
        connection.rollback()
        raise HTTPException(status_code=400, detail=str(e))
    finally:
        cur.close()
        connection.close()
|
|
|
|
|
|
@app.post("/joinmatch")
def join_match(request: JoinMatchRequest):
    """Register the calling user as player 2 of an existing match.

    Raises:
        HTTPException: 404 when the match does not exist; 400 when the caller
            created the match, when the match already has a second player,
            or on any other failure (detail is the exception text).
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        player2_uid = get_uid_by_token(request.token)
        cursor.execute(
            """
            SELECT player1_uid, player2_uid
            FROM matches
            WHERE match_id = %s;
            """,
            (request.match_id,),
        )
        match = cursor.fetchone()
        if not match:
            raise HTTPException(status_code=404, detail="Match not found")

        if match["player1_uid"] == player2_uid:
            # A client mistake, not a service outage, so 400 rather than 503.
            raise HTTPException(
                status_code=400,
                detail="You cannot join a match you created",
            )

        if match["player2_uid"] is not None:
            raise HTTPException(status_code=400, detail="Match is already full")

        # The IS NULL guard makes the claim atomic: if another player joined
        # between the SELECT above and this UPDATE, no row is modified.
        cursor.execute(
            """
            UPDATE matches
            SET player2_uid = %s
            WHERE match_id = %s AND player2_uid IS NULL;
            """,
            (player2_uid, request.match_id),
        )
        if cursor.rowcount == 0:
            raise HTTPException(status_code=400, detail="Match is already full")

        conn.commit()
        return {"message": "Joined match successfully", "match_id": request.match_id}
    except HTTPException:
        # Keep the intended status code instead of rewrapping it as 400.
        conn.rollback()
        raise
    except Exception as e:
        conn.rollback()
        raise HTTPException(status_code=400, detail=str(e)) from e
    finally:
        cursor.close()
        conn.close()
|
|
|
|
def calculate_elo(player1_elo, player2_elo, player1_score, player2_score, k_factor=32):
    """Compute the Elo rating change of both players for one match.

    The higher score wins (actual result 1 vs 0); equal scores count as a
    draw (0.5 each). Expected results use the logistic Elo formula with a
    400-point scale.

    Args:
        player1_elo: Rating of player 1 before the match.
        player2_elo: Rating of player 2 before the match.
        player1_score: Match score of player 1.
        player2_score: Match score of player 2.
        k_factor: Maximum rating swing per match. Defaults to 32.

    Returns:
        ``(elo_change1, elo_change2)``, each rounded to the nearest integer.
    """
    expected1 = 1 / (1 + 10 ** ((player2_elo - player1_elo) / 400))
    expected2 = 1 / (1 + 10 ** ((player1_elo - player2_elo) / 400))

    if player1_score > player2_score:
        actual1 = 1
    elif player1_score < player2_score:
        actual1 = 0
    else:
        actual1 = 0.5
    actual2 = 1 - actual1

    elo_change1 = round(k_factor * (actual1 - expected1))
    elo_change2 = round(k_factor * (actual2 - expected2))
    return elo_change1, elo_change2
|
|
|
|
@app.post("/endmatch")
def end_match(request: EndMatchRequest):
    """Record the final score of a match and apply the Elo changes.

    Stores both scores and both Elo deltas on the match row, then writes the
    new ratings to the two users, all in one transaction.

    Raises:
        HTTPException: 404 when the match or one of its players does not
            exist; 400 when the match has no second player or on any other
            failure (detail is the exception text).
    """
    # NOTE(review): nothing visible here prevents the same match from being
    # ended twice, which would apply the Elo change again - confirm whether
    # the schema or the client guards against that.
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute(
            """
            SELECT player1_uid, player2_uid
            FROM matches
            WHERE match_id = %s;
            """,
            (request.match_id,),
        )
        match = cursor.fetchone()
        if not match:
            raise HTTPException(status_code=404, detail="Match not found")

        player1_uid, player2_uid = match["player1_uid"], match["player2_uid"]
        if player2_uid is None:
            # Nobody joined: there is no opponent to rate against.
            raise HTTPException(status_code=400, detail="Match has no second player")

        cursor.execute(
            """
            SELECT uid, current_elo
            FROM users
            WHERE uid IN (%s, %s);
            """,
            (player1_uid, player2_uid),
        )
        elo_by_uid = {row["uid"]: row["current_elo"] for row in cursor.fetchall()}
        if player1_uid not in elo_by_uid or player2_uid not in elo_by_uid:
            raise HTTPException(status_code=404, detail="Player not found")
        player1_elo = elo_by_uid[player1_uid]
        player2_elo = elo_by_uid[player2_uid]

        elo_change1, elo_change2 = calculate_elo(
            player1_elo, player2_elo, request.player1_score, request.player2_score
        )

        cursor.execute(
            """
            UPDATE matches
            SET player1_score = %s, player2_score = %s,
                player1_elo_change = %s, player2_elo_change = %s
            WHERE match_id = %s;
            """,
            (
                request.player1_score,
                request.player2_score,
                elo_change1,
                elo_change2,
                request.match_id,
            ),
        )

        new_player1_elo = player1_elo + elo_change1
        new_player2_elo = player2_elo + elo_change2
        cursor.execute(
            """
            UPDATE users
            SET current_elo = CASE WHEN uid = %s THEN %s WHEN uid = %s THEN %s END
            WHERE uid IN (%s, %s);
            """,
            (
                player1_uid,
                new_player1_elo,
                player2_uid,
                new_player2_elo,
                player1_uid,
                player2_uid,
            ),
        )

        conn.commit()
        return {"message": "Match ended successfully"}
    except HTTPException:
        # Keep the intended status code instead of rewrapping it as 400.
        conn.rollback()
        raise
    except Exception as e:
        conn.rollback()
        raise HTTPException(status_code=400, detail=str(e)) from e
    finally:
        cursor.close()
        conn.close()
|
|
|
|
|
|
@app.post("/getprofile")
def get_profile(request: ProfileRequest):
    """Return the caller's profile and their ten most recent matches.

    The caller is identified by ``request.token``, matched against
    ``users.session_token``. Each match entry carries the opponent's display
    name, the result from the caller's point of view, the caller's Elo change
    and the match date in ISO 8601 form (``None`` when no date is stored).

    Raises:
        HTTPException: 404 when the token matches no user; 400 (detail is the
            exception text) on any other failure.
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute(
            "SELECT uid, display_name, current_elo FROM users WHERE session_token = %s;",
            (request.token,),
        )
        user = cursor.fetchone()
        if not user:
            raise HTTPException(status_code=404, detail="User not found")

        uid, display_name, current_elo = user["uid"], user["display_name"], user["current_elo"]

        # The inner JOIN picks the *other* player of each match, so matches
        # without a second player do not appear. Any match that is not a
        # strict win for the caller (including equal or NULL scores) is
        # reported as 'Loss'.
        cursor.execute(
            """
            SELECT
                m.match_id,
                u.display_name AS opponent_name,
                m.match_date,
                CASE
                    WHEN m.player1_uid = %s AND m.player1_score > m.player2_score THEN 'Win'
                    WHEN m.player2_uid = %s AND m.player2_score > m.player1_score THEN 'Win'
                    ELSE 'Loss'
                END AS result,
                CASE
                    WHEN m.player1_uid = %s THEN m.player1_elo_change
                    WHEN m.player2_uid = %s THEN m.player2_elo_change
                END AS elo_change
            FROM matches m
            JOIN users u ON u.uid = CASE
                WHEN m.player1_uid = %s THEN m.player2_uid
                WHEN m.player2_uid = %s THEN m.player1_uid
            END
            WHERE %s IN (m.player1_uid, m.player2_uid)
            ORDER BY m.match_date DESC
            LIMIT 10;
            """,
            (uid,) * 7,  # one value per %s placeholder in the query above
        )
        rows = cursor.fetchall()

        return {
            "name": display_name,
            "uid": uid,
            "elo": current_elo,
            "matches": [
                {
                    "match_id": row["match_id"],
                    "opponent_name": row["opponent_name"],
                    "result": row["result"],
                    "elo_change": row["elo_change"],
                    "match_date": (
                        row["match_date"].isoformat()
                        if row["match_date"] is not None
                        else None
                    ),
                }
                for row in rows
            ],
        }
    except HTTPException:
        # Keep the 404 above instead of rewrapping it as a generic 400.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    finally:
        cursor.close()
        conn.close()
|
|
|
|
|
|
|
|
@app.get("/version")
def get_latest_commit_hashes():
    """Report the newest commit hash of the backend and frontend repositories.

    Both commit lists are fetched from the Forgejo API and the ``sha`` of the
    first entry of each is returned.

    Returns:
        ``{"backend": sha, "frontend": sha}`` on success, otherwise
        ``{"error": message}``. Failures are reported in the body; this
        handler does not raise.
    """
    commit_urls = {
        "backend": "https://git.mercurio.moe/api/v1/repos/Mercury/dth-pingpong-backend/commits",
        "frontend": "https://git.mercurio.moe/api/v1/repos/Mercury/dth-pingpong-mobileapp/commits",
    }
    try:
        # Without a timeout a stalled upstream would block this worker
        # indefinitely; a timeout turns that into an error response instead.
        responses = {
            name: requests.get(url, timeout=10)
            for name, url in commit_urls.items()
        }
        if any(response.status_code != 200 for response in responses.values()):
            return {"error": "Failed to fetch commit hashes from Forgejo"}
        return {name: response.json()[0]["sha"] for name, response in responses.items()}
    except Exception as e:
        return {"error": str(e)}