import streamlit as st
from azure.cosmos import CosmosClient, exceptions
import os
import pandas as pd
import traceback
import shutil
from github import Github
from git import Repo
from datetime import datetime
import base64
import json
import uuid # π² For generating unique IDs
from urllib.parse import quote # π For encoding URLs
from gradio_client import Client # π For connecting to Gradio apps
import anthropic
import pytz
import re
from PIL import Image
import glob
from streamlit.components.v1 import html
# Browser-tab title, icon, layout, and "..." menu links for the Streamlit page.
# NOTE(review): Streamlit documents set_page_config as needing to run before
# any other Streamlit command - confirm before moving this call.
st.set_page_config(
    page_title="π€GitπCosmosπ« & Claudeπ§ ",
    page_icon="π€ππ«π",
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        'Get Help': 'https://huggingface.co/awacke1',
        'Report a bug': 'https://huggingface.co/spaces/awacke1',
        'About': 'πGitπCosmosπ« - Azure Cosmos DB and GitHub Agent, Now with Claude!'
    }
)
# Cosmos DB configuration. The endpoint is hard-coded; the database name,
# container name, and account key come from environment variables and are
# None when the corresponding variable is unset.
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
# NOTE(review): DATABASE_NAME and CONTAINER_NAME are not referenced by the
# visible code; main() lets the user pick both from the sidebar instead.
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME")
Key = os.environ.get("Key")  # Cosmos DB credential; main() connects only when this is set.
# Anthropic client used by chat_with_claude(); the API key is read from
# the ANTHROPIC_API_KEY environment variable.
client_anthropic = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
# Gradio client for the ArXiv paper-search app, created at import time.
# NOTE(review): not used by any function in the visible code.
client_gradio = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")
# GitHub helpers: clone, archive, create, and push repositories
def download_github_repo(url, local_path):
    """Clone the Git repository at ``url`` into ``local_path``.

    Anything already present at ``local_path`` is removed with
    ``shutil.rmtree`` first, so the clone always starts from an empty target.
    """
    target_already_present = os.path.exists(local_path)
    if target_already_present:
        shutil.rmtree(local_path)
    Repo.clone_from(url, local_path)
def create_zip_file(source_dir, output_filename):
    """Archive the contents of ``source_dir`` as ``<output_filename>.zip``.

    ``output_filename`` is the archive path *without* the ``.zip`` suffix;
    ``shutil.make_archive`` appends the extension itself.
    """
    shutil.make_archive(
        base_name=output_filename,
        format='zip',
        root_dir=source_dir,
    )
def create_repo(g, repo_name):
    """Create a repository named ``repo_name`` for the user behind ``g``.

    ``g`` must expose ``get_user()``, whose result exposes
    ``create_repo(name)``; whatever that call returns is passed back.
    """
    return g.get_user().create_repo(repo_name)
def push_to_github(local_path, repo, github_token):
    """Commit everything under ``local_path`` and push it to ``repo``.

    Steps, in order:
      1. Point the ``origin`` remote (creating it if absent) at a
         token-authenticated HTTPS URL built from ``repo.full_name``.
      2. Use the active branch, or create ``main`` when the repository has
         no branch heads yet.
      3. Stage all changes, commit them as 'Initial commit' if the
         repository is dirty, and push the branch to ``origin``.

    NOTE(review): the token is embedded in the remote URL handed to Git;
    presumably it ends up in the local repository config - confirm that is
    acceptable for the deployment.
    """
    authed_url = f"https://{github_token}@github.com/{repo.full_name}.git"
    local_repo = Repo(local_path)

    remote_names = {remote.name for remote in local_repo.remotes}
    if 'origin' not in remote_names:
        origin = local_repo.create_remote('origin', authed_url)
    else:
        origin = local_repo.remote('origin')
        origin.set_url(authed_url)

    if local_repo.heads:
        current_branch = local_repo.active_branch.name
    else:
        # No branch exists yet, so create the first one.
        local_repo.git.checkout('-b', 'main')
        current_branch = 'main'

    local_repo.git.add(A=True)
    if local_repo.is_dirty():
        local_repo.git.commit('-m', 'Initial commit')

    branch_refspec = f'{current_branch}:{current_branch}'
    origin.push(refspec=branch_refspec)
def get_base64_download_link(file_path, file_name):
    """Return an HTML ``<a>`` element that downloads a file via a data URI.

    The file at ``file_path`` is read fully into memory, base64-encoded, and
    embedded in the link's ``href`` so no server-side route is needed.

    Args:
        file_path: Path of the file whose bytes are embedded.
        file_name: Name suggested to the browser (``download`` attribute) and
            shown in the link text.

    Returns:
        The anchor markup as a string.

    Raises:
        OSError: If ``file_path`` cannot be opened or read.
    """
    # Imported locally because the module-level name ``html`` is bound to
    # Streamlit's component helper rather than the standard-library module.
    from html import escape

    with open(file_path, "rb") as file:
        contents = file.read()
    base64_encoded = base64.b64encode(contents).decode()
    # The file name is interpolated into markup, so escape it for both
    # attribute and text context.
    safe_name = escape(str(file_name), quote=True)
    return (
        f'<a href="data:application/octet-stream;base64,{base64_encoded}" '
        f'download="{safe_name}">β¬οΈ Download {safe_name}</a>'
    )
# Cosmos DB functions
def insert_record(container, record):
    """Insert ``record`` into ``container`` and report the outcome.

    Returns a ``(success, message)`` tuple instead of raising: Cosmos HTTP
    errors and any other exception are converted into a ``False`` result
    whose message contains the error text.
    """
    try:
        container.create_item(body=record)
    except exceptions.CosmosHttpResponseError as http_error:
        return False, f"HTTP error occurred: {str(http_error)} π¨"
    except Exception as unexpected:
        return False, f"An unexpected error occurred: {str(unexpected)} π±"
    else:
        return True, "Record inserted successfully! π"
# Generate a unique ID (random UUID)
def generate_unique_id():
    """Return a freshly generated random UUID (version 4) as a string."""
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
# Claude chat handling (wise responses guaranteed!)
def chat_with_claude(user_input, *, model="claude-3-sonnet-20240229", max_tokens=1000):
    """Send a single user message to Claude and return the reply text.

    Args:
        user_input: The prompt, sent as one ``user`` message.
        model: Anthropic model identifier. Defaults to the model this app
            has always used; override it to switch models without editing
            the function.
        max_tokens: Upper bound on generated tokens for the reply.

    Returns:
        The concatenated text of every text block in the response, or an
        empty string when the response carries no text.

    Raises:
        Whatever the module-level ``client_anthropic`` raises for failed
        requests; errors are not caught here.
    """
    response = client_anthropic.messages.create(
        model=model,
        max_tokens=max_tokens,
        messages=[{"role": "user", "content": user_input}],
    )
    # Indexing content[0] raises IndexError on an empty content list, and a
    # non-text first block has no usable text, so gather text blocks instead.
    text_parts = [
        block.text
        for block in response.content
        if getattr(block, "type", "text") == "text"
    ]
    return "".join(text_parts)
# File handling (save that wisdom in files!)
def generate_filename(prompt, file_type):
    """Build a file name of the form ``MMDD_HHMM_<prompt-slug>.<file_type>``.

    The timestamp is the current time in the US/Central zone. The slug is
    ``prompt`` with every run of non-word characters replaced by a single
    underscore, truncated to 90 characters.
    """
    now_central = datetime.now(pytz.timezone('US/Central'))
    stamp = now_central.strftime("%m%d_%H%M")
    slug = re.sub(r'\W+', '_', prompt)[:90]
    return "{}_{}.{}".format(stamp, slug, file_type)
def create_file(filename, prompt, response, should_save=True):
    """Write ``prompt`` and ``response`` to ``filename`` as UTF-8 text.

    The two parts are separated by one blank line. Nothing is written when
    ``should_save`` is falsy. Always returns ``None``.
    """
    if should_save:
        transcript = "\n\n".join((prompt, response))
        with open(filename, 'w', encoding='utf-8') as out:
            out.write(transcript)
def load_file(file_name):
    """Return the full contents of ``file_name`` decoded as UTF-8 text."""
    with open(file_name, "r", encoding='utf-8') as source:
        return source.read()
# Image and media handling (let's look at some visuals!)
def get_video_html(video_path, width="100%"):
    """Return a ``<video>`` element that plays ``video_path`` inline.

    The file is read fully, base64-encoded, and embedded as a
    ``data:video/mp4`` URI, so the markup is self-contained.

    Args:
        video_path: Path to the video file; it is labelled as MP4.
        width: Value for the element's ``width`` attribute.

    Returns:
        The HTML markup as a single-line string (kept on one line so a
        Markdown renderer never sees indented lines).

    Raises:
        OSError: If ``video_path`` cannot be opened or read.
    """
    # Use a context manager so the file handle is closed deterministically.
    with open(video_path, 'rb') as video_file:
        encoded = base64.b64encode(video_file.read()).decode()
    video_url = f"data:video/mp4;base64,{encoded}"
    return (
        f'<video width="{width}" controls>'
        f'<source src="{video_url}" type="video/mp4">'
        'Your browser does not support the video tag.'
        '</video>'
    )
def get_audio_html(audio_path, width="100%"):
    """Return an ``<audio>`` element that plays ``audio_path`` inline.

    The file is read fully, base64-encoded, and embedded as a data URI.
    Files ending in ``.wav`` are labelled ``audio/wav``; everything else is
    labelled ``audio/mpeg``.

    Args:
        audio_path: Path to the audio file.
        width: CSS width applied to the player.

    Returns:
        The HTML markup as a single-line string (kept on one line so a
        Markdown renderer never sees indented lines).

    Raises:
        OSError: If ``audio_path`` cannot be opened or read.
    """
    # Use a context manager so the file handle is closed deterministically.
    with open(audio_path, 'rb') as audio_file:
        encoded = base64.b64encode(audio_file.read()).decode()
    extension = os.path.splitext(audio_path)[1].lower()
    mime_type = "audio/wav" if extension == ".wav" else "audio/mpeg"
    audio_url = f"data:{mime_type};base64,{encoded}"
    return (
        f'<audio controls style="width:{width};">'
        f'<source src="{audio_url}" type="{mime_type}">'
        'Your browser does not support the audio element.'
        '</audio>'
    )
# Streamlit layout (because a clean UI is key!)
def _render_cosmos_section():
    """Draw the Cosmos DB sidebar pickers and list the chosen container's documents."""
    st.sidebar.header("Cosmos DB Controls")
    if Key:
        st.session_state.primary_key = Key
        st.session_state.logged_in = True
    # Session state raises on a missing attribute, and the flag is never
    # written when the Key environment variable is unset, so read it with a
    # default instead of attribute access.
    if not st.session_state.get("logged_in", False):
        return

    st.sidebar.write("π Connected to Cosmos DB!")
    try:
        cosmos_client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)
        databases = [db['id'] for db in cosmos_client.list_databases()]
        selected_db = st.sidebar.selectbox("Select Database", databases)
        if not selected_db:
            return
        db_client = cosmos_client.get_database_client(selected_db)
        containers = [container['id'] for container in db_client.list_containers()]
        selected_container = st.sidebar.selectbox("Select Container", containers)
        if not selected_container:
            return
        container = db_client.get_container_client(selected_container)
        documents = list(container.read_all_items())
    except exceptions.CosmosHttpResponseError as err:
        # A failed Cosmos request should not take the chat and galleries
        # down with it; report it and carry on with the rest of the page.
        st.sidebar.error(f"Cosmos DB request failed: {err}")
        return

    if documents:
        st.write("π Document List:")
        st.dataframe(pd.DataFrame(documents))
    else:
        st.write("π No documents in this container.")


def _render_chat_section():
    """Draw the Claude prompt box, handle a submitted prompt, and show the history."""
    st.header("Chat with Claude π€π§ ")
    user_input = st.text_area("Ask Claude anything:")
    if st.button("Send") and user_input:
        response = chat_with_claude(user_input)
        st.write(f"Claude says: {response}")
        # Persist the exchange to a Markdown file named after the prompt.
        filename = generate_filename(user_input, "md")
        create_file(filename, user_input, response)
        if 'chat_history' not in st.session_state:
            st.session_state.chat_history = []
        st.session_state.chat_history.append({"user": user_input, "claude": response})

    if "chat_history" in st.session_state:
        st.subheader("Past Conversations π")
        for idx, chat in enumerate(st.session_state.chat_history):
            # Every history entry reuses the same labels, so give each widget
            # an explicit key to keep the widget identities distinct.
            st.text_area("You said π¬:", chat["user"], height=100,
                         disabled=True, key=f"chat_history_user_{idx}")
            st.text_area("Claude replied π€:", chat["claude"], height=200,
                         disabled=True, key=f"chat_history_claude_{idx}")
            st.markdown("---")


def _render_media_galleries():
    """Show images, videos, and audio files found in the current working directory."""
    st.subheader("Image Gallery πΌ")
    image_files = glob.glob("*.png") + glob.glob("*.jpg")
    cols = st.columns(3)
    for idx, image_file in enumerate(image_files):
        # Fill the three columns round-robin.
        with cols[idx % 3]:
            img = Image.open(image_file)
            st.image(img)

    st.subheader("Video Gallery π₯")
    for video_file in glob.glob("*.mp4"):
        st.markdown(get_video_html(video_file), unsafe_allow_html=True)

    st.subheader("Audio Gallery πΆ")
    for audio_file in glob.glob("*.mp3") + glob.glob("*.wav"):
        st.markdown(get_audio_html(audio_file), unsafe_allow_html=True)


def main():
    """Render the whole page: Cosmos DB explorer, Claude chat, and media galleries.

    Takes no arguments and returns nothing; everything happens through
    Streamlit calls and ``st.session_state``.
    """
    st.title("πGitπCosmosπ« & Claudeπ§ - The Ultimate App")
    st.sidebar.title("π§ Claudeπ & Cosmos Explorer")
    _render_cosmos_section()
    _render_chat_section()
    _render_media_galleries()
# Render the app only when this file is run as the main script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()