Spaces:
Runtime error
Runtime error
File size: 6,928 Bytes
ef3b11c b075a9e ef3b11c 7e5261e ef3b11c b075a9e 7e5261e b075a9e 983867e b075a9e 06bea5a b075a9e 06bea5a b075a9e ef3b11c b075a9e 983867e b075a9e 983867e b075a9e ef3b11c 06bea5a b075a9e 06bea5a b075a9e ef3b11c b075a9e ef3b11c b075a9e ef3b11c 7ea7941 b075a9e ef3b11c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
import asyncio
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends
from fastapi.responses import HTMLResponse
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy import create_engine, Column, Integer, String, MetaData, Table
from sqlalchemy.orm import sessionmaker
import gradio as gr
from transformers import pipeline
from PIL import Image
# Database Setup
DATABASE_URL = "sqlite:///chatbot.db"
engine = create_engine(DATABASE_URL)
Session = sessionmaker(bind=engine)
session = Session()
metadata = MetaData()
def create_table(table_name, columns):
if table_name in engine.table_names():
return f"Table '{table_name}' already exists."
columns_list = [Column('id', Integer, primary_key=True)]
for col_name, col_type in columns.items():
if col_type.lower() == 'string':
columns_list.append(Column(col_name, String))
elif col_type.lower() == 'integer':
columns_list.append(Column(col_name, Integer))
else:
return "Unsupported column type. Use 'String' or 'Integer'."
new_table = Table(table_name, metadata, *columns_list)
metadata.create_all(engine)
return f"Table '{table_name}' created successfully."
def edit_table(table_name, columns):
if table_name not in engine.table_names():
return f"Table '{table_name}' does not exist."
table = Table(table_name, metadata, autoload_with=engine)
for col_name, col_type in columns.items():
if col_name not in table.c:
if col_type.lower() == 'string':
new_column = Column(col_name, String)
elif col_type.lower() == 'integer':
new_column = Column(col_name, Integer)
else:
return "Unsupported column type. Use 'String' or 'Integer'."
new_column.create(table, populate_default=True)
return f"Table '{table_name}' updated successfully."
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class ConnectionManager:
def __init__(self):
self.active_connections: dict[str, WebSocket] = {}
async def connect(self, websocket: WebSocket, username: str):
await websocket.accept()
self.active_connections[username] = websocket
def disconnect(self, username: str):
self.active_connections.pop(username, None)
async def broadcast(self, message: str):
for connection in self.active_connections.values():
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{username}")
async def websocket_endpoint(websocket: WebSocket, username: str, token: str = Depends(oauth2_scheme)):
await manager.connect(websocket, username)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"{username}: {data}")
except WebSocketDisconnect:
manager.disconnect(username)
@app.post("/token")
async def login():
# Simplified token generation for demo purposes
return {"access_token": "fake_token", "token_type": "bearer"}
@app.post("/chatbot")
async def chatbot(task: str, table_name: str, columns: str):
response, description = handle_chatbot(task, table_name, columns)
return {"result": response, "description": description}
@app.get("/")
async def get():
return HTMLResponse("""
<html>
<head>
<title>Real-time Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<input id="messageText" type="text" autocomplete="off"/>
<button onclick="sendMessage()">Send</button>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws/test_user");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage() {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
}
</script>
</body>
</html>
""")
# Image generation setup
image_generator = pipeline("image-generation", model="CompVis/stable-diffusion-v1-4")
# Helper functions
def chatbot_response(task, table_name=None, columns=None):
if task == "create_table":
if table_name and columns:
result = create_table(table_name, columns)
else:
result = "Please provide a table name and columns."
elif task == "edit_table":
if table_name and columns:
result = edit_table(table_name, columns)
else:
result = "Please provide a table name and columns."
else:
result = "Unsupported task. Use 'create_table' or 'edit_table'."
description = f"Task: {task}, Table Name: {table_name}, Columns: {columns}"
return result, description
def handle_chatbot(task, table_name, columns):
if task not in ['create_table', 'edit_table']:
return "Unsupported task. Use 'create_table' or 'edit_table'.", None
try:
columns_dict = json.loads(columns)
except json.JSONDecodeError:
return "Invalid columns format. Please use JSON format.", None
return chatbot_response(task, table_name, columns_dict)
def generate_image(description):
images = image_generator(description, num_return_sequences=1)
image = images[0]['image']
return image
# Gradio interface setup
def handle_gradio_chatbot(task, table_name, columns):
response, description = handle_chatbot(task, table_name, columns)
image = generate_image(description)
return response, image
task_input = gr.inputs.Textbox(lines=1, placeholder="Task (create_table or edit_table)")
table_name_input = gr.inputs.Textbox(lines=1, placeholder="Table Name")
columns_input = gr.inputs.Textbox(lines=2, placeholder="Columns (JSON format: {'column1': 'type', 'column2': 'type'})")
interface = gr.Interface(
fn=handle_gradio_chatbot,
inputs=[task_input, table_name_input, columns_input],
outputs=[gr.outputs.Textbox(), gr.outputs.Image(type="pil")],
title="Multiplayer Game with SQL Database and Image Generation",
description="A multiplayer game interface to create and edit SQL tables with image generation."
)
# Function to run FastAPI server in the background
def run_server():
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
# Run FastAPI server in a separate thread
import threading
threading.Thread(target=run_server, daemon=True).start()
# Launch Gradio interface
interface.launch()
|