# BikeAI: a Streamlit front end that sends a query to Claude, GPT or an ArXiv
# RAG service, saves each prompt/response pair as a Markdown file, optionally
# narrates answers with edge-tts, and manages the generated files in the sidebar.

import asyncio
import base64
import contextlib
import glob
import html
import io
import json
import math
import os
import random
import re
import subprocess
import sys
import time
import zipfile
from collections import defaultdict
from datetime import datetime
from io import BytesIO
from urllib.parse import quote

import anthropic
import cv2
import edge_tts
import openai
import pytz
import requests
import streamlit as st
import streamlit.components.v1 as components
import textract
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from gradio_client import Client
from huggingface_hub import InferenceClient
from PIL import Image
from PyPDF2 import PdfReader

# 🧹 Clean up the environment and load keys
st.set_page_config(
    page_title="🚲BikeAI🏆 Claude/GPT Research",
    page_icon="🚲🏆",
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        'Get Help': 'https://huggingface.co/awacke1',
        'Report a bug': 'https://huggingface.co/spaces/awacke1',
        'About': "🚲BikeAI🏆 Claude/GPT Research AI"
    }
)
load_dotenv()
openai_api_key = os.getenv('OPENAI_API_KEY', "")
anthropic_key = os.getenv('ANTHROPIC_API_KEY', "")
# Streamlit secrets take precedence over environment variables.
if 'OPENAI_API_KEY' in st.secrets:
    openai_api_key = st.secrets['OPENAI_API_KEY']
if 'ANTHROPIC_API_KEY' in st.secrets:
    anthropic_key = st.secrets["ANTHROPIC_API_KEY"]

openai.api_key = openai_api_key
HF_KEY = os.getenv('HF_KEY')
API_URL = os.getenv('API_URL')
claude_client = anthropic.Anthropic(api_key=anthropic_key)
# For GPT-4o calls
# NOTE(review): openai.ChatCompletion looks like the pre-1.0 openai SDK
# interface -- confirm the pinned openai version still provides it.
openai_client = openai.ChatCompletion

# 🧠 Session State
# Keys that hold lists. 'messages' must be a list because main() appends chat
# turns to it; it previously fell through to the "" default and .append()
# raised AttributeError on the first send.
_LIST_STATE_KEYS = ('transcript_history', 'chat_history', 'messages')

for var in ['transcript_history', 'chat_history', 'openai_model', 'messages',
            'last_voice_input', 'editing_file', 'edit_new_name',
            'edit_new_content', 'viewing_prefix', 'should_rerun', 'old_val']:
    if var not in st.session_state:
        if var in _LIST_STATE_KEYS:
            st.session_state[var] = []
        elif var.startswith('view'):
            st.session_state[var] = None
        else:
            st.session_state[var] = ""

if not st.session_state.openai_model:
    st.session_state.openai_model = "gpt-4-0613"  # Update to a stable GPT-4 model if needed

# 🎨 Custom CSS
# The markup passed here is blank in this file, so no styles are injected.
st.markdown(""" """, unsafe_allow_html=True)

# Words ignored when picking descriptive terms for file names and labels.
_STOP_WORDS = frozenset([
    'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of',
    'with', 'by', 'from', 'up', 'about', 'into', 'over', 'after', 'is', 'are',
    'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does',
    'did', 'will', 'would', 'should', 'could', 'might', 'must', 'shall', 'can',
    'may', 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it',
    'we', 'they', 'what', 'which', 'who', 'when', 'where', 'why', 'how', 'all',
    'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such',
    'than', 'too', 'very', 'just', 'there'
])
# A word, optionally continued by hyphen-joined words (e.g. "state-of-the-art").
_WORD_RE = re.compile(r'\b\w+(?:-\w+)*\b')


# 🏷️ Helper for extracting high-info terms
def get_high_info_terms(text: str) -> list:
    """Return up to five distinct, lower-cased informative words from *text*.

    A word qualifies when it is longer than three characters, is not a stop
    word and contains at least one letter. First-occurrence order is kept.
    """
    words = _WORD_RE.findall(text.lower())
    meaningful = [
        w for w in words
        if len(w) > 3 and w not in _STOP_WORDS and any(c.isalpha() for c in w)
    ]
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(meaningful))[:5]


# 🍱 Improved filename generation includes prompt & response terms
def generate_filename(prompt, response, file_type="md"):
    """Build a timestamped file name from key terms of *prompt* and *response*.

    Prompt terms come first; response terms are appended only when not already
    present. The term part is capped at 100 characters and falls back to
    'file' when no terms are found. The prefix is ``%y%m_%H%M_`` (year, month,
    hour, minute), i.e. ten characters, which load_files_for_sidebar() relies
    on when grouping files.
    """
    prompt_terms = get_high_info_terms(prompt)
    response_terms = get_high_info_terms(response)
    combined_terms = prompt_terms + [t for t in response_terms if t not in prompt_terms]
    name_text = '_'.join(t.replace(' ', '-') for t in combined_terms) or 'file'
    # Limit length
    name_text = name_text[:100]
    prefix = datetime.now().strftime("%y%m_%H%M_")
    return f"{prefix}{name_text}.{file_type}"


def _download_link(b64_data, filename, mime, label):
    """Return an HTML anchor that downloads *b64_data* as *filename*."""
    safe_name = html.escape(filename, quote=True)
    return (
        f'<a href="data:{mime};base64,{b64_data}" download="{safe_name}">'
        f'{html.escape(label)}</a>'
    )


# 🗣️ Clean text for speech
def clean_for_speech(text: str) -> str:
    """Flatten *text* into a single line suitable for text-to-speech.

    Newlines and '#' markers are removed, parenthesised http(s) links are
    dropped and runs of whitespace are collapsed.
    """
    # The second replace() target was an empty string, which inserts a space
    # between every character of the text.
    # NOTE(review): "</s>" (an end-of-sequence token) is the presumed original
    # literal, apparently lost to markup stripping -- confirm.
    text = text.replace("\n", " ").replace("</s>", " ").replace("#", "")
    text = re.sub(r"\(https?:\/\/[^\)]+\)", "", text)
    return re.sub(r"\s+", " ", text).strip()


async def edge_tts_generate_audio(text, voice="en-US-AriaNeural", rate=0, pitch=0):
    """Synthesize *text* to an MP3 file with edge-tts and return its path.

    *rate* and *pitch* are integers formatted as signed percent / Hz offsets.
    Returns None when nothing is left to speak after cleaning.
    """
    text = clean_for_speech(text)
    if not text.strip():
        return None
    rate_str = f"{rate:+d}%"
    pitch_str = f"{pitch:+d}Hz"
    com = edge_tts.Communicate(text, voice, rate=rate_str, pitch=pitch_str)
    out_fn = generate_filename(text, text, "mp3")
    await com.save(out_fn)
    return out_fn


def speak_with_edge_tts(text, voice="en-US-AriaNeural", rate=0, pitch=0):
    """Synchronous wrapper around edge_tts_generate_audio()."""
    return asyncio.run(edge_tts_generate_audio(text, voice, rate, pitch))


def play_and_download_audio(file_path):
    """Render an audio player and a download link for *file_path*, if it exists."""
    if file_path and os.path.exists(file_path):
        st.audio(file_path)
        with open(file_path, "rb") as audio_file:
            enc = base64.b64encode(audio_file.read()).decode()
        name = os.path.basename(file_path)
        st.markdown(
            _download_link(enc, name, "audio/mpeg", f"Download {name}"),
            unsafe_allow_html=True,
        )


# 🧰 Code execution environment
# Globals dict shared by every executed code block, so names defined by one
# block stay visible to later blocks.
context = {}


# 🛠️ Executes Python code blocks, returning combined output
def execute_python_blocks(response):
    """Run every fenced code block found in *response* and collect the output.

    Each block is exec()'d against the shared module-level ``context`` with
    stdout captured. Returns a Markdown string containing one section per
    block: the captured output, or the exception message when the block fails.

    SECURITY: this exec()s text produced by a language model with the full
    privileges of the app process. It is not sandboxed; review before exposing
    the app to untrusted users.
    """
    combined = ""
    # The "python" tag is optional, so untagged fences are executed too.
    code_blocks = re.findall(r'```(?:python\s*)?([\s\S]*?)```', response, re.IGNORECASE)
    for code_block in code_blocks:
        captured = io.StringIO()
        try:
            # redirect_stdout restores sys.stdout even when the block raises.
            with contextlib.redirect_stdout(captured):
                exec(code_block, context)
        except Exception as e:
            combined += f"# Execution Error\n```python\n{e}\n```\n\n"
        else:
            code_output = captured.getvalue()
            combined += f"# Code Execution Output\n```\n{code_output}\n```\n\n"
            st.code(code_output)
    return combined


# 🗃️ Creates & saves a file with prompt/response and executed code results
def create_file(filename, prompt, response):
    """Write prompt, response and code-execution results to a Markdown file.

    The file is always saved with a ``.md`` extension regardless of the
    extension in *filename*, and a download link for it is rendered.
    """
    base, _ = os.path.splitext(filename)
    md_path = f"{base}.md"
    content = f"# Prompt 📝\n{prompt}\n\n# Response 💬\n{response}\n\n"
    # Execute code in response
    content += execute_python_blocks(response)
    # Save
    with open(md_path, 'w', encoding='utf-8') as file:
        file.write(content)
    # Download link
    with open(md_path, 'rb') as file:
        encoded = base64.b64encode(file.read()).decode()
    href = _download_link(
        encoded, os.path.basename(md_path), "text/markdown", "Download File 📄"
    )
    st.markdown(href, unsafe_allow_html=True)


# 🎨 Unified AI call helper
def call_model(model_type, text):
    """Send *text* to the selected backend and return the answer string.

    *model_type* is "Arxiv" or "Claude"; any other value is routed to the
    OpenAI chat model held in session state. Backend failures are returned as
    an error message instead of being raised. The prompt and answer are also
    saved to a Markdown file via create_file().
    """
    if model_type == "Claude":
        try:
            r = claude_client.completions.create(
                prompt=anthropic.HUMAN_PROMPT + text + anthropic.AI_PROMPT,
                model="claude-instant-1",
                max_tokens_to_sample=1000
            )
            ans = r.completion.strip()
        except Exception as e:
            ans = f"Error calling Claude: {e}"
    elif model_type == "Arxiv":
        # ArXiv RAG flow. Errors are reported the same way as for the other
        # backends; an error answer lacks the "### 🔎" marker, so
        # handle_audio_generation() skips it.
        try:
            client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")
            refs = client.predict(
                text, 20, "Semantic Search",
                "mistralai/Mixtral-8x7B-Instruct-v0.1",
                api_name="/update_with_rag_md"
            )[0]
            r2 = client.predict(
                text, "mistralai/Mixtral-8x7B-Instruct-v0.1", True,
                api_name="/ask_llm"
            )
            ans = f"### 🔎 {text}\n\n{r2}\n\n{refs}"
        except Exception as e:
            ans = f"Error calling Arxiv: {e}"
    else:
        # GPT-4o call
        try:
            c = openai_client.create(
                model=st.session_state.openai_model,
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": text},
                ]
            )
            ans = c.choices[0].message.content.strip()
        except Exception as e:
            ans = f"Error calling GPT-4: {e}"

    filename = generate_filename(text, ans, "md")
    create_file(filename, text, ans)
    return ans


# 🎶 Audio response options
def handle_audio_generation(q, ans, vocal_summary=True, extended_refs=False,
                            titles_summary=True, full_audio=False):
    """Generate and display the requested audio renditions of an ArXiv answer.

    Does nothing unless *ans* carries the "### 🔎" marker that call_model()
    puts on ArXiv results. Each boolean flag enables one audio clip.
    """
    # This is used only for ArXiv results
    if not ans.startswith("### 🔎"):
        return

    # Approximate the sections by splitting on blank lines: the paragraph
    # after the heading is treated as the short answer and the final paragraph
    # as the references.
    parts = ans.split("\n\n")
    if len(parts) > 2:
        short_answer = parts[1]
        refs = parts[-1]
    else:
        short_answer = ans
        refs = ""

    if full_audio:
        complete_text = (
            f"Complete response for query: {q}. "
            f"{clean_for_speech(short_answer)} {clean_for_speech(refs)}"
        )
        f_all = speak_with_edge_tts(complete_text)
        st.write("### 📚 Complete Audio Response")
        play_and_download_audio(f_all)

    if vocal_summary:
        main_audio = speak_with_edge_tts(short_answer)
        st.write("### 🎙️ Vocal Summary")
        play_and_download_audio(main_audio)

    if extended_refs and refs.strip():
        ref_audio = speak_with_edge_tts("Here are the extended references: " + refs)
        st.write("### 📜 Extended References & Summaries")
        play_and_download_audio(ref_audio)

    if titles_summary and refs.strip():
        # The first [bracketed] span on each line is taken as a paper title.
        matches = (re.search(r"\[([^\]]+)\]", line) for line in refs.split('\n'))
        titles = [m.group(1) for m in matches if m]
        if titles:
            titles_text = "Here are the titles of the papers: " + ", ".join(titles)
            t_audio = speak_with_edge_tts(titles_text)
            st.write("### 🔖 Paper Titles")
            play_and_download_audio(t_audio)


def _read_text(path):
    """Return the UTF-8 text content of *path*, closing the file afterwards."""
    with open(path, 'r', encoding='utf-8') as handle:
        return handle.read()


# 🏦 File Management
def create_zip_of_files():
    """Zip every ``*.md`` and ``*.mp3`` file in the working directory.

    The archive name is a timestamp plus up to three key terms drawn from the
    Markdown contents ('archive' when there are none). Returns the archive
    name, or None when there is nothing to zip.
    """
    md_files = glob.glob("*.md")
    mp3_files = glob.glob("*.mp3")
    if not (md_files or mp3_files):
        return None
    all_files = md_files + mp3_files
    # Derive name from their content
    text_combined = "".join(_read_text(f) for f in all_files if f.endswith(".md"))
    terms = get_high_info_terms(text_combined)
    name_text = '_'.join(terms[:3]) or 'archive'
    zip_name = f"{datetime.now().strftime('%y%m_%H%M')}_{name_text}.zip"
    with zipfile.ZipFile(zip_name, 'w') as z:
        for f in all_files:
            z.write(f)
    return zip_name


def load_files_for_sidebar():
    """Group the ``*.md`` / ``*.mp3`` files by the first ten characters of their name.

    Returns ``(groups, sorted_pref)``: a mapping of prefix -> file list, and
    the prefixes ordered by each group's most recent modification time,
    newest first.
    """
    groups = defaultdict(list)
    for f in glob.glob("*.md") + glob.glob("*.mp3"):
        groups[os.path.basename(f)[:10]].append(f)
    sorted_pref = sorted(
        groups,
        key=lambda pre: max(os.path.getmtime(x) for x in groups[pre]),
        reverse=True,
    )
    return groups, sorted_pref


def display_file_manager_sidebar():
    """Render the sidebar file manager: bulk actions, file groups and group viewer.

    Delete actions set ``st.session_state.should_rerun`` so the caller can
    refresh the page once rendering is done.
    """
    st.sidebar.title("🎡 Audio & Document Manager")
    groups, sorted_pref = load_files_for_sidebar()
    # README.md is excluded so "Del All MD" never removes it.
    all_md = [f for f in glob.glob("*.md") if os.path.basename(f).lower() != 'readme.md']
    all_mp3 = glob.glob("*.mp3")

    top_bar = st.sidebar.columns(3)
    with top_bar[0]:
        if st.button("🗑 Del All MD"):
            for f in all_md:
                os.remove(f)
            st.session_state.should_rerun = True
    with top_bar[1]:
        if st.button("🗑 Del All MP3"):
            for f in all_mp3:
                os.remove(f)
            st.session_state.should_rerun = True
    with top_bar[2]:
        if st.button("⬇️ Zip All"):
            z = create_zip_of_files()
            if z:
                with open(z, "rb") as f:
                    b64 = base64.b64encode(f.read()).decode()
                zip_base = os.path.basename(z)
                st.sidebar.markdown(
                    _download_link(b64, zip_base, "application/zip",
                                   f"📂 Download {zip_base}"),
                    unsafe_allow_html=True,
                )

    for prefix in sorted_pref:
        files = groups[prefix]
        # Extract simple keywords
        txt = "".join(_read_text(f) + " " for f in files if f.endswith(".md"))
        kw = get_high_info_terms(txt)
        kw_str = " ".join(kw) if kw else "No Keywords"
        with st.sidebar.expander(f"{prefix} Files ({len(files)}) - {kw_str}", expanded=True):
            c1, c2 = st.columns(2)
            with c1:
                if st.button("👀View Group", key="view_" + prefix):
                    st.session_state.viewing_prefix = prefix
            with c2:
                if st.button("🗑Del Group", key="del_" + prefix):
                    for f in files:
                        os.remove(f)
                    st.success(f"Deleted group {prefix}")
                    st.session_state.should_rerun = True

            for f in files:
                # Last-modified time of the file.
                mtime = datetime.fromtimestamp(os.path.getmtime(f)).strftime("%Y-%m-%d %H:%M:%S")
                st.write(f"**{os.path.basename(f)}** - {mtime}")

    # Viewing group
    if st.session_state.viewing_prefix and st.session_state.viewing_prefix in groups:
        st.write("---")
        st.write(f"**Viewing Group:** {st.session_state.viewing_prefix}")
        for f in groups[st.session_state.viewing_prefix]:
            ext = f.split('.')[-1].lower()
            st.write(f"### {os.path.basename(f)}")
            if ext == "md":
                st.markdown(_read_text(f))
            elif ext == "mp3":
                st.audio(f)
            else:
                with open(f, "rb") as fil:
                    enc = base64.b64encode(fil.read()).decode()
                fname = os.path.basename(f)
                st.markdown(
                    _download_link(enc, fname, "application/octet-stream",
                                   f"Download {fname}"),
                    unsafe_allow_html=True,
                )
        if st.button("Close Group View"):
            st.session_state.viewing_prefix = None


def main():
    """Render the app: component input, the selected tab and the file sidebar."""
    st.sidebar.markdown("### 🚲BikeAI🏆 Multi-Agent Research AI")
    tab_main = st.radio(
        "Action:",
        ["🎀 Voice Input", "🔍 Search ArXiv", "📝 File Editor"],
        horizontal=True,
    )

    # A small custom component hook (if used)
    mycomponent = components.declare_component("mycomponent", path="mycomponent")
    val = mycomponent(my_input_value="Hello")

    # If we have component input, show controls
    if val:
        edited_input = st.text_area(
            "Edit your detected input:",
            value=val.strip().replace('\n', ' '),
            height=100,
        )
        run_option = st.selectbox("Select AI Model:", ["Arxiv", "GPT-4", "Claude"])
        col1, col2 = st.columns(2)
        with col1:
            autorun = st.checkbox("AutoRun on input change", value=False)
        with col2:
            full_audio = st.checkbox("Generate Complete Audio", value=False)

        input_changed = (val != st.session_state.old_val)
        if (autorun and input_changed) or st.button("Process Input"):
            st.session_state.old_val = val
            # The select-box labels are exactly the model types call_model() expects.
            ans = call_model(run_option, edited_input)
            if run_option == "Arxiv":
                # Audio generation for Arxiv
                handle_audio_generation(edited_input, ans, True, False, True, full_audio)

    if tab_main == "🔍 Search ArXiv":
        q = st.text_input("Research query:")
        st.markdown("### 🎛️ Audio Generation Options")
        vocal_summary = st.checkbox("Short Answer Audio", True)
        extended_refs = st.checkbox("Extended References Audio", False)
        titles_summary = st.checkbox("Paper Titles Audio", True)
        full_audio = st.checkbox("Full Audio Response", False)

        if q and st.button("Run ArXiv Query"):
            ans = call_model("Arxiv", q)
            handle_audio_generation(q, ans, vocal_summary, extended_refs,
                                    titles_summary, full_audio)

    elif tab_main == "🎀 Voice Input":
        user_text = st.text_area("Message:", height=100)
        user_text = user_text.strip()
        # The button is evaluated first so it is always rendered; an empty
        # message is not sent to the model.
        if st.button("Send 📨") and user_text:
            ans = call_model("GPT-4", user_text)
            st.session_state.messages.append({"role": "user", "content": user_text})
            st.session_state.messages.append({"role": "assistant", "content": ans})
        st.subheader("📜 Chat History")
        for m in st.session_state.messages:
            with st.chat_message(m["role"]):
                st.markdown(m["content"])

    elif tab_main == "📝 File Editor":
        # For simplicity, user selects from sidebar
        st.write("Select a file from the sidebar to edit.")

    # Display File Manager
    display_file_manager_sidebar()

    # Refresh once after a delete so the sidebar reflects the removed files.
    if st.session_state.should_rerun:
        st.session_state.should_rerun = False
        st.rerun()


if __name__ == "__main__":
    main()