"""
Unipress Game Client
Client for communicating with the game server running in Docker container.
"""
import json
import time
from typing import Any, Dict, Optional
import requests
[docs]
class UnipressClient:
"""Client for Unipress game server."""
[docs]
def __init__(self, server_url: str = "http://localhost:5000"):
"""
Initialize client.
Args:
server_url: URL of the game server (default: http://localhost:5000)
"""
self.server_url = server_url.rstrip("/")
self.session = requests.Session()
[docs]
def health_check(self) -> Dict[str, Any]:
"""Check server health."""
try:
response = self.session.get(f"{self.server_url}/health")
response.raise_for_status()
return response.json()
except requests.RequestException as e:
return {"error": str(e), "status": "unhealthy"}
[docs]
def list_games(self) -> Dict[str, Any]:
"""List available games."""
try:
response = self.session.get(f"{self.server_url}/games/list")
response.raise_for_status()
return response.json()
except requests.RequestException as e:
return {"error": str(e)}
[docs]
def run_game(self, game: str, difficulty: int = 5) -> Dict[str, Any]:
"""
Run a specific game.
Args:
game: Game name or module (e.g., "jumper", "demo_jump")
difficulty: Difficulty level 1-10 (default: 5)
"""
try:
# Map game names to modules
game_modules = {
"jumper": "unipress.games.jumper.game",
"demo_jump": "unipress.games.demo_jump.game",
}
game_module = game_modules.get(game, game)
data = {
"game": game_module,
"difficulty": difficulty
}
response = self.session.post(
f"{self.server_url}/games/run",
json=data,
headers={"Content-Type": "application/json"}
)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
return {"error": str(e)}
[docs]
def stop_game(self) -> Dict[str, Any]:
"""Stop currently running game."""
try:
response = self.session.post(f"{self.server_url}/games/stop")
response.raise_for_status()
return response.json()
except requests.RequestException as e:
return {"error": str(e)}
[docs]
def game_status(self) -> Dict[str, Any]:
"""Get current game status."""
try:
response = self.session.get(f"{self.server_url}/games/status")
response.raise_for_status()
return response.json()
except requests.RequestException as e:
return {"error": str(e)}
[docs]
def wait_for_game_completion(self, timeout: int = 300) -> bool:
"""
Wait for current game to complete.
Args:
timeout: Maximum time to wait in seconds (default: 300)
Returns:
True if game completed, False if timeout
"""
start_time = time.time()
while time.time() - start_time < timeout:
status = self.game_status()
if "error" in status:
print(f"Error checking game status: {status['error']}")
return False
if not status.get("game_running", False):
return True
time.sleep(1)
return False
[docs]
def main():
"""Example usage of the client."""
client = UnipressClient()
# Check server health
health = client.health_check()
print(f"Server health: {health}")
if "error" in health:
print("Server is not available. Make sure the Docker container is running.")
return
# List available games
games = client.list_games()
print(f"Available games: {json.dumps(games, indent=2)}")
# Run jumper game with difficulty 7
result = client.run_game("jumper", difficulty=7)
print(f"Game start result: {result}")
# Wait for game to complete
print("Waiting for game to complete...")
completed = client.wait_for_game_completion(timeout=60)
if completed:
print("Game completed!")
else:
print("Game did not complete within timeout")
# Stop the game
stop_result = client.stop_game()
print(f"Stop result: {stop_result}")
if __name__ == "__main__":
main()