# Hugging Face Spaces page header captured with this source ("Spaces: Running"); not part of the program.
import streamlit as st | |
import anthropic, openai, base64, cv2, glob, json, math, os, pytz, random, re, requests, textract, time, zipfile | |
import streamlit.components.v1 as components | |
from datetime import datetime | |
from bs4 import BeautifulSoup | |
from collections import defaultdict | |
from dotenv import load_dotenv | |
from gradio_client import Client | |
from huggingface_hub import InferenceClient | |
from io import BytesIO | |
from PIL import Image | |
from PyPDF2 import PdfReader | |
from urllib.parse import quote | |
import asyncio | |
import edge_tts | |
import io | |
import sys | |
import subprocess | |
# Page setup: browser-tab title and icon, wide layout, and the app menu entries.
st.set_page_config(
    page_title="π²BikeAIπ Claude/GPT Research",
    page_icon="π²π",
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        "Get Help": "https://huggingface.co/awacke1",
        "Report a bug": "https://huggingface.co/spaces/awacke1",
        "About": "π²BikeAIπ Claude/GPT Research AI",
    },
)

# Load variables from a .env file (if any) into the process environment.
load_dotenv()

# OpenAI key: the environment value is the baseline; a Streamlit secret with
# the same name takes precedence when it exists.
openai_api_key = os.environ.get("OPENAI_API_KEY", "")
if "OPENAI_API_KEY" in st.secrets:
    openai_api_key = st.secrets["OPENAI_API_KEY"]
openai.api_key = openai_api_key

# Anthropic key: same precedence rule as above.
anthropic_key = os.environ.get("ANTHROPIC_API_KEY", "")
if "ANTHROPIC_API_KEY" in st.secrets:
    anthropic_key = st.secrets["ANTHROPIC_API_KEY"]

# Optional settings read from the environment; both are None when unset.
HF_KEY = os.environ.get("HF_KEY")
API_URL = os.environ.get("API_URL")

claude_client = anthropic.Anthropic(api_key=anthropic_key)

# Entry point used for the GPT chat calls.
# NOTE(review): `openai.ChatCompletion` looks like the pre-1.0 SDK interface;
# confirm the pinned openai version still provides it.
openai_client = openai.ChatCompletion
# Session state defaults.
# Each key is seeded only when it is missing, so values survive Streamlit reruns.
#   - list-valued keys (the two *history keys and `messages`) start as []
#   - `viewing_prefix` starts as None (no group is being viewed)
#   - everything else starts as the empty string
# `messages` must be a list: main() appends chat turns to it, and seeding it
# with "" (as the previous one-line conditional did) makes `.append` fail.
for var in ['transcript_history', 'chat_history', 'openai_model', 'messages', 'last_voice_input',
            'editing_file', 'edit_new_name', 'edit_new_content', 'viewing_prefix', 'should_rerun',
            'old_val']:
    if var in st.session_state:
        continue
    if var.endswith('history') or var == 'messages':
        st.session_state[var] = []
    elif var.startswith('view'):
        st.session_state[var] = None
    else:
        st.session_state[var] = ""

# Fall back to a default model whenever none has been chosen yet.
if not st.session_state.openai_model:
    st.session_state.openai_model = "gpt-4-0613"  # Update to a stable GPT-4 model if needed
# Custom CSS: dark gradient background for the main area, the Markdown font,
# and a small right margin on buttons. The stylesheet is injected as raw HTML,
# so the string must contain only valid CSS (no stray table residue).
st.markdown(
    """
<style>
.main { background: linear-gradient(to right, #1a1a1a, #2d2d2d); color: #fff; }
.stMarkdown { font-family: 'Helvetica Neue', sans-serif; }
.stButton>button { margin-right: 0.5rem; }
</style>
""",
    unsafe_allow_html=True,
)
# Helper for extracting high-info terms.
# Common English function words that never count as key terms.
_STOP_WORDS = frozenset({
    'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', 'from',
    'up', 'about', 'into', 'over', 'after', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
    'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'should', 'could', 'might', 'must',
    'shall', 'can', 'may', 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it', 'we',
    'they', 'what', 'which', 'who', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each',
    'few', 'more', 'most', 'other', 'some', 'such', 'than', 'too', 'very', 'just', 'there',
})

# A word, optionally continued by hyphen-joined words (e.g. "state-of-the-art").
_TERM_RE = re.compile(r'\b\w+(?:-\w+)*\b')


def get_high_info_terms(text: str, max_terms: int = 5) -> list:
    """Return the first distinct "meaningful" words of *text*, in order.

    A word is kept when it is longer than three characters, is not a stop
    word, and contains at least one letter (so bare numbers are dropped).
    Matching is case-insensitive and results are lower-cased.

    Args:
        text: Text to scan. ``None`` or an empty string yields ``[]``.
        max_terms: Maximum number of terms to return (default 5, the
            previously hard-coded limit). ``None`` means no limit; zero or a
            negative number yields ``[]``.

    Returns:
        A list of at most ``max_terms`` unique terms in first-seen order.
    """
    if not text:
        return []
    if max_terms is not None and max_terms <= 0:
        return []

    terms = []
    seen = set()
    for match in _TERM_RE.finditer(text.lower()):
        word = match.group(0)
        if len(word) <= 3 or word in _STOP_WORDS or word in seen:
            continue
        if not any(ch.isalpha() for ch in word):
            continue
        seen.add(word)
        terms.append(word)
        # Stop scanning as soon as enough terms were collected.
        if max_terms is not None and len(terms) == max_terms:
            break
    return terms
# Filename generation from prompt and response terms.
def generate_filename(prompt, response, file_type="md"):
    """Build a timestamped filename from key terms of the prompt and response.

    Prompt terms come first; response terms are appended only when the prompt
    did not already contribute them. The name part falls back to ``'file'``
    when no terms are found and is capped at 100 characters. The result is
    prefixed with ``%y%m_%H%M_`` of the current local time; that 10-character
    prefix is what ``load_files_for_sidebar`` groups files by.

    Args:
        prompt: Text whose terms take priority in the name.
        response: Text supplying additional terms.
        file_type: Extension to append, without the dot.

    Returns:
        The filename as ``"<prefix><terms>.<file_type>"``.
    """
    from_prompt = get_high_info_terms(prompt)
    ordered_terms = list(from_prompt)
    for term in get_high_info_terms(response):
        if term not in from_prompt:
            ordered_terms.append(term)

    stem = '_'.join(term.replace(' ', '-') for term in ordered_terms)
    if not stem:
        stem = 'file'
    stem = stem[:100]  # keep the filename at a manageable length

    stamp = datetime.now().strftime("%y%m_%H%M_")
    return f"{stamp}{stem}.{file_type}"
# Clean text for speech synthesis.
def clean_for_speech(text: str) -> str:
    """Return *text* flattened to a single line suitable for text-to-speech.

    Newlines and ``</s>`` markers become spaces, ``#`` characters are removed,
    parenthesised http(s) URLs are dropped, and runs of whitespace collapse to
    one space with the ends trimmed.
    """
    for needle, replacement in (("\n", " "), ("</s>", " "), ("#", "")):
        text = text.replace(needle, replacement)
    without_links = re.sub(r"\(https?:\/\/[^\)]+\)", "", text)
    single_spaced = re.sub(r"\s+", " ", without_links)
    return single_spaced.strip()
async def edge_tts_generate_audio(text, voice="en-US-AriaNeural", rate=0, pitch=0):
    """Synthesize *text* to an MP3 file with edge-tts and return its filename.

    The text is passed through ``clean_for_speech`` first; when nothing is
    left to say, no file is written and ``None`` is returned.

    Args:
        text: Text to speak.
        voice: Voice name handed to ``edge_tts.Communicate``.
        rate: Integer rate offset, sent as a signed percentage (e.g. ``+0%``).
        pitch: Integer pitch offset, sent as a signed Hz value (e.g. ``+0Hz``).

    Returns:
        The generated ``.mp3`` filename, or ``None`` for empty text.
    """
    speech_text = clean_for_speech(text)
    if not speech_text.strip():
        return None

    communicator = edge_tts.Communicate(
        speech_text,
        voice,
        rate=f"{rate:+d}%",
        pitch=f"{pitch:+d}Hz",
    )
    # The spoken text doubles as prompt and response for naming purposes.
    mp3_name = generate_filename(speech_text, speech_text, "mp3")
    await communicator.save(mp3_name)
    return mp3_name
def speak_with_edge_tts(text, voice="en-US-AriaNeural", rate=0, pitch=0):
    """Synchronous wrapper around ``edge_tts_generate_audio``.

    Runs the coroutine to completion with ``asyncio.run`` and returns whatever
    it returns (an MP3 filename, or ``None`` when there was nothing to speak).
    """
    synthesis = edge_tts_generate_audio(text, voice, rate, pitch)
    return asyncio.run(synthesis)
def play_and_download_audio(file_path):
    """Render an audio player and a download link for *file_path*.

    Does nothing when *file_path* is empty/``None`` or the file does not
    exist. The download link embeds the whole file as a base64 data URI.

    Args:
        file_path: Path of the audio file to present.
    """
    if not file_path or not os.path.exists(file_path):
        return

    st.audio(file_path)
    # Read through a context manager so the handle is closed promptly.
    with open(file_path, "rb") as audio_file:
        enc = base64.b64encode(audio_file.read()).decode()
    name = os.path.basename(file_path)
    st.markdown(f'<a href="data:audio/mpeg;base64,{enc}" download="{name}">Download {name}</a>', unsafe_allow_html=True)
# Code execution environment.
# Globals dict shared by every executed snippet, so names defined by one code
# block remain visible to blocks executed later.
context = {}


# Executes the fenced code blocks of a response and returns their combined output.
def execute_python_blocks(response):
    """Run every fenced code block found in *response* and collect the results.

    Each block is executed with ``exec`` against the shared ``context`` dict
    while stdout is captured. A successful block contributes a
    "# Code Execution Output" section (and is also shown with ``st.code``); a
    block that raises contributes a "# Execution Error" section with the
    exception message.

    SECURITY: this executes arbitrary code taken from model output inside the
    app process with no sandboxing. Only use it where that is acceptable.

    NOTE(review): the pattern makes the ``python`` tag optional, so fences
    tagged with another language are captured and executed as Python too.

    Args:
        response: Text that may contain triple-backtick fenced code. ``None``
            or an empty string yields ``""``.

    Returns:
        A Markdown string with one section per code block, in order.
    """
    import contextlib

    if not response:
        return ""

    combined = ""
    code_blocks = re.findall(r'```(?:python\s*)?([\s\S]*?)```', response, re.IGNORECASE)
    for code_block in code_blocks:
        captured = io.StringIO()
        try:
            # redirect_stdout restores sys.stdout even when the snippet raises.
            with contextlib.redirect_stdout(captured):
                exec(code_block, context)
        except Exception as e:
            combined += f"# Execution Error\n```python\n{e}\n```\n\n"
            continue
        code_output = captured.getvalue()
        combined += f"# Code Execution Output\n```\n{code_output}\n```\n\n"
        # Rendering happens outside the try so a display failure is not
        # misreported as an error in the executed snippet.
        st.code(code_output)
    return combined
# Creates and saves a file with the prompt, the response, and executed code results.
def create_file(filename, prompt, response):
    """Write a Markdown record of an exchange and show a download link for it.

    The record holds the prompt, the response, and the output of running any
    code blocks in the response (via ``execute_python_blocks``). It is always
    written next to *filename* with a ``.md`` extension; the download link
    offers the saved bytes under the name *filename*.

    Args:
        filename: Target name; its extension is replaced by ``.md`` on disk.
        prompt: The user prompt to record.
        response: The model response to record (and scan for code blocks).
    """
    md_path = os.path.splitext(filename)[0] + ".md"

    sections = [
        f"# Prompt π\n{prompt}\n\n# Response π¬\n{response}\n\n",
        execute_python_blocks(response),
    ]
    with open(md_path, 'w', encoding='utf-8') as out:
        out.write("".join(sections))

    # Re-read the saved bytes so the link carries exactly what is on disk.
    with open(md_path, 'rb') as saved:
        payload = saved.read()
    encoded = base64.b64encode(payload).decode()
    st.markdown(
        f'<a href="data:file/markdown;base64,{encoded}" download="{filename}">Download File π</a>',
        unsafe_allow_html=True,
    )
# Unified AI call helper.
def call_model(model_type, text):
    """Send *text* to the selected backend, save the exchange, and return the answer.

    Backends:
        * ``"Claude"`` - Anthropic text completion via ``claude_client``.
        * ``"Arxiv"``  - the ArXiv RAG Space: references plus an LLM answer,
          combined into one Markdown string that starts with ``"### π "``
          (``handle_audio_generation`` keys on that prefix).
        * anything else - OpenAI chat completion with the model stored in
          ``st.session_state.openai_model``.

    A failure in any backend is turned into an ``"Error calling ..."`` answer
    instead of propagating, so all three branches behave alike. The prompt and
    answer are then written to disk with ``create_file``.

    Args:
        model_type: ``"Arxiv"``, ``"GPT-4"`` or ``"Claude"``.
        text: The user prompt.

    Returns:
        The answer text only (the generated filename is not returned).
    """
    if model_type == "Claude":
        try:
            r = claude_client.completions.create(
                prompt=anthropic.HUMAN_PROMPT + text + anthropic.AI_PROMPT,
                model="claude-instant-1",
                max_tokens_to_sample=1000
            )
            ans = r.completion.strip()
        except Exception as e:
            ans = f"Error calling Claude: {e}"
    elif model_type == "Arxiv":
        # ArXiv RAG flow: both calls go over the network, so guard them the
        # same way as the other backends.
        try:
            client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")
            refs = client.predict(text, 20, "Semantic Search", "mistralai/Mixtral-8x7B-Instruct-v0.1", api_name="/update_with_rag_md")[0]
            r2 = client.predict(text, "mistralai/Mixtral-8x7B-Instruct-v0.1", True, api_name="/ask_llm")
            ans = f"### π {text}\n\n{r2}\n\n{refs}"
        except Exception as e:
            ans = f"Error calling ArXiv: {e}"
    else:
        # OpenAI chat call
        try:
            c = openai_client.create(
                model=st.session_state.openai_model,
                messages=[{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": text}]
            )
            ans = c.choices[0].message.content.strip()
        except Exception as e:
            ans = f"Error calling GPT-4: {e}"

    filename = generate_filename(text, ans, "md")
    create_file(filename, text, ans)
    return ans
# Audio response options.
def handle_audio_generation(q, ans, vocal_summary=True, extended_refs=False, titles_summary=True, full_audio=False):
    """Generate the requested audio renditions of an ArXiv answer.

    Only answers starting with the ArXiv heading prefix are processed; any
    other answer returns immediately. The answer is split on blank lines: with
    more than two parts, the second part is treated as the short answer and
    the last part as the references; otherwise the whole answer is the short
    answer and there are no references.

    Args:
        q: The original query (spoken as part of the full audio).
        ans: The answer text produced for the query.
        vocal_summary: Speak the short answer.
        extended_refs: Speak the references, when there are any.
        titles_summary: Speak the bracketed titles found in the references.
        full_audio: Speak query, short answer and references in one clip.
    """
    if not ans.startswith("### π"):
        return

    def narrate(heading, spoken_text):
        # Same order for every section: synthesize, write heading, show player.
        audio_path = speak_with_edge_tts(spoken_text)
        st.write(heading)
        play_and_download_audio(audio_path)

    paragraphs = ans.split("\n\n")
    if len(paragraphs) > 2:
        short_answer, refs = paragraphs[1], paragraphs[-1]
    else:
        short_answer, refs = ans, ""
    has_refs = bool(refs.strip())

    if full_audio:
        narrate(
            "### π Complete Audio Response",
            f"Complete response for query: {q}. {clean_for_speech(short_answer)} {clean_for_speech(refs)}",
        )

    if vocal_summary:
        narrate("### ποΈ Vocal Summary", short_answer)

    if extended_refs and has_refs:
        narrate("### π Extended References & Summaries", "Here are the extended references: " + refs)

    if titles_summary and has_refs:
        # One title per line at most: the first [bracketed] span on the line.
        titles = []
        for line in refs.split('\n'):
            found = re.search(r"\[([^\]]+)\]", line)
            if found:
                titles.append(found.group(1))
        if titles:
            narrate("### π Paper Titles", "Here are the titles of the papers: " + ", ".join(titles))
# File management.
def create_zip_of_files():
    """Zip every ``*.md`` and ``*.mp3`` file in the working directory.

    The archive name is the current ``%y%m_%H%M`` timestamp followed by up to
    three key terms taken from the combined Markdown content (or ``archive``
    when there are none).

    Returns:
        The zip filename, or ``None`` when there is nothing to archive.
    """
    md_files = glob.glob("*.md")
    mp3_files = glob.glob("*.mp3")
    all_files = md_files + mp3_files
    if not all_files:
        return None

    # Derive the archive name from the Markdown content. Each file is read
    # through a context manager (no leaked handles) and joined once.
    md_texts = []
    for f in all_files:
        if f.endswith(".md"):
            with open(f, 'r', encoding='utf-8') as fh:
                md_texts.append(fh.read())
    terms = get_high_info_terms("".join(md_texts))
    name_text = '_'.join(terms[:3]) or 'archive'

    zip_name = f"{datetime.now().strftime('%y%m_%H%M')}_{name_text}.zip"
    with zipfile.ZipFile(zip_name, 'w') as z:
        for f in all_files:
            z.write(f)
    return zip_name
def load_files_for_sidebar():
    """Group the ``*.md`` and ``*.mp3`` files of the working directory.

    Files are grouped by the first 10 characters of their basename. Group
    keys are ordered by the most recent modification time found in each
    group, newest group first.

    Returns:
        ``(groups, sorted_pref)`` where ``groups`` maps each prefix to its
        list of paths (Markdown files before MP3 files) and ``sorted_pref``
        is the list of prefixes in display order.
    """
    groups = defaultdict(list)
    for path in glob.glob("*.md") + glob.glob("*.mp3"):
        groups[os.path.basename(path)[:10]].append(path)

    def newest_mtime(prefix):
        return max(os.path.getmtime(member) for member in groups[prefix])

    sorted_pref = sorted(groups, key=newest_mtime, reverse=True)
    return groups, sorted_pref
def _read_text_file(path):
    """Return the UTF-8 text of *path*, closing the file afterwards."""
    with open(path, 'r', encoding='utf-8') as fh:
        return fh.read()


def display_file_manager_sidebar():
    """Render the sidebar file manager and, if requested, a group viewer.

    The sidebar offers bulk actions (delete all Markdown except README.md,
    delete all MP3, zip everything) followed by one expander per file group
    with "view" and "delete" buttons and a listing of the group's files. When
    a group is selected for viewing, its files are rendered below.

    Deleting files sets ``st.session_state.should_rerun`` so the caller can
    trigger a rerun. Because the file lists are collected before the buttons
    are evaluated, files deleted earlier in the same run are filtered out
    before they are read or stat-ed; otherwise a delete click would raise
    ``FileNotFoundError`` further down.
    """
    st.sidebar.title("π΅ Audio & Document Manager")
    groups, sorted_pref = load_files_for_sidebar()
    all_md = [f for f in glob.glob("*.md") if os.path.basename(f).lower() != 'readme.md']
    all_mp3 = glob.glob("*.mp3")

    top_bar = st.sidebar.columns(3)
    with top_bar[0]:
        if st.button("π Del All MD"):
            for f in all_md:
                os.remove(f)
            st.session_state.should_rerun = True
    with top_bar[1]:
        if st.button("π Del All MP3"):
            for f in all_mp3:
                os.remove(f)
            st.session_state.should_rerun = True
    with top_bar[2]:
        if st.button("β¬οΈ Zip All"):
            z = create_zip_of_files()
            if z:
                with open(z, "rb") as f:
                    b64 = base64.b64encode(f.read()).decode()
                st.sidebar.markdown(f'<a href="data:file/zip;base64,{b64}" download="{os.path.basename(z)}">π Download {os.path.basename(z)}</a>', unsafe_allow_html=True)

    for prefix in sorted_pref:
        # Ignore files removed by a bulk delete earlier in this run.
        files = [f for f in groups[prefix] if os.path.exists(f)]
        if not files:
            continue

        # Extract simple keywords from the group's Markdown files.
        txt = "".join(_read_text_file(f) + " " for f in files if f.endswith(".md"))
        kw = get_high_info_terms(txt)
        kw_str = " ".join(kw) if kw else "No Keywords"

        with st.sidebar.expander(f"{prefix} Files ({len(files)}) - {kw_str}", expanded=True):
            c1, c2 = st.columns(2)
            with c1:
                if st.button("πView Group", key="view_" + prefix):
                    st.session_state.viewing_prefix = prefix
            with c2:
                if st.button("πDel Group", key="del_" + prefix):
                    for f in files:
                        os.remove(f)
                    st.success(f"Deleted group {prefix}")
                    st.session_state.should_rerun = True
                    # Nothing left to list for this group in this run.
                    files = []
            for f in files:
                ctime = datetime.fromtimestamp(os.path.getmtime(f)).strftime("%Y-%m-%d %H:%M:%S")
                st.write(f"**{os.path.basename(f)}** - {ctime}")

    # Viewing group
    viewing = st.session_state.viewing_prefix
    if viewing and viewing in groups:
        st.write("---")
        st.write(f"**Viewing Group:** {viewing}")
        for f in groups[viewing]:
            if not os.path.exists(f):
                continue  # deleted earlier in this run
            ext = f.split('.')[-1].lower()
            st.write(f"### {os.path.basename(f)}")
            if ext == "md":
                st.markdown(_read_text_file(f))
            elif ext == "mp3":
                st.audio(f)
            else:
                with open(f, "rb") as fil:
                    enc = base64.b64encode(fil.read()).decode()
                st.markdown(f'<a href="data:file/{ext};base64,{enc}" download="{os.path.basename(f)}">Download {os.path.basename(f)}</a>', unsafe_allow_html=True)
        if st.button("Close Group View"):
            st.session_state.viewing_prefix = None
def main():
    """Render the app: action selector, component input, active tab, sidebar.

    Flow of one script run:
        1. Show the action selector and the custom ``mycomponent`` component.
        2. If the component returned a value, offer to edit it and run it
           through the selected model (optionally automatically on change).
        3. Render the selected tab (ArXiv search, chat, or file editor hint).
        4. Render the sidebar file manager and rerun when it asked for one.
    """
    st.sidebar.markdown("### π²BikeAIπ Multi-Agent Research AI")

    # Tab labels are defined once so the selector and the comparisons below
    # can never drift apart.
    voice_tab, arxiv_tab, editor_tab = "π€ Voice Input", "π Search ArXiv", "π File Editor"
    tab_main = st.radio("Action:", [voice_tab, arxiv_tab, editor_tab], horizontal=True)

    # A small custom component hook (if used)
    mycomponent = components.declare_component("mycomponent", path="mycomponent")
    val = mycomponent(my_input_value="Hello")

    # If we have component input, show controls
    if val:
        edited_input = st.text_area("Edit your detected input:", value=val.strip().replace('\n', ' '), height=100)
        run_option = st.selectbox("Select AI Model:", ["Arxiv", "GPT-4", "Claude"])
        col1, col2 = st.columns(2)
        with col1:
            autorun = st.checkbox("AutoRun on input change", value=False)
        with col2:
            full_audio = st.checkbox("Generate Complete Audio", value=False)
        input_changed = (val != st.session_state.old_val)
        if (autorun and input_changed) or st.button("Process Input"):
            st.session_state.old_val = val
            # The select box values are exactly the names call_model expects.
            ans = call_model(run_option, edited_input)
            if run_option == "Arxiv":
                # Audio generation for Arxiv
                handle_audio_generation(edited_input, ans, True, False, True, full_audio)

    if tab_main == arxiv_tab:
        q = st.text_input("Research query:")
        st.markdown("### ποΈ Audio Generation Options")
        vocal_summary = st.checkbox("Short Answer Audio", True)
        extended_refs = st.checkbox("Extended References Audio", False)
        titles_summary = st.checkbox("Paper Titles Audio", True)
        full_audio = st.checkbox("Full Audio Response", False)
        if q and st.button("Run ArXiv Query"):
            ans = call_model("Arxiv", q)
            handle_audio_generation(q, ans, vocal_summary, extended_refs, titles_summary, full_audio)
    elif tab_main == voice_tab:
        # The chat log must be a list before it is appended to; repair it if
        # it was seeded with a non-list placeholder.
        if not isinstance(st.session_state.messages, list):
            st.session_state.messages = []
        user_text = st.text_area("Message:", height=100)
        user_text = user_text.strip()
        # Ignore clicks with an empty message instead of calling the model.
        if st.button("Send π¨") and user_text:
            ans = call_model("GPT-4", user_text)
            st.session_state.messages.append({"role": "user", "content": user_text})
            st.session_state.messages.append({"role": "assistant", "content": ans})
        st.subheader("π Chat History")
        for m in st.session_state.messages:
            with st.chat_message(m["role"]):
                st.markdown(m["content"])
    elif tab_main == editor_tab:
        # For simplicity, user selects from sidebar
        st.write("Select a file from the sidebar to edit.")

    # Display File Manager
    display_file_manager_sidebar()

    # A delete action in the sidebar requests a fresh run so listings update.
    if st.session_state.should_rerun:
        st.session_state.should_rerun = False
        st.rerun()


if __name__ == "__main__":
    main()