# Spaces: Running (page-status text captured together with the source; not part of the program)
"""Page to manage kbs""" | |
from __future__ import unicode_literals # this should always be the first import | |
import streamlit as st | |
import pandas as pd | |
from tempfile import NamedTemporaryFile | |
import os | |
import yt_dlp as youtube_dl | |
from openai import OpenAI | |
import wave | |
from dotenv import load_dotenv | |
import requests | |
import uuid | |
import time | |
from retrieve_kb import get_current_knowledge_bases, get_knowledge_base_information | |
from generate_kb import add_links_to_knowledge_base | |
from app import client, default_embedding_function, show_sidebar | |
from langchain_text_splitters import RecursiveCharacterTextSplitter | |
load_dotenv() | |
openai_key = os.getenv("OPENAI_API_KEY") | |
show_sidebar() | |
def transcribe_audio(audio_path, openai_key, chunk_length=10000):
    """
    Transcribe a WAV file chunk by chunk with Whisper, then clean it up with GPT-4o.

    The audio is read with the ``wave`` module in slices of ``chunk_length``
    milliseconds. Each slice is written to ``temp_chunk.wav`` in the current
    working directory and sent to the ``whisper-1`` transcription endpoint.
    The joined transcription is then passed to ``gpt-4o`` with a prompt asking
    it to correct transcription errors.

    ``temp_chunk.wav`` is not deleted here; the last chunk stays on disk so
    the caller can remove it.

    :param audio_path: Path to the audio file (e.g., "video.wav").
    :param openai_key: API key used to build the OpenAI client.
    :param chunk_length: Length of each audio chunk in milliseconds (must be > 0).
    :return: The corrected transcription, or "" when there was nothing to transcribe.
    :raises ValueError: If ``chunk_length`` is not positive.
    """
    if chunk_length <= 0:
        raise ValueError("chunk_length must be a positive number of milliseconds")

    chunk_path = "temp_chunk.wav"
    pieces = []

    with wave.open(audio_path, "rb") as audio:
        # Nothing to transcribe: skip the API calls entirely.
        if audio.getnframes() == 0:
            return ""

        openai_client = OpenAI(api_key=openai_key)

        frame_rate = audio.getframerate()
        n_channels = audio.getnchannels()
        sample_width = audio.getsampwidth()

        # Number of frames that make up chunk_length milliseconds; at least one
        # frame so that a very small chunk_length still makes progress.
        num_frames_per_chunk = max(1, int(frame_rate * (chunk_length / 1000.0)))

        while True:
            frames = audio.readframes(num_frames_per_chunk)
            if not frames:
                break

            # Re-wrap the raw frames in a WAV container with the source's
            # parameters so the API receives a valid audio file.
            with wave.open(chunk_path, "wb") as chunk_audio:
                chunk_audio.setnchannels(n_channels)
                chunk_audio.setsampwidth(sample_width)
                chunk_audio.setframerate(frame_rate)
                chunk_audio.writeframes(frames)

            with open(chunk_path, "rb") as audio_file:
                response = openai_client.audio.transcriptions.create(
                    model="whisper-1", file=audio_file
                )
            pieces.append(response.text)

    full_transcription = " ".join(pieces).strip()
    if not full_transcription:
        # Asking the chat model to "correct" an empty transcript would only
        # produce made-up text that must not end up in the knowledge base.
        return ""

    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "The following is the transcription of a youube video made by whisper. "
                    "I want you to adjust the transcription if theres is some error so that I can then insert this transcription to a vector database for retrieval for question answering. Please adkust the text if needed: "
                ),
            },
            {"role": "user", "content": full_transcription},
        ],
    )
    return response.choices[0].message.content
def download_and_transcribe_youtube(youtube_url):
    """
    Download a YouTube video's audio, transcribe it and add it to the selected KB.

    The audio is extracted to ``video.wav`` in the current working directory,
    transcribed with ``transcribe_audio`` and written to ``<title>.txt``. That
    text file is then handed to ``add_links_to_knowledge_base`` together with
    the video URL and title. Reads the module-level ``client``,
    ``collection_name`` and ``openai_key``.

    All intermediate files are removed afterwards, whether or not a step failed.

    :param youtube_url: URL of the video to ingest.
    """
    audio_path = "video.wav"
    chunk_path = "temp_chunk.wav"  # written by transcribe_audio
    transcript_path = None

    ydl_opts = {
        "format": "bestaudio/best",
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "wav",
                "preferredquality": "192",
            }
        ],
        "outtmpl": "./video.%(ext)s",
    }

    try:
        with youtube_dl.YoutubeDL(ydl_opts) as ydl:
            # extract_info(download=True) downloads the media and returns its
            # metadata, so a separate ydl.download() call would fetch it twice.
            info_dict = ydl.extract_info(youtube_url, download=True)
        video_title = info_dict.get("title", None)

        text = transcribe_audio(audio_path=audio_path, openai_key=openai_key)

        # Titles may contain characters that are not valid in a file name
        # (e.g. "/"); replace them so the transcript path is always writable.
        safe_title = "".join(
            "_" if ch in '\\/:*?"<>|\0' else ch for ch in str(video_title)
        )
        transcript_path = f"{safe_title}.txt"
        with open(transcript_path, "w") as f_out:
            f_out.write(text)

        add_links_to_knowledge_base(
            client=client,
            kb_name=collection_name,
            urls=[transcript_path],
            youtube_optional_link=youtube_url,
            video_title=video_title,
        )
    finally:
        # Clean up whatever was produced, even after a failure; a missing file
        # (e.g. no chunk was ever written) is not an error.
        for leftover in (transcript_path, audio_path, chunk_path):
            if leftover and os.path.exists(leftover):
                os.remove(leftover)
# Create the per-session URL list on first run so list_manager() can read it.
if "url_list" not in st.session_state:
    st.session_state.url_list = []
def list_manager():
    """
    Render the widgets used to build the URL list kept in ``st.session_state["url_list"]``.

    Shows a text input with an "Add" button that appends the typed value to
    the list (or warns when the input is empty), a "reset" button that
    empties the list, and the current content of the list.
    """
    # Italian notice to the user: "There is a bug!!! Click add twice!"
    st.text("C'è un bug!!! Cliccare su add due volte!")

    with st.expander("Add urls"):
        user_input = st.text_input("Enter a url")
        add_clicked = st.button("Add", key="add_button")
        add_column, reset_column = st.columns(2)

        with add_column:
            if add_clicked:
                if user_input:
                    st.session_state["url_list"] += [user_input]
                else:
                    st.warning("Enter text")

        with reset_column:
            reset_clicked = st.button("reset")
            if reset_clicked:
                st.session_state["url_list"] = []

        st.write(st.session_state["url_list"])
def scrape_jina_ai(url: str) -> str:
    """
    Fetch ``url`` through the Jina AI reader endpoint and return the response body.

    The request goes to ``https://r.jina.ai/`` followed by ``url``. The HTTP
    status code is not checked; whatever text the service answers with is
    returned as-is.

    :param url: Address of the page to read.
    :return: Body of the reader's response as text.
    :raises requests.exceptions.Timeout: If the service does not answer in time.
    """
    # Without a timeout a stalled connection would block the page forever.
    response = requests.get("https://r.jina.ai/" + url, timeout=60)
    return response.text
st.title("Manage collections")

# Names of the existing knowledge bases, sorted for a stable drop-down order.
kbs = get_current_knowledge_bases(client=client)
kbs = sorted(kb.name for kb in kbs)
collection_name = st.selectbox("Select knowledge box", kbs)
info = {}
collection = None

if "df" not in st.session_state:
    st.session_state["df"] = pd.DataFrame()

# The table and collection handle cached by "Get All" belong to the KB that was
# selected at that time. Drop them when another KB is selected, so the page
# never shows (or deletes from) a different KB than the one in the drop-down.
kb_is_loaded = (
    "loaded_kb_name" in st.session_state
    and st.session_state["loaded_kb_name"] == collection_name
)
if not kb_is_loaded:
    st.session_state["df"] = pd.DataFrame()
    if "collection" in st.session_state:
        del st.session_state["collection"]

# NOTE(review): these two columns are never populated in this file.
col1, col2 = st.columns(2)

if st.button("Get All"):
    collection_info, coll, client = get_knowledge_base_information(
        client=client,
        embedding_function=default_embedding_function,
        kb_name=collection_name,
    )
    st.session_state["collection"] = coll
    st.session_state["client"] = client
    st.session_state["loaded_kb_name"] = collection_name
    kb_is_loaded = True
    collection = coll

    df = pd.DataFrame.from_records(collection_info)
    # Defensive: a row whose metadata is missing (None) falls back to the
    # same placeholder as a row without the key.
    df["source"] = df["metadatas"].apply(lambda x: (x or {}).get("source", "unkown"))
    df["title"] = df["metadatas"].apply(lambda x: (x or {}).get("title", "unkown"))
    df = df[["documents", "source", "title", "ids"]]
    st.session_state["df"] = df

if len(st.session_state["df"]) != 0:
    st.dataframe(st.session_state["df"], width=3_000)
    unique_df = st.session_state["df"]["source"].unique()
    st.text(f"unique urls: {len(unique_df)}")
    st.dataframe(unique_df)
elif kb_is_loaded:
    st.warning(f"{collection_name} KB is empty")
else:
    # Nothing has been fetched yet, so emptiness cannot be claimed.
    st.info("Press 'Get All' to load the content of the selected KB")
# One tab per management action; the order of the labels fixes which tabN
# variable refers to which action.
tab_labels = [
    "Remove",
    "Add URL",
    "Multiple URL",
    "Add PDF",
    "Add Youtube",
    "Notion and Jina",
    "Rename",
]
tab1, tab2, tab3, tab4, tab5, tab6, tab7 = st.tabs(tab_labels)
# "Remove" tab: delete a single split by id, or every split of a source url.
# Both actions work on the collection handle and table cached by "Get All".
with tab1:
    # --- remove a single split ---
    st.header("Remove a split")
    split_id = st.text_input("Insert a split id")
    if st.button("Remove Id from collection"):
        if "collection" not in st.session_state:
            st.error("Load the collection with 'Get All' first")
        else:
            try:
                cached_df = st.session_state["df"]
                if split_id in cached_df["ids"].values.tolist():
                    st.session_state["collection"].delete(ids=[f"{split_id}"])
                    # Keep the cached table in sync so the deleted id is no
                    # longer listed or accepted on the next run.
                    st.session_state["df"] = cached_df[cached_df["ids"] != split_id]
                    st.success(f"id {split_id} deleted")
                else:
                    st.error(f"id {split_id} not in kb")
            except Exception as e:
                st.error(f"{str(e)}")

    # --- remove every split that came from a url ---
    st.header("Remove url from collection")
    url = st.text_input("remove url")
    if st.button("Remove url from collection"):
        if "collection" not in st.session_state:
            st.error("Load the collection with 'Get All' first")
        else:
            try:
                ids = st.session_state["collection"].get(where={"source": url})["ids"]
                if ids:
                    st.session_state["collection"].delete(ids=ids)
                    cached_df = st.session_state["df"]
                    if "ids" in cached_df.columns:
                        st.session_state["df"] = cached_df[~cached_df["ids"].isin(ids)]
                    st.success("deleted")
                else:
                    # Nothing matched: do not call delete with an empty id
                    # list and do not report a deletion that did not happen.
                    st.error(f"url {url} not in kb")
            except Exception as e:
                st.error(str(e))
# "Add URL" tab: ingest a single web page into the selected knowledge base.
with tab2:
    st.header("Add url to existing collection")
    url_text = st.text_input(
        "Insert a url link",
        help="This should be text stored in a webpage like wikipedia. NB notion pages are not supported yet!",
    )
    if st.button("add url to collection"):
        cleaned_url = url_text.strip()
        if cleaned_url:
            urls = [cleaned_url]  # the ingestion helper expects a list
            res = add_links_to_knowledge_base(
                client=client, kb_name=collection_name, urls=urls
            )
            st.write(res)
        else:
            # Same feedback as the multi-url input when nothing was typed.
            st.warning("Enter text")
# "Multiple URL" tab: ingest either the list built with list_manager() or the
# "urls" column of an uploaded CSV file.
with tab3:
    list_manager()
    if st.button("add urls to collection"):
        if st.session_state["url_list"]:
            res = add_links_to_knowledge_base(
                client=client, kb_name=collection_name, urls=st.session_state["url_list"]
            )
            st.write(res)
        else:
            st.warning("Enter text")

    st.text('The CSV should have the column named "urls"')
    uploaded_file = st.file_uploader("Choose a file")
    if uploaded_file is not None:
        dataframe = pd.read_csv(uploaded_file)
        if "urls" not in dataframe.columns:
            # Report the problem instead of crashing the page with a KeyError.
            st.error('The CSV should have the column named "urls"')
        else:
            # Empty cells are read as NaN and are not urls; drop them.
            unique_urls_list = dataframe["urls"].dropna().unique().tolist()
            if st.button("add CSV with urls to collection"):
                res = add_links_to_knowledge_base(
                    client=client,
                    kb_name=collection_name,
                    urls=unique_urls_list,
                )
                # Show the outcome like the other "add" actions do.
                st.write(res)
# "Add PDF" tab: ingest an uploaded PDF, optionally tagged with a url and title.
with tab4:
    st.header("Add pdf to existing collection")
    st.write(
        "Trick: if you want to add a Notion page, "
        "download the page as pdf, and load the pdf here together with the notion url"
    )
    uploaded_file = st.file_uploader("Choose a PDF file", type="pdf")
    pdf_optional_link = st.text_input(
        "Insert a URL link you want to associate with the pdf"
    )
    pdf_title = st.text_input("This title will be displayed as a resource in ask brian")
    if st.button("add pdf"):
        if uploaded_file is None:
            # The button can be pressed before a file has been chosen.
            st.warning("Choose a PDF file")
        else:
            # The ingestion helper takes paths, so the upload is first written
            # to a temporary file; the file is closed before it is read.
            with NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
                tmp_file.write(uploaded_file.getvalue())
                tmp_path = tmp_file.name
            print("PATH: ", tmp_path)
            try:
                urls = [tmp_path]
                res = add_links_to_knowledge_base(
                    client=client,
                    kb_name=collection_name,
                    urls=urls,
                    pdf_optional_link=pdf_optional_link,
                    pdf_title=pdf_title,
                )
                st.write(res)
            finally:
                # Delete the temporary file even when ingestion fails.
                os.remove(tmp_path)
# "Add Youtube" tab: download, transcribe and ingest a YouTube video.
with tab5:
    st.header("Add youtube video to collection")
    # NOTE(review): the image source is an empty string in this copy of the
    # file; confirm the intended path or URL.
    st.image(
        "",
        width=200,  # display width of the image
    )
    video_url = st.text_input("Youtube video url")
    # Italian notice to the user: adding the video can take quite a while.
    st.text(
        "Aggiungere il video puo impiegare un bel pò. Avvia e vatti a fare una sigaretta"
    )
    add_video_clicked = st.button("Add video")
    if add_video_clicked:
        # Preview the video, then run the download/transcription pipeline;
        # any failure is shown on the page instead of raising.
        try:
            st.video(video_url)
            download_and_transcribe_youtube(video_url)
            st.success("Video Added")
        except Exception as err:
            st.error(str(err))
### NOTION WITH JINA AI ###
# Scrape a page through the Jina AI reader, split the text into overlapping
# chunks and add them to the selected knowledge base.
with tab6:
    st.header("Add Notion with JinaAI")
    url = st.text_input("Website url")
    if st.button("Click button", key="jina button"):
        # Everything below runs only on click: Streamlit re-executes the whole
        # script on every interaction, so scraping at the top level would hit
        # the network on each rerun, even with an empty url.
        if not url.strip():
            st.warning("Enter text")
        else:
            try:
                text = scrape_jina_ai(url=url)
                collection_info, coll, client = get_knowledge_base_information(
                    client=client,
                    embedding_function=default_embedding_function,
                    kb_name=collection_name,
                )
                text_splitter = RecursiveCharacterTextSplitter(
                    # 1000-character chunks, with 200 characters shared
                    # between neighbouring chunks.
                    chunk_size=1_000,
                    chunk_overlap=200,
                    length_function=len,
                    is_separator_regex=False,
                )
                doc_list = text_splitter.create_documents([text])
                text_list = [doc.page_content for doc in doc_list]
                # One random id per chunk.
                ids = [str(uuid.uuid4()) for _ in text_list]
                coll.add(
                    documents=text_list,
                    metadatas=[{"source": f"{url}"} for _ in text_list],
                    ids=ids,
                )
                st.success("Added")
            except Exception as e:
                st.error(f"{str(e)}")
### RENAME COLLECTIONS ###
# Rename the selected knowledge base, then rerun the page so the drop-down
# is rebuilt from the current collection names.
with tab7:
    st.header("Rename collection")
    new_name = st.text_input("New collection name")
    if st.button("rename"):
        if not new_name.strip():
            st.warning("Enter text")
        else:
            try:
                # Fetch the collection only when it is actually needed, not on
                # every rerun of the page.
                collection_info, coll, client = get_knowledge_base_information(
                    client=client,
                    embedding_function=default_embedding_function,
                    kb_name=collection_name,
                )
                coll.modify(
                    name=new_name,
                )
            except Exception as e:
                # Leave the error on screen: no success message and no rerun.
                st.error(f"{str(e)}")
            else:
                st.success("Done")
                time.sleep(1)  # let the user see the confirmation
                # st.experimental_rerun is deprecated in newer Streamlit
                # releases; prefer st.rerun when the installed version has it.
                rerun = getattr(st, "rerun", None) or st.experimental_rerun
                rerun()