import streamlit as st
import anthropic, openai, base64, cv2, glob, json, math, os, pytz, random, re, requests, textract, time, zipfile
import streamlit.components.v1 as components
from datetime import datetime
from bs4 import BeautifulSoup
from collections import defaultdict
from dotenv import load_dotenv
from gradio_client import Client
from huggingface_hub import InferenceClient
from io import BytesIO
from PIL import Image
from PyPDF2 import PdfReader
from urllib.parse import quote
import asyncio
import edge_tts
import io
import sys
import subprocess
# 🧹 Clean up the environment and load keys
st.set_page_config(
    page_title="🚲BikeAI🏆 Claude/GPT Research",
    page_icon="🚲🏆",
layout="wide",
initial_sidebar_state="auto",
menu_items={
'Get Help': 'https://huggingface.co/awacke1',
'Report a bug': 'https://huggingface.co/spaces/awacke1',
        'About': "🚲BikeAI🏆 Claude/GPT Research AI"
}
)
load_dotenv()
openai_api_key = os.getenv('OPENAI_API_KEY', "")
anthropic_key = os.getenv('ANTHROPIC_API_KEY', "")
if 'OPENAI_API_KEY' in st.secrets: openai_api_key = st.secrets['OPENAI_API_KEY']
if 'ANTHROPIC_API_KEY' in st.secrets: anthropic_key = st.secrets["ANTHROPIC_API_KEY"]
openai.api_key = openai_api_key
HF_KEY = os.getenv('HF_KEY')
API_URL = os.getenv('API_URL')
claude_client = anthropic.Anthropic(api_key=anthropic_key)
# Legacy (pre-1.0) OpenAI SDK interface, used for the GPT-4 calls below
openai_client = openai.ChatCompletion
# 🧠 Session State
for var in ['transcript_history','chat_history','openai_model','messages','last_voice_input',
'editing_file','edit_new_name','edit_new_content','viewing_prefix','should_rerun',
'old_val']:
    if var not in st.session_state:
        # Histories and the chat message list default to [], view state to None, everything else to ""
        st.session_state[var] = [] if (var.endswith('history') or var == 'messages') else None if var.startswith('view') else ""
if not st.session_state.openai_model:
st.session_state.openai_model = "gpt-4-0613" # Update to a stable GPT-4 model if needed
# 🎨 Custom CSS
st.markdown("""
""", unsafe_allow_html=True)
# 🏷️ Helper for extracting high-info terms
def get_high_info_terms(text: str) -> list:
    stop_words = set([
        'the','a','an','and','or','but','in','on','at','to','for','of','with','by','from','up','about','into','over','after',
        'is','are','was','were','be','been','being','have','has','had','do','does','did','will','would','should','could',
        'might','must','shall','can','may','this','that','these','those','i','you','he','she','it','we','they','what','which',
        'who','when','where','why','how','all','any','both','each','few','more','most','other','some','such','than','too',
        'very','just','there'
    ])
text_lower = text.lower()
words = re.findall(r'\b\w+(?:-\w+)*\b', text_lower)
meaningful = [w for w in words if len(w)>3 and w not in stop_words and any(c.isalpha() for c in w)]
# Deduplicate while preserving order
seen = set()
uniq = [w for w in meaningful if not (w in seen or seen.add(w))]
return uniq[:5]
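# Illustrative sketch of the behavior (hypothetical input, shown doctest-style):
#   get_high_info_terms("What is the best way to learn machine learning quickly?")
#   -> ['best', 'learn', 'machine', 'learning', 'quickly']
# Stop words and tokens of three characters or fewer ('is', 'way', ...) are dropped; order is preserved.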
# 📱 Improved filename generation includes prompt & response terms
def generate_filename(prompt, response, file_type="md"):
# Combine prompt & response for naming. The prompt terms have priority.
prompt_terms = get_high_info_terms(prompt)
response_terms = get_high_info_terms(response)
combined_terms = prompt_terms + [t for t in response_terms if t not in prompt_terms]
name_text = '_'.join(t.replace(' ','-') for t in combined_terms) or 'file'
# Limit length
name_text = name_text[:100]
prefix = datetime.now().strftime("%y%m_%H%M_")
return f"{prefix}{name_text}.{file_type}"
# 🗣️ Clean text for speech
def clean_for_speech(text: str) -> str:
    text = text.replace("\n", " ").replace("</s>", " ").replace("#", "")
text = re.sub(r"\(https?:\/\/[^\)]+\)", "", text)
return re.sub(r"\s+", " ", text).strip()
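# Illustrative sketch (hypothetical input):
#   clean_for_speech("# Results\nSee [the paper](https://arxiv.org/abs/1234.5678)")
#   -> "Results See [the paper]"
# Headings, newlines, stray </s> tokens, and parenthesized URLs are removed before TTS.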
async def edge_tts_generate_audio(text, voice="en-US-AriaNeural", rate=0, pitch=0):
text = clean_for_speech(text)
if not text.strip(): return None
rate_str = f"{rate:+d}%"
pitch_str = f"{pitch:+d}Hz"
com = edge_tts.Communicate(text, voice, rate=rate_str, pitch=pitch_str)
out_fn = generate_filename(text, text, "mp3")
await com.save(out_fn)
return out_fn
def speak_with_edge_tts(text, voice="en-US-AriaNeural", rate=0, pitch=0):
return asyncio.run(edge_tts_generate_audio(text, voice, rate, pitch))
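# Minimal usage sketch (hypothetical text; edge-tts calls a remote service, so network access is required):
#   mp3_path = speak_with_edge_tts("Hello from the research assistant", rate=10)
#   # saves e.g. "2405_1330_hello_research_assistant.mp3" and returns that path (None for empty text)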
def play_and_download_audio(file_path):
if file_path and os.path.exists(file_path):
st.audio(file_path)
enc = base64.b64encode(open(file_path,"rb").read()).decode()
        st.markdown(f'<a href="data:audio/mpeg;base64,{enc}" download="{os.path.basename(file_path)}">Download {os.path.basename(file_path)}</a>', unsafe_allow_html=True)
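# Usage sketch (hypothetical path): play_and_download_audio("2405_1330_hello.mp3") renders an
# inline player plus a base64 data-URI download link, and silently does nothing for missing files.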
# 🧰 Shared globals for the code execution environment
context = {}
# 🛠️ Executes Python code blocks from a response and returns combined output (note: exec is not sandboxed)
def execute_python_blocks(response):
combined = ""
code_blocks = re.findall(r'```(?:python\s*)?([\s\S]*?)```', response, re.IGNORECASE)
for code_block in code_blocks:
old_stdout = sys.stdout
sys.stdout = io.StringIO()
try:
exec(code_block, context)
code_output = sys.stdout.getvalue()
combined += f"# Code Execution Output\n```\n{code_output}\n```\n\n"
st.code(code_output)
except Exception as e:
combined += f"# Execution Error\n```python\n{e}\n```\n\n"
finally:
sys.stdout = old_stdout
return combined
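# Illustrative sketch (hypothetical response string):
#   execute_python_blocks("Here is code:\n```python\nprint(2 + 2)\n```")
# captures "4\n" from stdout and returns it wrapped in a "# Code Execution Output" section.
# All blocks share the module-level `context` dict, so variables persist across calls.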
# 🗃️ Creates & saves a file with prompt/response and executed code results
def create_file(filename, prompt, response):
base, ext = os.path.splitext(filename)
content = f"# Prompt π\n{prompt}\n\n# Response π¬\n{response}\n\n"
# Execute code in response
exec_results = execute_python_blocks(response)
content += exec_results
# Save
with open(f"{base}.md", 'w', encoding='utf-8') as file:
file.write(content)
    # Download link
    with open(f"{base}.md", 'rb') as file:
        encoded = base64.b64encode(file.read()).decode()
    href = f'<a href="data:text/markdown;base64,{encoded}" download="{base}.md">Download File 📄</a>'
    st.markdown(href, unsafe_allow_html=True)
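# Usage sketch (hypothetical arguments):
#   create_file("demo.md", "What is 2+2?", "```python\nprint(2+2)\n```")
# writes demo.md containing the prompt, the response, and the captured "4" output, then renders a download link.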
# 📨 Unified AI call helper
def call_model(model_type, text):
    # model_type: "Arxiv", "GPT-4", or "Claude"
    # Returns the answer string; the exchange is also saved to a Markdown file as a side effect
if model_type == "Claude":
        try:
            # Legacy Anthropic Text Completions API; newer SDKs use claude_client.messages.create
            r = claude_client.completions.create(
                prompt=anthropic.HUMAN_PROMPT + text + anthropic.AI_PROMPT,
                model="claude-instant-1",
max_tokens_to_sample=1000
)
ans = r.completion.strip()
except Exception as e:
ans = f"Error calling Claude: {e}"
elif model_type == "Arxiv":
# ArXiv RAG flow
client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")
refs = client.predict(text,20,"Semantic Search","mistralai/Mixtral-8x7B-Instruct-v0.1",api_name="/update_with_rag_md")[0]
r2 = client.predict(text,"mistralai/Mixtral-8x7B-Instruct-v0.1",True,api_name="/ask_llm")
ans = f"### π {text}\n\n{r2}\n\n{refs}"
    else:
        # GPT-4 call via the legacy ChatCompletion interface
try:
c = openai_client.create(
model=st.session_state.openai_model,
messages=[{"role":"system","content":"You are a helpful assistant."},{"role":"user","content":text}]
)
ans = c.choices[0].message.content.strip()
except Exception as e:
ans = f"Error calling GPT-4: {e}"
filename = generate_filename(text, ans, "md")
create_file(filename, text, ans)
return ans
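# Minimal usage sketch (hypothetical query; every call also logs the exchange to a Markdown file):
#   answer = call_model("Claude", "Summarize the cardiovascular benefits of cycling")
#   st.write(answer)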
# 🎶 Audio response options
def handle_audio_generation(q, ans, vocal_summary=True, extended_refs=False, titles_summary=True, full_audio=False):
    # This is used only for ArXiv results
    if not ans.startswith("### 🔎"):
        return
    # Extract sections for audio generation.
    # The short answer is the LLM reply (r2 in call_model); approximate it by splitting on blank lines.
parts = ans.split("\n\n")
if len(parts)>2:
short_answer = parts[1]
refs = parts[-1]
else:
short_answer = ans
refs = ""
if full_audio:
complete_text = f"Complete response for query: {q}. {clean_for_speech(short_answer)} {clean_for_speech(refs)}"
f_all = speak_with_edge_tts(complete_text)
st.write("### π Complete Audio Response")
play_and_download_audio(f_all)
if vocal_summary:
main_audio = speak_with_edge_tts(short_answer)
st.write("### ποΈ Vocal Summary")
play_and_download_audio(main_audio)
if extended_refs and refs.strip():
ref_audio = speak_with_edge_tts("Here are the extended references: " + refs)
st.write("### π Extended References & Summaries")
play_and_download_audio(ref_audio)
if titles_summary and refs.strip():
        titles = []
        for line in refs.split('\n'):
            m = re.search(r"\[([^\]]+)\]", line)
            if m:
                titles.append(m.group(1))
if titles:
titles_text = "Here are the titles of the papers: " + ", ".join(titles)
t_audio = speak_with_edge_tts(titles_text)
st.write("### π Paper Titles")
play_and_download_audio(t_audio)
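# Splitting sketch: for ans == "### 🔎 query\n\nShort answer.\n\n- [Paper One] ...", parts[1] is
# "Short answer." and parts[-1] is the final chunk. If the reference block itself contains blank
# lines, parts[-1] captures only its tail, so this split is an approximation rather than a parse.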
# 📦 File Management
def create_zip_of_files():
md_files = glob.glob("*.md")
mp3_files = glob.glob("*.mp3")
if not (md_files or mp3_files): return None
all_files = md_files+mp3_files
    # Derive an archive name from the Markdown contents
text_combined = ""
for f in all_files:
if f.endswith(".md"):
text_combined += open(f,'r',encoding='utf-8').read()
terms = get_high_info_terms(text_combined)
name_text = '_'.join(terms[:3]) or 'archive'
zip_name = f"{datetime.now().strftime('%y%m_%H%M')}_{name_text}.zip"
with zipfile.ZipFile(zip_name,'w') as z:
for f in all_files:
z.write(f)
return zip_name
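# Illustrative sketch: with "2405_1330_bike_safety.md" and "2405_1330_bike_safety.mp3" on disk
# (hypothetical names), this might yield "2405_1340_bike_safety.zip"; the name terms come from the
# .md contents and the timestamp from the moment of zipping. Returns None if no files exist.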
def load_files_for_sidebar():
md = glob.glob("*.md")
mp3 = glob.glob("*.mp3")
allf = md+mp3
groups = defaultdict(list)
for f in allf:
prefix = os.path.basename(f)[:10]
groups[prefix].append(f)
sorted_pref = sorted(groups.keys(), key=lambda pre: max(os.path.getmtime(x) for x in groups[pre]), reverse=True)
return groups, sorted_pref
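# Grouping sketch: generate_filename() starts every name with a 10-character "%y%m_%H%M_" stamp,
# so basename[:10] clusters files created in the same minute, e.g.
#   {"2405_1330_": ["2405_1330_explain_quantum.md", "2405_1330_explain_quantum.mp3"]}
# Groups are ordered newest-first by the most recent modification time within each group.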
def display_file_manager_sidebar():
    st.sidebar.title("🎵 Audio & Document Manager")
groups, sorted_pref = load_files_for_sidebar()
all_md = [f for f in glob.glob("*.md") if os.path.basename(f).lower()!='readme.md']
all_mp3 = glob.glob("*.mp3")
top_bar = st.sidebar.columns(3)
with top_bar[0]:
if st.button("π Del All MD"):
for f in all_md: os.remove(f)
st.session_state.should_rerun = True
with top_bar[1]:
if st.button("π Del All MP3"):
for f in all_mp3: os.remove(f)
st.session_state.should_rerun = True
with top_bar[2]:
if st.button("β¬οΈ Zip All"):
z = create_zip_of_files()
if z:
with open(z,"rb") as f:
b64 = base64.b64encode(f.read()).decode()
                st.sidebar.markdown(f'<a href="data:application/zip;base64,{b64}" download="{os.path.basename(z)}">📦 Download {os.path.basename(z)}</a>', unsafe_allow_html=True)
for prefix in sorted_pref:
files = groups[prefix]
# Extract simple keywords
txt = ""
for f in files:
if f.endswith(".md"):
txt+=open(f,'r',encoding='utf-8').read()+" "
kw = get_high_info_terms(txt)
kw_str = " ".join(kw) if kw else "No Keywords"
with st.sidebar.expander(f"{prefix} Files ({len(files)}) - {kw_str}", expanded=True):
c1,c2 = st.columns(2)
with c1:
if st.button("πView Group", key="view_"+prefix):
st.session_state.viewing_prefix = prefix
with c2:
if st.button("πDel Group", key="del_"+prefix):
for f in files: os.remove(f)
st.success(f"Deleted group {prefix}")
st.session_state.should_rerun = True
            for f in files:
                mtime = datetime.fromtimestamp(os.path.getmtime(f)).strftime("%Y-%m-%d %H:%M:%S")
                st.write(f"**{os.path.basename(f)}** - {mtime}")
# Viewing group
if st.session_state.viewing_prefix and st.session_state.viewing_prefix in groups:
st.write("---")
st.write(f"**Viewing Group:** {st.session_state.viewing_prefix}")
for f in groups[st.session_state.viewing_prefix]:
ext = f.split('.')[-1].lower()
st.write(f"### {os.path.basename(f)}")
if ext == "md":
c = open(f,'r',encoding='utf-8').read()
st.markdown(c)
elif ext == "mp3":
st.audio(f)
else:
with open(f,"rb") as fil:
enc = base64.b64encode(fil.read()).decode()
                st.markdown(f'<a href="data:application/octet-stream;base64,{enc}" download="{os.path.basename(f)}">Download {os.path.basename(f)}</a>', unsafe_allow_html=True)
if st.button("Close Group View"):
st.session_state.viewing_prefix = None
def main():
    st.sidebar.markdown("### 🚲BikeAI🏆 Multi-Agent Research AI")
    tab_main = st.radio("Action:", ["🎤 Voice Input", "🔍 Search ArXiv", "📝 File Editor"], horizontal=True)
# A small custom component hook (if used)
mycomponent = components.declare_component("mycomponent", path="mycomponent")
val = mycomponent(my_input_value="Hello")
# If we have component input, show controls
if val:
edited_input = st.text_area("Edit your detected input:", value=val.strip().replace('\n',' '), height=100)
run_option = st.selectbox("Select AI Model:", ["Arxiv", "GPT-4", "Claude"])
col1, col2 = st.columns(2)
with col1:
autorun = st.checkbox("AutoRun on input change", value=False)
with col2:
full_audio = st.checkbox("Generate Complete Audio", value=False)
input_changed = (val != st.session_state.old_val)
if (autorun and input_changed) or st.button("Process Input"):
st.session_state.old_val = val
            ans = call_model(run_option, edited_input)  # run_option values match call_model's model_type
if run_option=="Arxiv":
# Audio generation for Arxiv
handle_audio_generation(edited_input, ans, True, False, True, full_audio)
if tab_main == "π Search ArXiv":
q = st.text_input("Research query:")
st.markdown("### ποΈ Audio Generation Options")
vocal_summary = st.checkbox("Short Answer Audio", True)
extended_refs = st.checkbox("Extended References Audio", False)
titles_summary = st.checkbox("Paper Titles Audio", True)
full_audio = st.checkbox("Full Audio Response", False)
if q and st.button("Run ArXiv Query"):
ans = call_model("Arxiv", q)
handle_audio_generation(q, ans, vocal_summary, extended_refs, titles_summary, full_audio)
elif tab_main == "π€ Voice Input":
user_text = st.text_area("Message:", height=100)
user_text = user_text.strip()
if st.button("Send π¨"):
ans = call_model("GPT-4", user_text)
st.session_state.messages.append({"role":"user","content":user_text})
st.session_state.messages.append({"role":"assistant","content":ans})
        st.subheader("📜 Chat History")
for m in st.session_state.messages:
with st.chat_message(m["role"]):
st.markdown(m["content"])
elif tab_main == "π File Editor":
# For simplicity, user selects from sidebar
st.write("Select a file from the sidebar to edit.")
# Display File Manager
display_file_manager_sidebar()
if st.session_state.should_rerun:
st.session_state.should_rerun = False
st.rerun()
if __name__=="__main__":
main()