Spaces:
Sleeping
Sleeping
import gradio as gr | |
import ssl | |
from openai import OpenAI | |
import time | |
import os | |
import shutil | |
from datetime import datetime | |
import Arcana | |
from nylon import * | |
import pandas as pd | |
import json | |
# SSL configuration to avoid verification issues | |
try: | |
_create_unverified_https_context = ssl._create_unverified_context | |
except AttributeError: | |
pass | |
else: | |
ssl._create_default_https_context = _create_unverified_https_context | |
def query_database2(query): | |
db = ChatDatabase('memory.txt') | |
sender = 'Arcana' | |
N = 10 | |
cache = {} | |
query_tag = None | |
relevant_messages = db.get_relevant_messages(sender, query, N, cache, query_tag) | |
print("Relevant messages:") | |
for message in relevant_messages: | |
print(f"Sender: {message[0]}, Time: {message[1]}, Tag: {message[3]}") | |
print(f"Message: {message[2][:100]}...") | |
print() | |
df_data = [str(message) for message in relevant_messages] | |
return ';'.join(df_data) | |
# OpenAI client setup | |
client = OpenAI( | |
base_url='https://api.openai-proxy.org/v1', | |
api_key='sk-Nxf8HmLpfIMhCd83n3TOr00TR57uBZ0jMbAgGCOzppXvlsx1', | |
) | |
# Function list for OpenAI API | |
function_list = [ | |
{ | |
"name": "query_database", | |
"description": "Query the database and return a list of results as strings", | |
"parameters": { | |
"type": "object", | |
"properties": { | |
"query": { | |
"type": "string", | |
"description": "The query to execute against the database" | |
}, | |
}, | |
"required": ["query"] | |
} | |
} | |
] | |
# Mapping of function names to actual function objects | |
function_map = { | |
"query_database": query_database2 | |
} | |
def execute_function(function_name, function_args): | |
if function_name in function_map: | |
return function_map[function_name](**function_args) | |
else: | |
return f"Error: Function {function_name} not found" | |
# Retry logic for OpenAI API call | |
def openai_api_call(messages, retries=3, delay=5): | |
for attempt in range(retries): | |
try: | |
completion = client.chat.completions.create( | |
model="gpt-3.5-turbo", # Changed from "gpt-4o" to "gpt-4" | |
messages=messages, | |
functions=function_list, | |
function_call='auto', | |
timeout=10 | |
) | |
response_message = completion.choices[0].message | |
# Check if the model wants to call a function | |
if response_message.function_call: | |
function_name = response_message.function_call.name | |
function_args = json.loads(response_message.function_call.arguments) | |
function_response = execute_function(function_name, function_args) | |
# Add the function response to the conversation | |
messages.append(response_message.model_dump()) # The model's request to call the function | |
messages.append({ | |
"role": "function", | |
"name": function_name, | |
"content": json.dumps(function_response) | |
}) | |
# Make a follow-up call to the model with the function response | |
return openai_api_call(messages) | |
else: | |
return response_message.content | |
except Exception as e: | |
print(f"Attempt {attempt + 1} failed: {e}") | |
if attempt < retries - 1: | |
time.sleep(delay) | |
else: | |
return "Sorry, I am having trouble connecting to the server. Please try again later." | |
# Chatbot response function | |
def chatbot_response(message, history): | |
messages = [{"role": "system", "content": '''You are Arcana, a dynamic study resource database designed to help students excel in their exams. Your responses should be accurate, informative, and evidence-based whenever possible. Follow these guidelines: | |
Your primary goal is to provide students with the most helpful and accurate study information, utilizing both your internal knowledge and the PDF resources at your disposal.'''}] | |
for human, assistant in history: | |
messages.append({"role": "user", "content": human}) | |
messages.append({"role": "assistant", "content": assistant}) | |
messages.append({"role": "user", "content": message}) | |
response = openai_api_call(messages) | |
return response | |
selected = None | |
from concurrent.futures import ThreadPoolExecutor | |
# Function to handle the file upload | |
def handle_file_upload(file): | |
# Ensure the cache2 directory exists | |
cache_dir = 'cache' | |
os.makedirs(cache_dir, exist_ok=True) | |
# Get the uploaded file path | |
file_path = file.name | |
# Define the new path for the uploaded file | |
new_file_path = os.path.join(cache_dir, os.path.basename(file_path)) | |
# Move the file to the cache2 directory | |
shutil.move(file_path, new_file_path) | |
# Get the file size | |
file_size = os.path.getsize(new_file_path) | |
return f"File saved to {new_file_path} with size: {file_size} bytes" | |
# Wrapper function to run the file upload in a thread | |
def handle_file_upload_threaded(file): | |
with ThreadPoolExecutor() as executor: | |
future = executor.submit(handle_file_upload, file) | |
return future.result() | |
def list_uploaded_files(): | |
foldername = 'cache' | |
if not os.path.exists(foldername): | |
return [] | |
files = os.listdir(foldername) | |
return [[file] for file in files] | |
def on_select(evt: gr.SelectData): | |
global selected | |
selected_value = evt.value | |
selected_index = evt.index | |
selected = selected_value | |
print(f"Selected value: {selected_value} at index: {selected_index}") | |
file_path = os.path.join("cache", selected_value) if selected_value else None | |
status_message = f"Selected: {selected_value}" if selected_value else "No file selected" | |
file_size = get_file_size(file_path) if file_path else "" | |
file_creation_time = get_file_creation_time(file_path) if file_path else "" | |
return file_path, status_message, file_size, file_creation_time | |
def get_file_size(file_path): | |
if file_path and os.path.exists(file_path): | |
size_bytes = os.path.getsize(file_path) | |
if size_bytes < 1024: | |
return f"{size_bytes} bytes" | |
elif size_bytes < 1024 * 1024: | |
return f"{size_bytes / 1024:.2f} KB" | |
else: | |
return f"{size_bytes / (1024 * 1024):.2f} MB" | |
return "" | |
def get_file_creation_time(file_path): | |
if file_path and os.path.exists(file_path): | |
creation_time = os.path.getctime(file_path) | |
return datetime.fromtimestamp(creation_time).strftime("%Y-%m-%d %H:%M:%S") | |
return "" | |
def delete_file(): | |
global selected | |
if selected: | |
foldername = 'cache' | |
file_path = os.path.join(foldername, selected) | |
if os.path.exists(file_path): | |
os.remove(file_path) | |
return list_uploaded_files(), None, f"File {selected} deleted successfully", "", "" | |
else: | |
return list_uploaded_files(), None, f"File {selected} not found", "", "" | |
else: | |
return list_uploaded_files(), None, "No file selected for deletion", "", "" | |
def refresh_files(): | |
return list_uploaded_files() | |
def display_file(evt: gr.SelectData, df): | |
file_path = os.path.join("cache", evt.value) | |
return file_path, file_path if file_path.lower().endswith(('.png', '.jpg', '.jpeg', '.gif')) else None, f"Displaying: {evt.value}" | |
def render_to_database(): | |
# This function is undefined as per your request | |
Arcana.main() | |
def change_theme(theme): | |
gr.Interface.theme = theme | |
def rename_file(new_name): | |
global selected | |
if selected and new_name: | |
old_path = os.path.join('cache', selected) | |
new_path = os.path.join('cache', new_name+'.'+selected.split('.')[-1]) | |
if os.path.exists(old_path): | |
os.rename(old_path, new_path) | |
selected = new_name | |
return list_uploaded_files(), f"File renamed to {new_name}", new_path, get_file_size(new_path), get_file_creation_time(new_path) | |
else: | |
return list_uploaded_files(), f"File {selected} not found", None, "", "" | |
return list_uploaded_files(), "No file selected or new name not provided", None, "", "" | |
def query_database(query): | |
# Usage example | |
db = ChatDatabase('memory.txt') | |
# Example 1: Get relevant messages | |
sender = 'Arcana' | |
N = 10 | |
cache = {} | |
query_tag = None | |
relevant_messages = db.get_relevant_messages(sender, query, N, cache, query_tag) | |
print("Relevant messages:") | |
for message in relevant_messages: | |
print(f"Sender: {message[0]}, Time: {message[1]}, Tag: {message[3]}") | |
print(f"Message: {message[2][:100]}...") | |
print() | |
df_data = [{"Nylon Returned Query": str(message)} for message in relevant_messages] | |
# Create a pandas DataFrame | |
df = pd.DataFrame(df_data) | |
return df | |
example_database = [ | |
"What is Hydrogen Bonding?", | |
"Tell me the difference between impulse and force.", | |
"Tell me a joke that Calculus students will understand.", | |
"How should I review for the AP Biology Exam?", | |
"What kind of resources are available in PA and Indexademics?", | |
"What is the StandardCAS™ group?", | |
"Explain the concept of quantum entanglement.", | |
"What are the main differences between mitosis and meiosis?", | |
"How does the Doppler effect work?", | |
"Explain the process of photosynthesis.", | |
"What is the significance of the Pythagorean theorem?", | |
"How does natural selection contribute to evolution?", | |
"What is the most important chapter in AP Statistics?", | |
"How should I prepare on the IB Chinese Exam?" | |
] | |
import random | |
def get_random_examples(num_examples=5): | |
return random.sample(example_database, min(num_examples, len(example_database))) | |
# Create the Gradio interface for the chatbot | |
chatbot_interface = gr.ChatInterface( | |
chatbot_response, | |
chatbot=gr.Chatbot(height=400), | |
textbox=gr.Textbox(placeholder="Type your message here...", container=True, scale=10), | |
title="Review With Arcana", | |
description="ArcanaUI v0.8 - Chatbot", | |
theme="soft", | |
examples=get_random_examples(), | |
cache_examples=False, | |
retry_btn=None, | |
undo_btn="Delete Previous", | |
clear_btn="Clear" | |
) | |
def relaunch(): | |
global demo | |
demo.close() | |
demo.launch(share=True) | |
# Combine the interfaces using Tabs | |
with gr.Blocks(js=""" | |
async () => { | |
const originalFetch = window.fetch; | |
window.fetch = (url, options) => { | |
if (options && options.signal) { | |
const controller = new AbortController(); | |
options.signal = controller.signal; | |
setTimeout(() => controller.abort(), 3600000); // 300000 ms = 5 minutes | |
} | |
return originalFetch(url, options); | |
}; | |
} | |
""") as demo: | |
gr.Markdown("# ArcanaUI v0.8") | |
with gr.Tabs(): | |
with gr.TabItem("Welcome Page"): | |
with open('introduction.txt',mode='r') as file: | |
intro_content = file.read() | |
gr.Markdown(intro_content) | |
with gr.TabItem("Chatbot"): | |
chatbot_interface.render() | |
# File uploading interface | |
with gr.TabItem('Upload'): | |
gr.Markdown('# Upload and View Files') | |
with gr.Row(): | |
# Left column: File list and buttons | |
with gr.Column(scale=1): | |
gr.Markdown("## Upload File") | |
file_input = gr.File(label="Upload your file here", file_types=["pdf", "jpeg", "jpg", "gif", "docx"]) | |
file_input.change(handle_file_upload_threaded, inputs=file_input) | |
uploaded_files_list = gr.DataFrame(headers=["Uploaded Files"], datatype="str", interactive=False) | |
with gr.Row(): | |
refresh_button = gr.Button('Refresh') | |
delete_button = gr.Button('Delete Selected File') | |
# Right column: File viewer and Image viewer | |
with gr.Column(scale=1): | |
with gr.Tab("File Viewer"): | |
file_viewer = gr.File(label="File Restore") | |
file_status = gr.Textbox(label="File Status", interactive=False) | |
file_size = gr.Textbox(label="File Size", interactive=False) | |
file_creation_time = gr.Textbox(label="File Creation Time", interactive=False) | |
with gr.Row(): | |
new_file_name = gr.Textbox(label="New File Name", placeholder="Enter new file name") | |
rename_button = gr.Button("Rename File") | |
with gr.Tab("Image Viewer"): | |
image_viewer = gr.Image(label="Image Viewer", type="filepath") | |
# Event handlers | |
refresh_button.click(fn=refresh_files, outputs=uploaded_files_list) | |
delete_button.click(fn=delete_file, outputs=[uploaded_files_list, file_viewer, file_status, file_size, file_creation_time]) | |
uploaded_files_list.select(fn=display_file, inputs=uploaded_files_list, outputs=[file_viewer, image_viewer, file_status]) | |
uploaded_files_list.select(fn=on_select, outputs=[file_viewer, file_status, file_size, file_creation_time]) | |
rename_button.click(fn=rename_file, | |
inputs=new_file_name, | |
outputs=[uploaded_files_list, file_status, file_viewer, file_size, file_creation_time]) | |
render_button = gr.Button("Render all PDFs to Database") | |
render_button.click(fn=render_to_database) | |
with gr.TabItem('Settings'): | |
with gr.TabItem('Database'): | |
gr.Markdown('Settings') | |
test_nylon = gr.Textbox(label='Test Nylon', placeholder='Query') | |
uploaded_files_list2 = gr.DataFrame(headers=["Nylon Returned Query"], datatype="str", interactive=False) | |
query_button = gr.Button('Query') | |
query_button.click(fn=query_database, inputs=test_nylon, outputs=uploaded_files_list2) | |
with gr.TabItem('Theme'): | |
gr.Markdown('Change Theme') | |
theme_dropdown = gr.Dropdown(choices=['default', 'compact', 'huggingface', 'soft', 'dark'], label='Choose Theme') | |
theme_button = gr.Button('Apply Theme') | |
theme_button.click(fn=change_theme, inputs=theme_dropdown) | |
relaunch_button = gr.Button('Relaunch') | |
relaunch_button.click(fn=relaunch) | |
# Launch the interface | |
demo.launch(share=True) | |