import streamlit as st from azure.cosmos import CosmosClient, exceptions import os import pandas as pd import traceback import shutil from github import Github from git import Repo from datetime import datetime import base64 import json import uuid # 🎲 For generating unique IDs from urllib.parse import quote # πŸ”— For encoding URLs from gradio_client import Client # 🌐 For connecting to Gradio apps import anthropic import pytz import re from PIL import Image import glob from streamlit.components.v1 import html # πŸŽ‰ Welcome to our epic Cosmos DB, GitHub, and Claude App! The universe is at your command πŸš€ st.set_page_config( page_title="πŸ€–Git🌌CosmosπŸ’« & Claude🧠", page_icon="πŸ€–πŸŒŒπŸ’«πŸ“", layout="wide", initial_sidebar_state="auto", menu_items={ 'Get Help': 'https://huggingface.co/awacke1', 'Report a bug': 'https://huggingface.co/spaces/awacke1', 'About': 'πŸ™Git🌌CosmosπŸ’« - Azure Cosmos DB and GitHub Agent, Now with Claude!' } ) # 🌌 Cosmos DB configuration ENDPOINT = "https://acae-afd.documents.azure.com:443/" DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME") CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME") Key = os.environ.get("Key") # πŸ”‘ Don't forget your key! # Set up the Anthropic client (Claude time πŸ€–πŸ§ ) client_anthropic = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY")) # Set up Gradio client for ArXiv queries (ArXiv, the scholar's playground πŸŽ“) client_gradio = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern") # πŸ™ GitHub configuration def download_github_repo(url, local_path): # 🚚 Let's download that GitHub repo! if os.path.exists(local_path): shutil.rmtree(local_path) Repo.clone_from(url, local_path) def create_zip_file(source_dir, output_filename): # πŸ“¦ Zipping up files like a pro! shutil.make_archive(output_filename, 'zip', source_dir) def create_repo(g, repo_name): # πŸ› οΈ Creating a new GitHub repo. It's alive! user = g.get_user() return user.create_repo(repo_name) def push_to_github(local_path, repo, github_token): # πŸš€ Pushing code to GitHub. Blast off! repo_url = f"https://{github_token}@github.com/{repo.full_name}.git" local_repo = Repo(local_path) if 'origin' in [remote.name for remote in local_repo.remotes]: #if 'origin' in [remote.name for remote.local_repo.remotes]: --error origin = local_repo.remote('origin') origin.set_url(repo_url) else: origin = local_repo.create_remote('origin', repo_url) if not local_repo.heads: local_repo.git.checkout('-b', 'main') current_branch = 'main' else: current_branch = local_repo.active_branch.name local_repo.git.add(A=True) if local_repo.is_dirty(): local_repo.git.commit('-m', 'Initial commit') origin.push(refspec=f'{current_branch}:{current_branch}') def get_base64_download_link(file_path, file_name): # πŸ§™β€β™‚οΈ Generating a magical download link! with open(file_path, "rb") as file: contents = file.read() base64_encoded = base64.b64encode(contents).decode() return f'⬇️ Download {file_name}' # 🌟 Cosmos DB functions def insert_record(container, record): try: container.create_item(body=record) return True, "Record inserted successfully! πŸŽ‰" except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error occurred: {str(e)} 🚨" except Exception as e: return False, f"An unexpected error occurred: {str(e)} 😱" # 🎲 Function to generate a unique UUID def generate_unique_id(): return str(uuid.uuid4()) # Claude 🧠 Chat handling (Wise responses guaranteed! πŸ€–) def chat_with_claude(user_input): response = client_anthropic.messages.create( model="claude-3-sonnet-20240229", max_tokens=1000, messages=[ {"role": "user", "content": user_input} ] ) return response.content[0].text # File handling (Save that wisdom in files! πŸ’Ύ) def generate_filename(prompt, file_type): central = pytz.timezone('US/Central') safe_date_time = datetime.now(central).strftime("%m%d_%H%M") safe_prompt = re.sub(r'\W+', '_', prompt)[:90] return f"{safe_date_time}_{safe_prompt}.{file_type}" def create_file(filename, prompt, response, should_save=True): if not should_save: return with open(filename, 'w', encoding='utf-8') as file: file.write(prompt + "\n\n" + response) def load_file(file_name): with open(file_name, "r", encoding='utf-8') as file: content = file.read() return content # 🎨 Image and media handling (Let's look at some visuals! πŸ“ΈπŸŽ¬πŸŽΆ) def get_video_html(video_path, width="100%"): video_url = f"data:video/mp4;base64,{base64.b64encode(open(video_path, 'rb').read()).decode()}" return f''' ''' def get_audio_html(audio_path, width="100%"): audio_url = f"data:audio/mpeg;base64,{base64.b64encode(open(audio_path, 'rb').read()).decode()}" return f''' ''' # Streamlit layout (because a clean UI is key! 🏠) def main(): st.title("πŸ™Git🌌CosmosπŸ’« & Claude🧠 - The Ultimate App") # Sidebar navigation (The command center 🧭) st.sidebar.title("🧠ClaudeπŸ“ & Cosmos Explorer") # Cosmos DB Section st.sidebar.header("Cosmos DB Controls") if Key: st.session_state.primary_key = Key st.session_state.logged_in = True if st.session_state.logged_in: st.sidebar.write("πŸ” Connected to Cosmos DB!") # Fetch documents cosmos_client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key) databases = [db['id'] for db in cosmos_client.list_databases()] selected_db = st.sidebar.selectbox("Select Database", databases) if selected_db: db_client = cosmos_client.get_database_client(selected_db) containers = [container['id'] for container in db_client.list_containers()] selected_container = st.sidebar.selectbox("Select Container", containers) if selected_container: container = db_client.get_container_client(selected_container) documents = list(container.read_all_items()) if documents: st.write("πŸ“ Document List:") df = pd.DataFrame(documents) st.dataframe(df) else: st.write("πŸ“­ No documents in this container.") # Claude Chat Section st.header("Chat with Claude πŸ€–πŸ§ ") user_input = st.text_area("Ask Claude anything:") if st.button("Send"): if user_input: response = chat_with_claude(user_input) st.write(f"Claude says: {response}") filename = generate_filename(user_input, "md") create_file(filename, user_input, response) if 'chat_history' not in st.session_state: st.session_state.chat_history = [] st.session_state.chat_history.append({"user": user_input, "claude": response}) if "chat_history" in st.session_state: st.subheader("Past Conversations πŸ“œ") for chat in st.session_state.chat_history: st.text_area("You said πŸ’¬:", chat["user"], height=100, disabled=True) st.text_area("Claude replied πŸ€–:", chat["claude"], height=200, disabled=True) st.markdown("---") # Media Galleries (For those who prefer images and videos πŸ–ΌπŸŽ₯🎢) st.subheader("Image Gallery πŸ–Ό") image_files = glob.glob("*.png") + glob.glob("*.jpg") cols = st.columns(3) for idx, image_file in enumerate(image_files): with cols[idx % 3]: img = Image.open(image_file) st.image(img) st.subheader("Video Gallery πŸŽ₯") video_files = glob.glob("*.mp4") for video_file in video_files: st.markdown(get_video_html(video_file), unsafe_allow_html=True) st.subheader("Audio Gallery 🎢") audio_files = glob.glob("*.mp3") + glob.glob("*.wav") for audio_file in audio_files: st.markdown(get_audio_html(audio_file), unsafe_allow_html=True) if __name__ == "__main__": main()