# Scraped Hugging Face file-view header (not part of the program):
# awacke1 - "Update app.py" - commit 68ace7c (verified) - 22.9 kB
import streamlit as st
import anthropic, openai, base64, cv2, glob, json, math, os, pytz, random, re, requests, textract, time, zipfile
import plotly.graph_objects as go
import streamlit.components.v1 as components
from datetime import datetime
from audio_recorder_streamlit import audio_recorder
from bs4 import BeautifulSoup
from collections import defaultdict, deque
from dotenv import load_dotenv
from gradio_client import Client
from huggingface_hub import InferenceClient
from io import BytesIO
from PIL import Image
from PyPDF2 import PdfReader
from urllib.parse import quote
from xml.etree import ElementTree as ET
from openai import OpenAI
import extra_streamlit_components as stx
from streamlit.runtime.scriptrunner import get_script_run_ctx
import asyncio
import edge_tts
# 🔧 Config & Setup
# Page-level settings are collected in one mapping and handed to Streamlit in a
# single call: tab title/icon, wide layout, sidebar state and the menu links.
_PAGE_SETTINGS = dict(
    page_title="🚲BikeAIπŸ† Claude/GPT Research",
    page_icon="πŸš²πŸ†",
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        'Get Help': 'https://huggingface.co/awacke1',
        'Report a bug': 'https://huggingface.co/spaces/awacke1',
        'About': "🚲BikeAIπŸ† Claude/GPT Research AI",
    },
)
st.set_page_config(**_PAGE_SETTINGS)
# Credentials: values from the environment (after loading .env) are the
# fallback; an entry in Streamlit secrets takes precedence when present.
load_dotenv()
# NOTE(review): the environment variable is ANTHROPIC_API_KEY_3 while the
# secrets entry is ANTHROPIC_API_KEY - confirm the differing names are intended.
openai_api_key = (
    st.secrets['OPENAI_API_KEY']
    if 'OPENAI_API_KEY' in st.secrets
    else os.getenv('OPENAI_API_KEY', "")
)
anthropic_key = (
    st.secrets["ANTHROPIC_API_KEY"]
    if 'ANTHROPIC_API_KEY' in st.secrets
    else os.getenv('ANTHROPIC_API_KEY_3', "")
)

# Shared API clients used by the processing functions in this module.
openai.api_key = openai_api_key
claude_client = anthropic.Anthropic(api_key=anthropic_key)
openai_client = OpenAI(
    api_key=openai.api_key,
    organization=os.getenv('OPENAI_ORG_ID'),
)

# Read here but not referenced elsewhere in the visible code.
HF_KEY = os.getenv('HF_KEY')
API_URL = os.getenv('API_URL')
# Seed st.session_state with a default for every key the app uses. A key that
# is already present is left untouched, so values survive script reruns.
_SESSION_DEFAULTS = {
    'transcript_history': [],
    'chat_history': [],
    'openai_model': "gpt-4o-2024-05-13",
    'messages': [],
    'last_voice_input': "",
    'editing_file': None,
    'edit_new_name': "",
    'edit_new_content': "",
    'viewing_prefix': None,
    'should_rerun': False,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
# 🎨 Minimal Custom CSS
# Injected as raw HTML: dark gradient background, app font, and a right
# margin on buttons.
_CUSTOM_CSS = """
<style>
.main { background: linear-gradient(to right, #1a1a1a, #2d2d2d); color: #fff; }
.stMarkdown { font-family: 'Helvetica Neue', sans-serif; }
.stButton>button {
margin-right: 0.5rem;
}
</style>
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
# Icon shown for each managed file extension (key is the extension without the dot).
FILE_EMOJIS = dict(
    md="πŸ“",
    mp3="🎡",
)
def clean_for_speech(text: str) -> str:
    """Return *text* flattened into a single line suitable for text-to-speech.

    Newlines and ``</s>`` markers become spaces, ``#`` characters are dropped,
    parenthesised http(s) links are removed, and whitespace runs are collapsed
    to one space with the ends trimmed.
    """
    # Literal substitutions, applied in this order.
    for old, new in (("\n", " "), ("</s>", " "), ("#", "")):
        text = text.replace(old, new)
    # Remove links like (https://...)
    without_links = re.sub(r"\(https?:\/\/[^\)]+\)", "", text)
    return re.sub(r"\s+", " ", without_links).strip()
def generate_filename(content, file_type="md", max_name_len=50):
    """Build a filename of the form ``<YYMM_HHMM_><first words>.<file_type>``.

    Args:
        content: Text whose first three ``\\w+`` tokens form the readable part
            of the name. ``None`` or text without any word falls back to
            ``'file'``.
        file_type: Extension to append, without the leading dot.
        max_name_len: Upper bound on the length of the word segment, so that
            one very long token cannot push the name past filesystem limits.

    Returns:
        The filename as a string (no directory component).
    """
    # Prefix is exactly 10 characters (e.g. '2309_1245_'); the sidebar groups
    # files by the first 10 characters of their names.
    prefix = datetime.now().strftime("%y%m_%H%M") + "_"
    # \w+ keeps only word characters, so the segment has no path separators.
    words = re.findall(r"\w+", content or "")
    name_text = '_'.join(words[:3])[:max_name_len] if words else 'file'
    return f"{prefix}{name_text}.{file_type}"
def create_file(prompt, response, file_type="md"):
    """Write *prompt* and *response* to a newly named UTF-8 file.

    The filename is derived from the response when it has non-blank text,
    otherwise from the prompt. The file holds the prompt, a blank line, then
    the response. Returns the filename.
    """
    # Prefer the response as the source of the filename words.
    naming_source = response.strip() or prompt.strip()
    target = generate_filename(naming_source, file_type)
    with open(target, 'w', encoding='utf-8') as out:
        out.write("\n\n".join((prompt, response)))
    return target
def get_download_link(file):
    """Return an HTML ``<a>`` tag that downloads *file* via a base64 data URL.

    Args:
        file: Path of the file to embed; its whole content is read into memory.

    Returns:
        An HTML string whose ``href`` is a ``data:`` URL carrying the file and
        whose ``download`` attribute is the file's base name.
    """
    import html
    import mimetypes

    with open(file, "rb") as f:
        b64 = base64.b64encode(f.read()).decode()
    name = os.path.basename(file)
    # Derive the MIME type from the extension instead of labelling every file
    # with the non-existent "file/zip"; unknown extensions get a generic type.
    mime = mimetypes.guess_type(name)[0] or "application/octet-stream"
    # The name is interpolated into HTML, so escape quotes and angle brackets.
    safe_name = html.escape(name, quote=True)
    return f'<a href="data:{mime};base64,{b64}" download="{safe_name}">πŸ“‚ Download {safe_name}</a>'
@st.cache_resource
def speech_synthesis_html(result):
    """Embed a zero-height HTML component that speaks *result* in the browser.

    The component runs ``window.speechSynthesis.speak`` on a
    ``SpeechSynthesisUtterance`` built from the text.

    Args:
        result: Text to speak; converted with ``str()`` before embedding.
    """
    # json.dumps yields a valid JavaScript string literal with quotes,
    # backslashes and newlines escaped. "</" is additionally broken up so the
    # text cannot terminate the surrounding <script> element.
    js_text = json.dumps(str(result)).replace("</", "<\\/")
    html_code = f"""
<html><body>
<script>
var msg = new SpeechSynthesisUtterance({js_text});
window.speechSynthesis.speak(msg);
</script>
</body></html>
"""
    components.html(html_code, height=0)
async def edge_tts_generate_audio(text, voice="en-US-AriaNeural", rate=0, pitch=0):
    """Synthesize *text* to an MP3 file with edge-tts and return its filename.

    The text is first passed through ``clean_for_speech``; if nothing remains,
    ``None`` is returned and no file is written. ``rate`` and ``pitch`` are
    integers rendered as signed ``%`` and ``Hz`` strings respectively.
    """
    spoken = clean_for_speech(text)
    if not spoken.strip():
        return None
    tts = edge_tts.Communicate(
        spoken,
        voice,
        rate=f"{rate:+d}%",
        pitch=f"{pitch:+d}Hz",
    )
    # The output name is derived from the spoken text itself.
    mp3_path = generate_filename(spoken, "mp3")
    await tts.save(mp3_path)
    return mp3_path
def speak_with_edge_tts(text, voice="en-US-AriaNeural", rate=0, pitch=0):
    """Blocking wrapper: run ``edge_tts_generate_audio`` to completion.

    Returns whatever the coroutine returns (an MP3 filename, or ``None`` when
    the cleaned text is empty).
    """
    synthesis = edge_tts_generate_audio(text, voice, rate, pitch)
    return asyncio.run(synthesis)
def play_and_download_audio(file_path):
    """Show an audio player and a download link for *file_path*.

    Does nothing when *file_path* is falsy or the file does not exist.
    """
    if not (file_path and os.path.exists(file_path)):
        return
    st.audio(file_path)
    # Read through a context manager so the file handle is closed right away.
    with open(file_path, "rb") as audio:
        b64 = base64.b64encode(audio.read()).decode()
    name = os.path.basename(file_path)
    dl_link = f'<a href="data:audio/mpeg;base64,{b64}" download="{name}">Download {name}</a>'
    st.markdown(dl_link, unsafe_allow_html=True)
def process_image(image_path, user_prompt):
    """Send an image plus a text prompt to the configured OpenAI chat model.

    Args:
        image_path: Path of the image file; it is embedded as a base64 data URL.
        user_prompt: Text sent alongside the image.

    Returns:
        The text content of the first completion choice.
    """
    import mimetypes

    with open(image_path, "rb") as imgf:
        b64img = base64.b64encode(imgf.read()).decode("utf-8")
    # Label the data URL with the file's real type (the gallery passes both
    # .png and .jpg); fall back to PNG when the extension is unknown.
    mime = mimetypes.guess_type(image_path)[0] or "image/png"
    resp = openai_client.chat.completions.create(
        model=st.session_state["openai_model"],
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": [
                {"type": "text", "text": user_prompt},
                {"type": "image_url", "image_url": {"url": f"data:{mime};base64,{b64img}"}},
            ]},
        ],
        temperature=0.0,
    )
    return resp.choices[0].message.content
def process_audio(audio_path):
    """Transcribe an audio file with the ``whisper-1`` model.

    The transcript is appended to ``st.session_state.messages`` as a user
    message and also returned.
    """
    with open(audio_path, "rb") as audio_file:
        result = openai_client.audio.transcriptions.create(
            file=audio_file,
            model="whisper-1",
        )
    transcript = result.text
    st.session_state.messages.append({"role": "user", "content": transcript})
    return transcript
def process_video(video_path, seconds_per_frame=1):
    """Sample frames from a video and return them as base64-encoded JPEGs.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        seconds_per_frame: Spacing between sampled frames, in seconds of video.

    Returns:
        A list of base64 strings, one per sampled frame; empty when no frame
        could be read.
    """
    vid = cv2.VideoCapture(video_path)
    frames_b64 = []
    try:
        total = int(vid.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = vid.get(cv2.CAP_PROP_FPS)
        # A reported fps of 0 or a small seconds_per_frame would give a step
        # of 0, which range() rejects; always advance at least one frame.
        skip = max(1, int(fps * seconds_per_frame))
        for i in range(0, total, skip):
            vid.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = vid.read()
            if not ret:
                break
            encoded, buf = cv2.imencode(".jpg", frame)
            if not encoded:
                continue
            frames_b64.append(base64.b64encode(buf).decode("utf-8"))
    finally:
        # Release the capture even if decoding or encoding raises.
        vid.release()
    return frames_b64
def process_video_with_gpt(video_path, prompt):
    """Describe a video by sending sampled frames to the OpenAI chat model.

    Frames come from ``process_video`` and follow the text prompt as JPEG data
    URLs. Returns the text content of the first completion choice.
    """
    # One text part followed by one image part per sampled frame.
    user_parts = [{"type":"text","text":prompt}]
    for encoded_frame in process_video(video_path):
        user_parts.append(
            {"type":"image_url","image_url":{"url":f"data:image/jpeg;base64,{encoded_frame}"}}
        )
    conversation = [
        {"role":"system","content":"Analyze video frames."},
        {"role":"user","content":user_parts},
    ]
    reply = openai_client.chat.completions.create(
        model=st.session_state["openai_model"],
        messages=conversation,
    )
    return reply.choices[0].message.content
def search_arxiv(query):
    """Ask the ArXiv RAG Space the same question with two models.

    Each answer is rendered under its own heading as soon as it arrives.
    Returns both answers joined by a blank line.
    """
    st.write("πŸ” Searching ArXiv...")
    client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")
    # (model id, heading shown above that model's answer), in call order.
    runs = (
        ("mistralai/Mixtral-8x7B-Instruct-v0.1", "### Mistral-8x7B-Instruct-v0.1 Result"),
        ("mistralai/Mistral-7B-Instruct-v0.2", "### Mistral-7B-Instruct-v0.2 Result"),
    )
    answers = []
    for model_id, heading in runs:
        answer = client.predict(
            prompt=query,
            llm_model_picked=model_id,
            stream_outputs=True,
            api_name="/ask_llm",
        )
        st.markdown(heading)
        st.markdown(answer)
        answers.append(answer)
    return f"{answers[0]}\n\n{answers[1]}"
def perform_ai_lookup(q, vocal_summary=True, extended_refs=False, titles_summary=True):
    """Run an ArXiv RAG lookup for *q*, render it, and optionally narrate it.

    Args:
        q: The research query.
        vocal_summary: Speak the model's answer.
        extended_refs: Speak the full references text.
        titles_summary: Speak the bracketed titles found in the references.

    Returns:
        The combined markdown (heading, answer, references), which is also
        saved to an ``.md`` file through ``create_file``.
    """
    def narrate(text, heading):
        # Clean, synthesize, then show the heading followed by the player.
        audio_file = speak_with_edge_tts(clean_for_speech(text))
        st.write(heading)
        play_and_download_audio(audio_file)

    started_at = time.time()
    client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")
    # Positional arguments are dictated by the remote endpoint; 20 is
    # presumably the number of results requested - TODO confirm.
    rag_output = client.predict(
        q, 20, "Semantic Search", "mistralai/Mixtral-8x7B-Instruct-v0.1",
        api_name="/update_with_rag_md",
    )
    refs = rag_output[0]
    answer = client.predict(
        q, "mistralai/Mixtral-8x7B-Instruct-v0.1", True,
        api_name="/ask_llm",
    )
    result = f"### πŸ”Ž {q}\n\n{answer}\n\n{refs}"
    st.markdown(result)

    if vocal_summary:
        narrate(answer, "### πŸŽ™οΈ Vocal Summary (Short Answer)")

    if extended_refs:
        narrate(
            "Here are the summaries from the references: " + refs.replace('"',''),
            "### πŸ“œ Extended References & Summaries",
        )

    if titles_summary:
        # A title is the first [...] span on a references line.
        hits = (re.search(r"\[([^\]]+)\]", line) for line in refs.split('\n'))
        titles = [hit.group(1) for hit in hits if hit]
        if titles:
            narrate(
                "Here are the titles of the papers: " + ", ".join(titles),
                "### πŸ”– Paper Titles",
            )

    elapsed = time.time() - started_at
    st.write(f"**Total Elapsed:** {elapsed:.2f} s")

    # Persist the query and the rendered result as a markdown file.
    create_file(q, result, "md")
    return result
def process_with_gpt(text):
    """Send *text* to the OpenAI chat model with the stored conversation.

    The user turn and the reply are both appended to
    ``st.session_state.messages``, rendered as chat bubbles, and saved to an
    ``.md`` file. Returns the reply text, or ``None`` when *text* is empty.
    """
    if not text:
        return

    user_turn = {"role":"user","content":text}
    st.session_state.messages.append(user_turn)
    with st.chat_message("user"):
        st.markdown(text)

    with st.chat_message("assistant"):
        # The whole stored history is sent, so earlier turns give context.
        completion = openai_client.chat.completions.create(
            model=st.session_state["openai_model"],
            messages=st.session_state.messages,
            stream=False,
        )
        ans = completion.choices[0].message.content
        st.write("GPT-4o: " + ans)
        create_file(text, ans, "md")
        st.session_state.messages.append({"role":"assistant","content":ans})
    return ans
def process_with_claude(text):
    """Send *text* to Claude as a single-message request and show the reply.

    The exchange is rendered as chat bubbles, saved to an ``.md`` file, and
    appended to ``st.session_state.chat_history``. Returns the reply text, or
    ``None`` when *text* is empty.
    """
    if not text:
        return

    with st.chat_message("user"):
        st.markdown(text)

    with st.chat_message("assistant"):
        # Only the current text is sent; earlier exchanges are not included.
        reply = claude_client.messages.create(
            model="claude-3-sonnet-20240229",
            max_tokens=1000,
            messages=[{"role":"user","content":text}],
        )
        ans = reply.content[0].text
        st.write("Claude: " + ans)
        create_file(text, ans, "md")
        exchange = {"user":text,"claude":ans}
        st.session_state.chat_history.append(exchange)
    return ans
def create_zip_of_files(md_files, mp3_files):
    """Bundle the given markdown and MP3 files into one zip archive.

    ``README.md`` (matched case-insensitively) is left out. The archive name
    is the file stems joined by underscores, cut to 50 characters plus
    ``_etc`` when longer. Returns the archive name, or ``None`` when there is
    nothing to bundle.
    """
    # Exclude README.md
    kept_md = [p for p in md_files if os.path.basename(p).lower() != 'readme.md']
    members = kept_md + mp3_files
    if not members:
        return None

    # Build a descriptive name
    label = "_".join(os.path.splitext(os.path.basename(p))[0] for p in members)
    if len(label) > 50:
        label = f"{label[:50]}_etc"
    zip_name = label + ".zip"

    with zipfile.ZipFile(zip_name, 'w') as archive:
        for member in members:
            archive.write(member)
    return zip_name
def load_files_for_sidebar():
    """Collect ``*.md`` and ``*.mp3`` files from the working directory in groups.

    Files are grouped by the first 10 characters of their name and
    ``README.md`` is skipped.

    Returns:
        ``(groups, sorted_prefixes)``: a mapping of prefix to its files
        (newest first), and the prefixes ordered by their newest file
        (newest first).
    """
    # Gather files, leaving README.md out of the markdown set.
    md_paths = [p for p in glob.glob("*.md") if os.path.basename(p).lower() != 'readme.md']
    candidates = md_paths + glob.glob("*.mp3")

    # Group by first 10 chars of filename.
    groups = defaultdict(list)
    for path in candidates:
        groups[os.path.basename(path)[:10]].append(path)

    # Newest file first inside each group.
    for members in groups.values():
        members.sort(key=os.path.getmtime, reverse=True)

    def newest_mtime(prefix):
        return max(os.path.getmtime(p) for p in groups[prefix])

    # Groups holding the most recently modified file come first.
    sorted_prefixes = sorted(groups, key=newest_mtime, reverse=True)
    return groups, sorted_prefixes
def extract_keywords_from_md(files):
    """Return the first five distinct lower-cased words found in the ``.md`` files.

    Args:
        files: Iterable of paths; entries not ending in ``.md`` are ignored.

    Returns:
        A list of at most five words, in order of first appearance across the
        files as given.
    """
    chunks = []
    for path in files:
        if not path.endswith(".md"):
            continue
        try:
            # Context manager closes the handle instead of leaking it.
            with open(path, 'r', encoding='utf-8') as handle:
                chunks.append(handle.read())
        except FileNotFoundError:
            # The file can be deleted by a sidebar button earlier in the same
            # script run; keywords are only a label, so skip it.
            continue
    words = re.findall(r"\w+", " ".join(chunks).lower())
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(words))[:5]
def _remove_existing(paths):
    """Delete each path, ignoring files that are already gone."""
    for path in paths:
        try:
            os.remove(path)
        except FileNotFoundError:
            pass


def display_file_manager_sidebar(groups, sorted_prefixes):
    """Render the sidebar file manager: bulk actions plus one expander per group.

    Args:
        groups: Mapping of filename prefix to the list of files in that group.
        sorted_prefixes: Prefixes in the order the groups should be shown.

    Delete actions remove files from disk and set
    ``st.session_state.should_rerun``; "View Group" stores the chosen prefix
    in ``st.session_state.viewing_prefix``.
    """
    st.sidebar.title("🎡 Audio & Document Manager")

    # Collect all md and mp3 files for the bulk delete / zip operations.
    all_md = [f for files in groups.values() for f in files if f.endswith(".md")]
    all_mp3 = [f for files in groups.values() for f in files if f.endswith(".mp3")]

    top_bar = st.sidebar.columns(3)
    with top_bar[0]:
        if st.button("πŸ—‘ Del All MD"):
            _remove_existing(all_md)
            st.session_state.should_rerun = True
    with top_bar[1]:
        if st.button("πŸ—‘ Del All MP3"):
            _remove_existing(all_mp3)
            st.session_state.should_rerun = True
    with top_bar[2]:
        if st.button("⬇️ Zip All"):
            # Only zip what is still on disk after any delete above.
            z = create_zip_of_files(
                [f for f in all_md if os.path.exists(f)],
                [f for f in all_mp3 if os.path.exists(f)],
            )
            if z:
                st.sidebar.markdown(get_download_link(z),unsafe_allow_html=True)

    for prefix in sorted_prefixes:
        # `groups` was built before the buttons above ran, so it may still list
        # files that were just deleted; reading them would raise.
        files = [f for f in groups[prefix] if os.path.exists(f)]
        if not files:
            continue
        # Extract 5-word keywords from MD in this group
        kw = extract_keywords_from_md(files)
        keywords_str = " ".join(kw) if kw else "No Keywords"
        with st.sidebar.expander(f"{prefix} Files ({len(files)}) - Keywords: {keywords_str}", expanded=True):
            c1, c2 = st.columns(2)
            with c1:
                if st.button("πŸ‘€View Group", key="view_group_"+prefix):
                    # No rerun needed, just state update
                    st.session_state.viewing_prefix = prefix
            with c2:
                if st.button("πŸ—‘Del Group", key="del_group_"+prefix):
                    _remove_existing(files)
                    st.session_state.should_rerun = True
                    # The files are gone; do not list them below.
                    continue
            for f in files:
                fname = os.path.basename(f)
                mtime = datetime.fromtimestamp(os.path.getmtime(f)).strftime("%Y-%m-%d %H:%M:%S")
                st.write(f"**{fname}** - {mtime}")
                # Per-file view/edit actions are intentionally omitted; the
                # group-level "View Group" shows every file in the group.
def main():
    """Render the whole app for one Streamlit script run.

    Flow: choose the action tab and the AI model, process any value returned
    by the custom ``mycomponent`` component, render the selected tab, draw the
    sidebar file manager, show the group being viewed (if any), and finally
    rerun the script when an action set ``st.session_state.should_rerun``.
    """
    # Tab labels are defined once so the radio options and the branch tests
    # below always compare the very same strings.
    tab_voice = "🎀 Voice Input"
    tab_media = "πŸ“Έ Media Gallery"
    tab_search = "πŸ” Search ArXiv"
    tab_editor = "πŸ“ File Editor"

    st.sidebar.markdown("### 🚲BikeAIπŸ† Multi-Agent Research AI")
    tab_main = st.radio("Action:", [tab_voice, tab_media, tab_search, tab_editor], horizontal=True)
    model_choice = st.sidebar.radio("AI Model:", ["Arxiv","GPT-4o","Claude-3","GPT+Claude+Arxiv"], index=0)

    # Custom component loaded from the local "mycomponent" directory; a truthy
    # return value is treated as user input for the selected model.
    mycomponent = components.declare_component("mycomponent", path="mycomponent")
    val = mycomponent(my_input_value="Hello")
    if val:
        user_input = val.strip()
        if user_input:
            if model_choice == "GPT-4o":
                process_with_gpt(user_input)
            elif model_choice == "Claude-3":
                process_with_claude(user_input)
            elif model_choice == "Arxiv":
                st.subheader("Arxiv Only Results:")
                perform_ai_lookup(user_input, vocal_summary=True, extended_refs=False, titles_summary=True)
            else:
                # All three side by side; one backend failing must not stop
                # the others. `except Exception` rather than a bare except so
                # that non-Exception control-flow exceptions still propagate.
                col1, col2, col3 = st.columns(3)
                with col1:
                    st.subheader("GPT-4o Omni:")
                    try:
                        process_with_gpt(user_input)
                    except Exception:
                        st.write('GPT 4o error')
                with col2:
                    st.subheader("Claude-3 Sonnet:")
                    try:
                        process_with_claude(user_input)
                    except Exception:
                        st.write('Claude error')
                with col3:
                    st.subheader("Arxiv + Mistral:")
                    try:
                        perform_ai_lookup(user_input, vocal_summary=True, extended_refs=False, titles_summary=True)
                    except Exception:
                        st.write("Arxiv error")

    if tab_main == tab_search:
        st.subheader(tab_search)
        q = st.text_input("Research query:")
        st.markdown("### πŸŽ›οΈ Audio Generation Options")
        vocal_summary = st.checkbox("πŸŽ™οΈ Vocal Summary (Short Answer)", value=True)
        extended_refs = st.checkbox("πŸ“œ Extended References & Summaries (Long)", value=False)
        titles_summary = st.checkbox("πŸ”– Paper Titles Only", value=True)
        if q and st.button("Run ArXiv Query"):
            perform_ai_lookup(q, vocal_summary=vocal_summary, extended_refs=extended_refs, titles_summary=titles_summary)

    elif tab_main == tab_voice:
        st.subheader("🎀 Voice Recognition")
        user_text = st.text_area("Message:", height=100)
        user_text = user_text.strip()
        if st.button("Send πŸ“¨"):
            if user_text:
                if model_choice == "GPT-4o":
                    process_with_gpt(user_text)
                elif model_choice == "Claude-3":
                    process_with_claude(user_text)
                elif model_choice == "Arxiv":
                    st.subheader("Arxiv Only Results:")
                    perform_ai_lookup(user_text, vocal_summary=True, extended_refs=False, titles_summary=True)
                else:
                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.subheader("GPT-4o Omni:")
                        process_with_gpt(user_text)
                    with col2:
                        st.subheader("Claude-3 Sonnet:")
                        process_with_claude(user_text)
                    with col3:
                        st.subheader("Arxiv & Mistral:")
                        # perform_ai_lookup renders its own result; rendering
                        # the return value again would show it twice.
                        perform_ai_lookup(user_text, vocal_summary=True, extended_refs=False, titles_summary=True)
        st.subheader("πŸ“œ Chat History")
        t1, t2 = st.tabs(["Claude History","GPT-4o History"])
        with t1:
            for c in st.session_state.chat_history:
                st.write("**You:**", c["user"])
                st.write("**Claude:**", c["claude"])
        with t2:
            for m in st.session_state.messages:
                with st.chat_message(m["role"]):
                    st.markdown(m["content"])

    elif tab_main == tab_media:
        st.header("🎬 Media Gallery - Images and Videos")
        tabs = st.tabs(["πŸ–ΌοΈ Images", "πŸŽ₯ Video"])
        with tabs[0]:
            imgs = glob.glob("*.png") + glob.glob("*.jpg")
            if imgs:
                c = st.slider("Cols",1,5,3)
                cols = st.columns(c)
                for i, f in enumerate(imgs):
                    # Images are dealt out across the columns round-robin.
                    with cols[i % c]:
                        st.image(Image.open(f), use_container_width=True)
                        if st.button(f"πŸ‘€ Analyze {os.path.basename(f)}", key=f"analyze_{f}"):
                            a = process_image(f, "Describe this image.")
                            st.markdown(a)
            else:
                st.write("No images found.")
        with tabs[1]:
            vids = glob.glob("*.mp4")
            if vids:
                for v in vids:
                    with st.expander(f"πŸŽ₯ {os.path.basename(v)}"):
                        # Streamlit's own player; no helper that builds video
                        # HTML is defined in this file.
                        st.video(v)
                        if st.button(f"Analyze {os.path.basename(v)}", key=f"analyze_{v}"):
                            a = process_video_with_gpt(v, "Describe video.")
                            st.markdown(a)
            else:
                st.write("No videos found.")

    elif tab_main == tab_editor:
        current_file = getattr(st.session_state, 'current_file', None)
        if current_file:
            st.subheader(f"Editing: {current_file}")
            # Fall back to empty content when no text was stored for the file.
            new_text = st.text_area("Content:", st.session_state.get('file_content', ""), height=300)
            if st.button("Save"):
                with open(current_file, 'w', encoding='utf-8') as f:
                    f.write(new_text)
                st.success("Updated!")
                st.session_state.should_rerun = True
        else:
            st.write("Select a file from the sidebar to edit.")

    # After main content, load and show file groups in sidebar
    groups, sorted_prefixes = load_files_for_sidebar()
    display_file_manager_sidebar(groups, sorted_prefixes)

    # If viewing a prefix group, show all files in main area. Read after the
    # sidebar is drawn so a "View Group" click in this run takes effect.
    viewing = st.session_state.viewing_prefix
    if viewing and viewing in groups:
        st.write("---")
        st.write(f"**Viewing Group:** {viewing}")
        # Files are already sorted by modification time, newest first.
        for f in groups[viewing]:
            if not os.path.exists(f):
                # Deleted by a sidebar button earlier in this run.
                continue
            fname = os.path.basename(f)
            ext = os.path.splitext(fname)[1].lower().strip('.')
            st.write(f"### {fname}")
            if ext == "md":
                with open(f, 'r', encoding='utf-8') as md_file:
                    content = md_file.read()
                st.markdown(content)
            elif ext == "mp3":
                st.audio(f)
            else:
                # just show a download link
                st.markdown(get_download_link(f), unsafe_allow_html=True)
        if st.button("Close Group View"):
            st.session_state.viewing_prefix = None
            # The group was already drawn above in this run; rerun to hide it.
            st.session_state.should_rerun = True

    if st.session_state.should_rerun:
        # Clear the flag first so the rerun does not loop.
        st.session_state.should_rerun = False
        st.rerun()
# Entry point: build the UI only when this file is executed as the main
# script, not when it is imported as a module.
if __name__=="__main__":
    main()