Spaces:
Runtime error
Runtime error
import asyncio | |
import json | |
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends | |
from fastapi.responses import HTMLResponse | |
from fastapi.security import OAuth2PasswordBearer | |
from sqlalchemy import create_engine, Column, Integer, String, MetaData, Table | |
from sqlalchemy.orm import sessionmaker | |
import gradio as gr | |
from transformers import pipeline | |
from PIL import Image | |
# Database Setup | |
DATABASE_URL = "sqlite:///chatbot.db" | |
engine = create_engine(DATABASE_URL) | |
Session = sessionmaker(bind=engine) | |
session = Session() | |
metadata = MetaData() | |
def create_table(table_name, columns): | |
if table_name in engine.table_names(): | |
return f"Table '{table_name}' already exists." | |
columns_list = [Column('id', Integer, primary_key=True)] | |
for col_name, col_type in columns.items(): | |
if col_type.lower() == 'string': | |
columns_list.append(Column(col_name, String)) | |
elif col_type.lower() == 'integer': | |
columns_list.append(Column(col_name, Integer)) | |
else: | |
return "Unsupported column type. Use 'String' or 'Integer'." | |
new_table = Table(table_name, metadata, *columns_list) | |
metadata.create_all(engine) | |
return f"Table '{table_name}' created successfully." | |
def edit_table(table_name, columns): | |
if table_name not in engine.table_names(): | |
return f"Table '{table_name}' does not exist." | |
table = Table(table_name, metadata, autoload_with=engine) | |
for col_name, col_type in columns.items(): | |
if col_name not in table.c: | |
if col_type.lower() == 'string': | |
new_column = Column(col_name, String) | |
elif col_type.lower() == 'integer': | |
new_column = Column(col_name, Integer) | |
else: | |
return "Unsupported column type. Use 'String' or 'Integer'." | |
new_column.create(table, populate_default=True) | |
return f"Table '{table_name}' updated successfully." | |
app = FastAPI() | |
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") | |
class ConnectionManager: | |
def __init__(self): | |
self.active_connections: dict[str, WebSocket] = {} | |
async def connect(self, websocket: WebSocket, username: str): | |
await websocket.accept() | |
self.active_connections[username] = websocket | |
def disconnect(self, username: str): | |
self.active_connections.pop(username, None) | |
async def broadcast(self, message: str): | |
for connection in self.active_connections.values(): | |
await connection.send_text(message) | |
manager = ConnectionManager() | |
async def websocket_endpoint(websocket: WebSocket, username: str, token: str = Depends(oauth2_scheme)): | |
await manager.connect(websocket, username) | |
try: | |
while True: | |
data = await websocket.receive_text() | |
await manager.broadcast(f"{username}: {data}") | |
except WebSocketDisconnect: | |
manager.disconnect(username) | |
async def login(): | |
# Simplified token generation for demo purposes | |
return {"access_token": "fake_token", "token_type": "bearer"} | |
async def chatbot(task: str, table_name: str, columns: str): | |
response, description = handle_chatbot(task, table_name, columns) | |
return {"result": response, "description": description} | |
async def get(): | |
return HTMLResponse(""" | |
<html> | |
<head> | |
<title>Real-time Chat</title> | |
</head> | |
<body> | |
<h1>WebSocket Chat</h1> | |
<input id="messageText" type="text" autocomplete="off"/> | |
<button onclick="sendMessage()">Send</button> | |
<ul id='messages'> | |
</ul> | |
<script> | |
var ws = new WebSocket("ws://localhost:8000/ws/test_user"); | |
ws.onmessage = function(event) { | |
var messages = document.getElementById('messages') | |
var message = document.createElement('li') | |
var content = document.createTextNode(event.data) | |
message.appendChild(content) | |
messages.appendChild(message) | |
}; | |
function sendMessage() { | |
var input = document.getElementById("messageText") | |
ws.send(input.value) | |
input.value = '' | |
} | |
</script> | |
</body> | |
</html> | |
""") | |
# Image generation setup | |
image_generator = pipeline("image-generation", model="CompVis/stable-diffusion-v1-4") | |
# Helper functions | |
def chatbot_response(task, table_name=None, columns=None): | |
if task == "create_table": | |
if table_name and columns: | |
result = create_table(table_name, columns) | |
else: | |
result = "Please provide a table name and columns." | |
elif task == "edit_table": | |
if table_name and columns: | |
result = edit_table(table_name, columns) | |
else: | |
result = "Please provide a table name and columns." | |
else: | |
result = "Unsupported task. Use 'create_table' or 'edit_table'." | |
description = f"Task: {task}, Table Name: {table_name}, Columns: {columns}" | |
return result, description | |
def handle_chatbot(task, table_name, columns): | |
if task not in ['create_table', 'edit_table']: | |
return "Unsupported task. Use 'create_table' or 'edit_table'.", None | |
try: | |
columns_dict = json.loads(columns) | |
except json.JSONDecodeError: | |
return "Invalid columns format. Please use JSON format.", None | |
return chatbot_response(task, table_name, columns_dict) | |
def generate_image(description): | |
images = image_generator(description, num_return_sequences=1) | |
image = images[0]['image'] | |
return image | |
# Gradio interface setup | |
def handle_gradio_chatbot(task, table_name, columns): | |
response, description = handle_chatbot(task, table_name, columns) | |
image = generate_image(description) | |
return response, image | |
task_input = gr.inputs.Textbox(lines=1, placeholder="Task (create_table or edit_table)") | |
table_name_input = gr.inputs.Textbox(lines=1, placeholder="Table Name") | |
columns_input = gr.inputs.Textbox(lines=2, placeholder="Columns (JSON format: {'column1': 'type', 'column2': 'type'})") | |
interface = gr.Interface( | |
fn=handle_gradio_chatbot, | |
inputs=[task_input, table_name_input, columns_input], | |
outputs=[gr.outputs.Textbox(), gr.outputs.Image(type="pil")], | |
title="Multiplayer Game with SQL Database and Image Generation", | |
description="A multiplayer game interface to create and edit SQL tables with image generation." | |
) | |
# Function to run FastAPI server in the background | |
def run_server(): | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=8000) | |
# Run FastAPI server in a separate thread | |
import threading | |
threading.Thread(target=run_server, daemon=True).start() | |
# Launch Gradio interface | |
interface.launch() | |