Spaces:
Sleeping
Sleeping
from flask import Flask, render_template, request, jsonify | |
from flask_socketio import SocketIO, emit | |
import os | |
import requests | |
import json | |
import uuid | |
from datetime import datetime | |
from dotenv import load_dotenv | |
import logging | |
from werkzeug.utils import secure_filename | |
import random | |
import asyncio | |
# Application bootstrap: create the Flask app, generate a fresh random secret
# key for this process, and cap request bodies (uploads) at 16 MiB.
app = Flask(__name__)
app.config.update(
    SECRET_KEY=os.urandom(24),
    MAX_CONTENT_LENGTH=16 * 1024 * 1024,
)

# Socket.IO layer on top of the Flask app; every origin is allowed
# (development-friendly CORS setting).
socketio = SocketIO(app, cors_allowed_origins="*")

# Read the .env file first, then pick up the third-party API keys from the
# environment. Either key is None when its variable is not set.
load_dotenv()
MISTRAL_API_KEY = os.environ.get('MISTRAL_API_KEY')
ELEVENLABS_API_KEY = os.environ.get('ELEVENLABS_API_KEY')

# INFO-level logging plus a module-scoped logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GameState:
    """Manages the state of all active game sessions.

    Sessions are plain dictionaries stored in ``self.games`` and keyed by a
    UUID4 string produced by :meth:`create_game`.
    """

    # Default maximum age of a session before cleanup removes it (2 hours).
    DEFAULT_MAX_AGE_SECONDS = 7200

    def __init__(self):
        """Initialize the game state manager with no active games."""
        self.games = {}  # game_id -> session dictionary
        # Intended cleanup period in seconds (1 hour). Nothing in this class
        # schedules cleanup; callers must invoke cleanup_inactive_games().
        self.cleanup_interval = 3600

    def create_game(self):
        """Create a new game session with a unique identifier.

        Returns:
            str: The identifier under which the new session is stored.
        """
        game_id = str(uuid.uuid4())
        self.games[game_id] = {
            'players': [],
            'current_phase': 'setup',
            'recordings': {},
            'impostor': None,
            'votes': {},
            'question': None,
            'impostor_answer': None,
            'modified_recording': None,
            'round_number': 1,
            'start_time': datetime.now().isoformat(),
            'completed_rounds': [],
            'score': {'impostor_wins': 0, 'player_wins': 0},
        }
        return game_id

    def cleanup_inactive_games(self, max_age_seconds=None):
        """Remove game sessions that were started too long ago.

        Age is measured from the session's ``start_time`` (its creation
        time), not from its most recent activity.

        Args:
            max_age_seconds: Sessions strictly older than this many seconds
                are removed. Defaults to ``DEFAULT_MAX_AGE_SECONDS``.

        Returns:
            int: The number of sessions that were removed.
        """
        if max_age_seconds is None:
            max_age_seconds = self.DEFAULT_MAX_AGE_SECONDS

        current_time = datetime.now()
        # Collect ids first so the dictionary is not mutated while iterating.
        expired_ids = [
            game_id
            for game_id, game in self.games.items()
            if (current_time - datetime.fromisoformat(game['start_time'])).total_seconds()
            > max_age_seconds
        ]
        for game_id in expired_ids:
            del self.games[game_id]
        return len(expired_ids)
# Process-wide registry of game sessions, shared by the request and socket
# handlers defined in this module.
game_state: GameState = GameState()
async def generate_question():
    """Generate an engaging question using Mistral AI.

    Sends a single-turn chat completion request and returns the model's
    reply with surrounding whitespace stripped. If the API key is missing,
    the request fails, the API answers with a non-200 status, or the response
    has an unexpected shape, a question is chosen at random from a built-in
    fallback list instead, so this coroutine does not raise.

    Returns:
        str: The question to ask the players.
    """
    fallback_questions = [
        "What is your favorite childhood memory?",
        "What's the most interesting place you've ever visited?",
        "What's a skill you'd love to master and why?",
        "What's the best piece of advice you've ever received?",
    ]

    if not MISTRAL_API_KEY:
        # Without a key the request can only fail; skip the network round trip.
        logger.error("Error generating question: MISTRAL_API_KEY is not set")
        return random.choice(fallback_questions)

    # Prompt that encourages interesting, personal questions.
    prompt = (
        'Generate an engaging personal question for a social game.\n'
        'The question should:\n'
        '1. Encourage creative and unique responses\n'
        '2. Be open-ended but not too philosophical\n'
        '3. Be answerable in 15-30 seconds\n'
        '4. Be appropriate for all ages\n'
        '5. Spark interesting conversation\n'
        'Generate only the question, without any additional text.'
    )
    headers = {
        'Authorization': f'Bearer {MISTRAL_API_KEY}',
        'Content-Type': 'application/json',
    }
    payload = {
        # The chat completions endpoint requires a model name; it can be
        # overridden through the MISTRAL_MODEL environment variable.
        'model': os.getenv('MISTRAL_MODEL', 'mistral-small-latest'),
        'messages': [{'role': 'user', 'content': prompt}],
    }

    try:
        # NOTE: requests is synchronous, so this call blocks for up to the
        # timeout even though the function is declared async.
        response = requests.post(
            'https://api.mistral.ai/v1/chat/completions',
            headers=headers,
            json=payload,
            timeout=10,  # Bound the wait on slow responses
        )
        if response.status_code == 200:
            question = response.json()['choices'][0]['message']['content'].strip()
            if question:
                logger.info(f"Generated question: {question}")
                return question
            logger.error("Mistral API error: empty question returned")
        else:
            logger.error(f"Mistral API error: {response.status_code}")
    except Exception as e:
        # Best-effort boundary: network errors and malformed responses both
        # fall through to the fallback list below.
        logger.error(f"Error generating question: {str(e)}")

    return random.choice(fallback_questions)
async def generate_impostor_answer(question):
    """Generate a convincing impostor response using Mistral AI.

    Args:
        question: The question text the impostor is supposed to answer; it is
            embedded verbatim in the prompt.

    Returns:
        str: The generated answer with surrounding whitespace stripped, or a
        fixed fallback sentence when the API key is missing, the request
        fails, the API returns a non-200 status, or the response is malformed
        or empty. This coroutine does not raise.
    """
    fallback_answer = (
        "I have an interesting story about that, but I'd need more time to explain it properly."
    )

    if not MISTRAL_API_KEY:
        # Without a key the request can only fail; skip the network round trip.
        logger.error("Error generating impostor answer: MISTRAL_API_KEY is not set")
        return fallback_answer

    # Detailed prompt asking for a natural, speech-friendly response.
    prompt = (
        f'Given the question "{question}", generate a detailed and convincing personal response.\n'
        'The response should:\n'
        '1. Sound natural and conversational\n'
        '2. Be 2-3 sentences long\n'
        '3. Include specific details to sound authentic\n'
        '4. Be suitable for text-to-speech conversion\n'
        '5. Avoid complex words or punctuation that might affect voice synthesis\n'
        'Generate only the response, without any additional context.'
    )
    headers = {
        'Authorization': f'Bearer {MISTRAL_API_KEY}',
        'Content-Type': 'application/json',
    }
    payload = {
        # The chat completions endpoint requires a model name; it can be
        # overridden through the MISTRAL_MODEL environment variable.
        'model': os.getenv('MISTRAL_MODEL', 'mistral-small-latest'),
        'messages': [{'role': 'user', 'content': prompt}],
    }

    try:
        # NOTE: requests is synchronous, so this call blocks for up to the
        # timeout even though the function is declared async.
        response = requests.post(
            'https://api.mistral.ai/v1/chat/completions',
            headers=headers,
            json=payload,
            timeout=10,
        )
        if response.status_code == 200:
            answer = response.json()['choices'][0]['message']['content'].strip()
            if answer:
                logger.info(f"Generated impostor answer: {answer}")
                return answer
            logger.error("Mistral API error generating answer: empty answer returned")
        else:
            logger.error(f"Mistral API error generating answer: {response.status_code}")
    except Exception as e:
        # Best-effort boundary: any failure yields the fallback sentence.
        logger.error(f"Error generating impostor answer: {str(e)}")

    return fallback_answer
async def clone_voice(audio_file):
    """Clone a voice using ElevenLabs API.

    Uploads the recording to the ElevenLabs "add voice" endpoint.

    Args:
        audio_file: Filesystem path of the recording to upload. It is sent
            under the name ``recording.wav`` with MIME type ``audio/wav``.

    Returns:
        The ``voice_id`` reported by the API, or ``None`` when the file
        cannot be read, the request fails, the API returns a non-200 status,
        or the response carries no ``voice_id``. This coroutine does not
        raise.
    """
    try:
        headers = {'xi-api-key': ELEVENLABS_API_KEY}
        # The add-voice endpoint requires a "name" form field; a random
        # suffix keeps the names of successive clones distinct.
        form_fields = {'name': f'impostor_{uuid.uuid4().hex[:8]}'}

        with open(audio_file, 'rb') as f:
            files = {'files': ('recording.wav', f, 'audio/wav')}
            # The upload must happen while the file handle is still open.
            response = requests.post(
                'https://api.elevenlabs.io/v1/voices/add',
                headers=headers,
                data=form_fields,
                files=files,
                timeout=30,
            )

        if response.status_code != 200:
            logger.error(f"ElevenLabs voice cloning error: {response.status_code}")
            return None

        voice_id = response.json().get('voice_id')
        if not voice_id:
            logger.error("ElevenLabs voice cloning error: response contained no voice_id")
            return None

        logger.info(f"Successfully cloned voice: {voice_id}")
        return voice_id
    except Exception as e:
        # Best-effort boundary: callers treat None as "cloning failed".
        logger.error(f"Error cloning voice: {str(e)}")
        return None
async def generate_cloned_speech(voice_id, text):
    """Generate speech using ElevenLabs with a cloned voice.

    Args:
        voice_id: Identifier of the ElevenLabs voice to synthesize with.
        text: The text to be spoken.

    Returns:
        str | None: Relative path (``temp/impostor_audio_<uuid>.mp3``) of the
        file the response body was written to, or ``None`` when an argument
        is empty, the request fails, the API returns a non-200 status, or the
        file cannot be written. This coroutine does not raise.
    """
    if not voice_id or not text:
        # An empty voice id would produce a malformed URL; fail fast instead.
        logger.error("Error generating cloned speech: voice_id and text are required")
        return None

    try:
        headers = {
            'xi-api-key': ELEVENLABS_API_KEY,
            'Content-Type': 'application/json',
        }
        payload = {
            'text': text,
            'voice_settings': {
                'stability': 0.75,
                'similarity_boost': 0.75,
            },
        }
        response = requests.post(
            f'https://api.elevenlabs.io/v1/text-to-speech/{voice_id}',
            headers=headers,
            json=payload,
            timeout=30,
        )
        if response.status_code != 200:
            logger.error(f"ElevenLabs speech generation error: {response.status_code}")
            return None

        # The output directory is otherwise only created when the module is
        # run as a script, so make sure it exists before writing.
        os.makedirs('temp', exist_ok=True)
        filename = f"temp/impostor_audio_{uuid.uuid4()}.mp3"
        with open(filename, 'wb') as f:
            f.write(response.content)
        logger.info(f"Generated cloned speech: {filename}")
        return filename
    except Exception as e:
        # Best-effort boundary: callers treat None as "generation failed".
        logger.error(f"Error generating cloned speech: {str(e)}")
        return None
def home():
    """Render the ``index.html`` template and return it as the main page."""
    # NOTE(review): no route decorator is visible on this view in the file as
    # extracted - confirm it is registered with the app.
    page = render_template('index.html')
    return page
async def start_game():
    """Start a round for an existing game session.

    Expects a JSON object containing ``game_id``. Generates a question,
    stores it on the session and moves the session to the ``recording``
    phase.

    Returns:
        A JSON response: ``{'status': 'success', 'question': ...}`` on
        success; 400 when the body is not a JSON object; 404 when the game is
        unknown; 500 on any unexpected error.
    """
    try:
        # silent=True yields None instead of raising on a missing or invalid
        # body, so bad client input is reported as 400 rather than 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Invalid or missing JSON body'}), 400

        game_id = data.get('game_id')
        # Session ids are UUID strings; any other type cannot match.
        if not isinstance(game_id, str) or game_id not in game_state.games:
            return jsonify({'error': 'Game not found'}), 404
        game = game_state.games[game_id]

        # Generate the question for this round.
        question = await generate_question()
        game['question'] = question
        game['current_phase'] = 'recording'
        return jsonify({
            'status': 'success',
            'question': question,
        })
    except Exception as e:
        logger.error(f"Error starting game: {str(e)}")
        return jsonify({'error': 'Internal server error'}), 500
async def submit_recording():
    """Handle voice recording submissions.

    Expects multipart form data with ``game_id`` and ``player_id`` fields and
    an ``audio`` file. The upload is saved under ``temp/`` and its path is
    stored in the session's ``recordings`` mapping, keyed by ``player_id``
    (a string, as received from the form).

    Returns:
        A JSON response: ``{'status': 'success'}`` on success; 400 when a
        field or the file is missing; 404 when the game is unknown; 500 on
        any unexpected error.
    """
    try:
        game_id = request.form.get('game_id')
        player_id = request.form.get('player_id')
        audio_file = request.files.get('audio')
        if not all([game_id, player_id, audio_file]):
            return jsonify({'error': 'Missing required data'}), 400
        if game_id not in game_state.games:
            return jsonify({'error': 'Game not found'}), 404

        # The upload directory is otherwise only created when the module is
        # run as a script, so make sure it exists before saving.
        os.makedirs('temp', exist_ok=True)

        # secure_filename strips path separators from the client-supplied ids.
        filename = secure_filename(f"recording_{game_id}_{player_id}.wav")
        filepath = os.path.join('temp', filename)
        audio_file.save(filepath)
        game_state.games[game_id]['recordings'][player_id] = filepath
        return jsonify({'status': 'success'})
    except Exception as e:
        logger.error(f"Error submitting recording: {str(e)}")
        return jsonify({'error': 'Internal server error'}), 500
async def process_impostor():
    """Handle the complete impostor voice generation process.

    Expects a JSON object with ``game_id`` and ``impostor_id``. Validates the
    request, records who the impostor is, generates an answer to the
    session's question, clones the impostor's recorded voice and synthesizes
    the answer with it.

    Returns:
        A JSON response: ``{'status': 'success', 'audio_url': <path>}`` on
        success; 400 when the body is not a JSON object or the session has no
        question yet; 404 when the game or the impostor's recording is
        unknown; 500 when cloning or synthesis fails or on any unexpected
        error.
    """
    try:
        # silent=True yields None instead of raising on a missing or invalid
        # body, so bad client input is reported as 400 rather than 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Invalid or missing JSON body'}), 400

        game_id = data.get('game_id')
        impostor_id = data.get('impostor_id')
        # Session ids are UUID strings; any other type cannot match.
        if not isinstance(game_id, str) or game_id not in game_state.games:
            return jsonify({'error': 'Game not found'}), 404
        game = game_state.games[game_id]

        if not game['question']:
            return jsonify({'error': 'Question not generated yet'}), 400

        # Validate the recording before making any external API call.
        # Recordings are keyed by the form-supplied (string) player id, while
        # the JSON id may arrive as an int, so try both representations.
        recordings = game['recordings']
        original_recording = recordings.get(impostor_id) or recordings.get(str(impostor_id))
        if not original_recording:
            return jsonify({'error': 'Original recording not found'}), 404

        # Remember the impostor: handle_vote compares the most-voted player
        # against this value to decide who wins the round.
        game['impostor'] = impostor_id

        # Generate the impostor's answer.
        impostor_answer = await generate_impostor_answer(game['question'])
        game['impostor_answer'] = impostor_answer

        # Clone the voice from the original recording.
        voice_id = await clone_voice(original_recording)
        if not voice_id:
            return jsonify({'error': 'Voice cloning failed'}), 500

        # Synthesize the answer with the cloned voice.
        audio_file = await generate_cloned_speech(voice_id, impostor_answer)
        if not audio_file:
            return jsonify({'error': 'Speech generation failed'}), 500

        game['modified_recording'] = audio_file
        return jsonify({
            'status': 'success',
            # NOTE(review): this is the path of the file on disk; confirm
            # that something serves it to clients under this URL.
            'audio_url': audio_file,
        })
    except Exception as e:
        logger.error(f"Error processing impostor: {str(e)}")
        return jsonify({'error': 'Internal server error'}), 500
def handle_join_game(data):
    """Handle a player joining the game.

    Adds the player to the session, subscribes the calling client to the
    session's Socket.IO room and announces the new player to that room.
    Requests for an unknown or full game are logged and ignored.

    Args:
        data: Event payload; expected to be a dict with ``game_id`` and
            ``player_name``.
    """
    # NOTE(review): no Socket.IO event decorator is visible on this handler
    # in the file as extracted - confirm it is registered.
    # Imported locally because the module-level import only brings in
    # SocketIO and emit.
    from flask_socketio import join_room

    max_players = 5

    payload = data if isinstance(data, dict) else {}
    game_id = payload.get('game_id')
    player_name = payload.get('player_name')

    # Session ids are UUID strings; any other type cannot match.
    game = game_state.games.get(game_id) if isinstance(game_id, str) else None
    if game is None:
        logger.warning(f"Join rejected: unknown game {game_id!r}")
        return
    if len(game['players']) >= max_players:
        logger.warning(f"Join rejected: game {game_id} is full")
        return

    # Ids are 1-based positions in the join order.
    player_id = len(game['players']) + 1
    game['players'].append({
        'id': player_id,
        'name': player_name,
    })

    # The client must be in the room before a room-targeted emit can reach
    # it; without this the announcement below is delivered to nobody.
    join_room(game_id)
    emit('player_joined', {
        'player_id': player_id,
        'player_name': player_name,
    }, broadcast=True, room=game_id)
def handle_vote(data):
    """Process player votes and determine round outcome.

    Records the vote. Once the number of recorded votes equals the number of
    players, tallies them, updates the score, archives the round, emits
    ``round_result`` to the game's room, and then clears the votes and
    advances the round counter so the next round starts clean.

    Args:
        data: Event payload; expected to be a dict with ``game_id``,
            ``voter_id`` and ``vote_for``.
    """
    from collections import Counter

    payload = data if isinstance(data, dict) else {}
    game_id = payload.get('game_id')
    voter_id = payload.get('voter_id')
    vote_for = payload.get('vote_for')

    # Session ids are UUID strings; any other type cannot match.
    game = game_state.games.get(game_id) if isinstance(game_id, str) else None
    if game is None:
        return

    # A repeated vote from the same voter overwrites the earlier one.
    game['votes'][voter_id] = vote_for

    # Wait until every player has voted.
    if len(game['votes']) != len(game['players']):
        return

    # Tally the votes; on a tie the candidate that was counted first wins,
    # which matches max() over the tally in insertion order.
    tally = Counter(game['votes'].values())
    most_voted = tally.most_common(1)[0][0]

    # Players win when they single out the impostor; otherwise the impostor
    # wins.
    if most_voted == game['impostor']:
        game['score']['player_wins'] += 1
    else:
        game['score']['impostor_wins'] += 1

    # Snapshot the votes so the archived and emitted results are unaffected
    # by the reset below.
    round_votes = dict(game['votes'])
    game['completed_rounds'].append({
        'round_number': game['round_number'],
        'impostor': game['impostor'],
        'votes': round_votes,
        'most_voted': most_voted,
    })

    emit('round_result', {
        'impostor': game['impostor'],
        'most_voted': most_voted,
        'votes': round_votes,
        'score': game['score'],
    }, broadcast=True, room=game_id)

    # Start the next round with an empty ballot. Without this, any later
    # vote message would re-score and re-archive the finished round.
    game['votes'] = {}
    game['round_number'] += 1
if __name__ == '__main__':
    # Create the temporary directory for recordings if it doesn't exist.
    os.makedirs('temp', exist_ok=True)

    # Port and debug flag can be overridden through the environment; the
    # defaults match the previously hard-coded values.
    port = int(os.getenv('PORT', '7860'))
    # WARNING: debug mode enables the interactive debugger, and the server
    # listens on all interfaces. Set FLASK_DEBUG=0 for any deployment that is
    # reachable by untrusted clients.
    debug_enabled = os.getenv('FLASK_DEBUG', '1').strip().lower() in ('1', 'true', 'yes', 'on')

    # Start the server.
    socketio.run(app, host='0.0.0.0', port=port, debug=debug_enabled)