Spaces:
Sleeping
Sleeping
File size: 8,305 Bytes
5f9e152 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 |
import os
import gc
import re
import uuid
import subprocess
import requests
from dotenv import load_dotenv
os.environ["HF_HOME"] = "weights"
os.environ["TORCH_HOME"] = "weights"
import streamlit as st
from llama_index.core import Settings
from llama_index.llms.ollama import Ollama
from llama_index.core import PromptTemplate
from llama_index.core import SimpleDirectoryReader
from llama_index.core import VectorStoreIndex
from llama_index.core.storage.storage_context import StorageContext
from langchain_community.embeddings import HuggingFaceEmbeddings
from llama_index.embeddings.langchain import LangchainEmbedding
from rag_101.retriever import (
load_embedding_model,
load_reranker_model
)
# Set up the LLM served by a local Ollama instance.
# `ollama_url` is the full chat endpoint and is kept under its original name
# for any code that posts to it directly. The llama-index `Ollama` client takes
# the server root through `base_url` (it has no `url` parameter), so the root
# is derived from the endpoint instead of being passed as-is.
ollama_url = 'http://localhost:11434/api/chat'
ollama_base_url = ollama_url.split('/api/', 1)[0]
llm = Ollama(model="mistral:instruct", base_url=ollama_base_url, request_timeout=1000.0)
# TODO: setup the embedding model
# The project helper's result is wrapped in the llama-index LangChain adapter
# so it can be assigned to `Settings.embed_model`.
lc_embedding_model = load_embedding_model()
embed_model = LangchainEmbedding(lc_embedding_model)
# utility functions
def parse_github_url(url):
    """Extract the owner and repository name from a GitHub HTTPS URL.

    The repository name is normalised to match the directory that
    ``git clone`` creates: a trailing ``.git`` is removed, and any query
    string, fragment, or extra path segments are ignored. Leading and
    trailing whitespace around the URL is ignored as well.

    Args:
        url: The URL text, e.g. ``"https://github.com/owner/repo"``.

    Returns:
        A ``(owner, repo)`` tuple of strings, or ``(None, None)`` when the
        input is not a string or does not look like
        ``https://github.com/<owner>/<repo>``.
    """
    if not isinstance(url, str):
        return (None, None)
    # Owner stops at the next "/"; repo additionally stops at "?" or "#" so
    # that query strings and fragments never end up in the directory name.
    pattern = r"https://github\.com/([^/\s]+)/([^/\s?#]+)"
    match = re.match(pattern, url.strip())
    if not match:
        return (None, None)
    owner, repo = match.groups()
    # "git clone .../repo.git" checks out into "repo", not "repo.git".
    if repo.endswith(".git"):
        repo = repo[: -len(".git")]
    if not repo:
        return (None, None)
    return owner, repo
def clone_repo(repo_url):
    """Clone ``repo_url`` into the current working directory using ``git clone``.

    Args:
        repo_url: Repository URL handed to git as the single positional
            argument.

    Returns:
        The ``subprocess.CompletedProcess`` of the successful git invocation.

    Raises:
        subprocess.CalledProcessError: If git exits with a non-zero status;
            its stderr is printed before the exception propagates.
    """
    try:
        # "--" ends option parsing, so a user-supplied value that starts with
        # "-" can never be interpreted as a git option.
        result = subprocess.run(
            ["git", "clone", "--", repo_url],
            check=True,
            text=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Error occurred: {e.stderr}")
        raise
    print(result.stdout)
    return result
def validate_owner_repo(owner, repo):
    """Report whether both the owner and the repository name are truthy.

    Returns:
        ``True`` only when neither value is empty or ``None``; otherwise
        ``False``.
    """
    if not owner:
        return False
    return bool(repo)
# Initialise the session state once: later reruns find "id" already present
# and skip this branch, keeping the same id and file cache.
if "id" not in st.session_state:
    st.session_state["id"] = uuid.uuid4()
    st.session_state["file_cache"] = {}
session_id = st.session_state["id"]
# Placeholder; nothing in the visible code assigns a real client.
client = None
def reset_chat():
    """Empty the stored chat history, clear the context, and run the garbage collector."""
    st.session_state["messages"] = []
    st.session_state["context"] = None
    gc.collect()
# Sidebar: take a GitHub URL, clone the repository if needed, index its source
# files and store a streaming query engine in the session state.
with st.sidebar:
    # input for Github URL
    github_url = st.text_input("Github Repository URL")
    # button to load and process the github repository
    process_button = st.button("Load")
    message_container = st.empty()  # placeholder for dynamic messages
    if process_button and not github_url:
        # Give feedback instead of silently ignoring a click on "Load"
        # while the URL field is empty.
        message_container.warning("Please enter a Github repository URL first.")
    elif process_button:
        owner, repo = parse_github_url(github_url)
        if validate_owner_repo(owner, repo):
            with st.spinner(f"Loading {repo} repository by {owner}..."):
                try:
                    # The checkout lives in the working directory under the repo name.
                    input_dir_path = f"./{repo}"
                    if not os.path.exists(input_dir_path):
                        clone_repo(github_url)
                    if not os.path.exists(input_dir_path):
                        st.error('Error occurred while cloning the repo, carefully check the URL')
                        st.stop()
                    loader = SimpleDirectoryReader(
                        input_dir=input_dir_path,
                        required_exts=[".py", ".ipynb", ".js", ".ts", ".md"],
                        recursive=True,
                    )
                    docs = loader.load_data()
                    # TODO: ====== Create vector store and upload data ======
                    Settings.embed_model = embed_model
                    index = VectorStoreIndex.from_documents(docs)
                    # setup a query engine
                    Settings.llm = llm
                    query_engine = index.as_query_engine(streaming=True, similarity_top_k=4)
                    # customize prompt template
                    qa_prompt_tmpl_str = (
                        "Context information is below.\n"
                        "---------------------\n"
                        "{context_str}\n"
                        "---------------------\n"
                        "Given the context information above I want you to think step by step to answer the query in a crisp manner, in case you don't know the answer say 'I don't know!'.\n"
                        "Query: {query_str}\n"
                        "Answer: "
                    )
                    # Separate name for the template object so the string
                    # variable is not rebound to a different type.
                    qa_prompt_tmpl = PromptTemplate(qa_prompt_tmpl_str)
                    query_engine.update_prompts(
                        {"response_synthesizer:text_qa_template": qa_prompt_tmpl}
                    )
                    if docs:
                        message_container.success("Data loaded successfully!!")
                    else:
                        message_container.write(
                            "No Data found, check if repository is not empty!!"
                        )
                    st.session_state.query_engine = query_engine
                except Exception as e:
                    # UI boundary: surface any failure to the user and halt this run.
                    st.error(f"An error occurred: {e}")
                    st.stop()
                # Only announce readiness when something was actually loaded;
                # otherwise the "No Data found" message above stands alone.
                if docs:
                    st.success("Ready to chat!")
        else:
            st.error('Invalid owner or repo')
            st.stop()
# Main chat area: header with a clear button, the stored history, and the
# prompt box that streams answers from the query engine kept in session state.
col1, col2 = st.columns([6, 1])
with col1:
    st.header("Chat with your code! </>")
with col2:
    st.button("Clear ↺", on_click=reset_chat)
# Initialize chat history
if "messages" not in st.session_state:
    reset_chat()
# Display chat messages from history on app rerun
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])
# Accept user input
# TODO: old one
if prompt := st.chat_input("What's up?"):
    if "query_engine" not in st.session_state:
        # No repository has been loaded yet, so there is nothing to query;
        # reading st.session_state.query_engine here would raise AttributeError.
        st.warning("Please load a Github repository from the sidebar before chatting.")
    else:
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": prompt})
        # Display user message in chat message container
        with st.chat_message("user"):
            st.markdown(prompt)
        # Display assistant response in chat message container
        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            full_response = ""
            query_engine = st.session_state.query_engine
            # Stream the answer, re-rendering the partial text with a cursor
            # marker after every chunk.
            streaming_response = query_engine.query(prompt)
            for chunk in streaming_response.response_gen:
                full_response += chunk
                message_placeholder.markdown(full_response + "▌")
            # Final render without the cursor marker.
            message_placeholder.markdown(full_response)
        # Add assistant response to chat history
        st.session_state.messages.append({"role": "assistant", "content": full_response})
# TODO: new one (disabled experiment) - posts the prompt directly to ollama_url with requests
# prompt = st.chat_input("What's up?")
# if prompt:
# # Add user message to chat history
# st.session_state.messages.append({"role": "user", "content": prompt})
#
# # Display user message in chat message container
# with st.chat_message("user"):
# st.markdown(prompt)
#
# # Display assistant response in chat message container
# with st.chat_message("assistant"):
# message_placeholder = st.empty()
# full_response = ""
#
# # context
# query_engine = st.session_state.query_engine
#
# # simulate stream of response with milliseconds delay
# try:
# # Construct the request payload
# payload = {
# "message": prompt,
# "model": "mistral:instruct"
# }
#
# # Send the request
# response = requests.post(ollama_url, json=payload)
#
# # Check for HTTP errors
# response.raise_for_status()
#
# # Print the full response to debug
# response_json = response.json()
# print(response_json)
#
# # Process the response
# if "response_gen" in response_json:
# for chunk in response_json["response_gen"]:
# full_response += chunk
# message_placeholder.markdown(full_response + "▌")
# message_placeholder.markdown(full_response)
#
# # add assistant response to chat history
# st.session_state.messages.append({"role": "assistant", "content": full_response})
# else:
# st.error("Unexpected response format: 'response_gen' key not found")
#
# except requests.exceptions.HTTPError as e:
# st.error(f"HTTP error: {e}")
|