collection-manager / pages /manage_knowledge_box.py
marcellopoliti's picture
feat: add csv file
9221d01
"""Page to manage kbs"""
from __future__ import unicode_literals # this should always be the first import
import streamlit as st
import pandas as pd
from tempfile import NamedTemporaryFile
import os
import yt_dlp as youtube_dl
from openai import OpenAI
import wave
from dotenv import load_dotenv
import requests
import uuid
import time
from retrieve_kb import get_current_knowledge_bases, get_knowledge_base_information
from generate_kb import add_links_to_knowledge_base
from app import client, default_embedding_function, show_sidebar
from langchain_text_splitters import RecursiveCharacterTextSplitter
# Load variables from a local .env file into the environment, then read the
# OpenAI API key that the YouTube transcription helper passes to the client.
load_dotenv()
openai_key = os.environ.get("OPENAI_API_KEY")

# Render the sidebar helper imported from the main app module.
show_sidebar()
def transcribe_audio(audio_path, openai_key, chunk_length=10000):
    """
    Transcribe a WAV file with Whisper, then ask GPT-4o to clean up the text.

    The audio is read in consecutive chunks of ``chunk_length`` milliseconds.
    Each chunk is written to ``temp_chunk.wav`` in the current working
    directory (overwritten on every iteration and left on disk for the caller
    to remove), sent to the ``whisper-1`` transcription endpoint, and the
    partial transcripts are joined with single spaces.

    :param audio_path: Path to the WAV audio file (e.g., "video.wav").
    :param openai_key: API key used to create the OpenAI client.
    :param chunk_length: Length of each audio chunk in milliseconds.
    :return: The corrected transcription, or an empty string when no text
        could be transcribed (the correction request is skipped in that case).
    """
    client = OpenAI(api_key=openai_key)
    partial_transcripts = []

    with wave.open(audio_path, "rb") as audio:
        frame_rate = audio.getframerate()
        n_channels = audio.getnchannels()
        sample_width = audio.getsampwidth()

        # Number of frames that make up chunk_length milliseconds of audio.
        num_frames_per_chunk = int(frame_rate * (chunk_length / 1000.0))

        while True:
            frames = audio.readframes(num_frames_per_chunk)
            if not frames:
                break

            # Re-wrap the raw frames as a standalone WAV file with the same
            # audio parameters so the API receives a valid file.
            with wave.open("temp_chunk.wav", "wb") as chunk_audio:
                chunk_audio.setnchannels(n_channels)
                chunk_audio.setsampwidth(sample_width)
                chunk_audio.setframerate(frame_rate)
                chunk_audio.writeframes(frames)

            with open("temp_chunk.wav", "rb") as audio_file:
                response = client.audio.transcriptions.create(
                    model="whisper-1", file=audio_file
                )

            partial_transcripts.append(response.text)

    full_transcription = " ".join(partial_transcripts).strip()
    if not full_transcription:
        # Nothing was transcribed: do not ask the chat model to "correct" an
        # empty message, its reply would be stored as if it were the video text.
        return ""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "The following is the transcription of a youube video made by whisper. "
                    "I want you to adjust the transcription if theres is some error so that I can then insert this transcription to a vector database for retrieval for question answering. Please adkust the text if needed: "
                ),
            },
            {"role": "user", "content": full_transcription},
        ],
    )
    return response.choices[0].message.content
def download_and_transcribe_youtube(youtube_url):
    """
    Download a YouTube video's audio, transcribe it and add it to the knowledge base.

    The audio is extracted to ``video.wav`` in the current working directory,
    transcribed with ``transcribe_audio`` and written to a ``.txt`` file whose
    path is handed to ``add_links_to_knowledge_base`` for the collection
    currently selected in the page (module-level ``collection_name``).
    The intermediate files are removed afterwards, even when a step fails.

    :param youtube_url: URL of the video to add.
    """
    ydl_opts = {
        "format": "bestaudio/best",
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "wav",
                "preferredquality": "192",
            }
        ],
        "outtmpl": "./video.%(ext)s",
    }

    transcript_path = None
    try:
        with youtube_dl.YoutubeDL(ydl_opts) as ydl:
            # extract_info(download=True) already downloads and post-processes
            # the audio, so a separate ydl.download() call would only fetch
            # the same video a second time.
            info_dict = ydl.extract_info(youtube_url, download=True)
        video_title = info_dict.get("title", None)

        text = transcribe_audio(audio_path="video.wav", openai_key=openai_key)

        # Video titles can contain path separators or other characters that are
        # not valid in file names; keep the real title for the metadata only.
        safe_title = "".join(
            ch if ch.isalnum() or ch in " ._-" else "_" for ch in str(video_title)
        ).strip()[:100] or "video"
        transcript_path = f"{safe_title}.txt"
        with open(transcript_path, "w") as f_out:
            f_out.write(text)

        add_links_to_knowledge_base(
            client=client,
            kb_name=collection_name,
            urls=[transcript_path],
            youtube_optional_link=youtube_url,
            video_title=video_title,
        )
    finally:
        # Best-effort cleanup: some of these files may not exist when an
        # earlier step failed or when the audio contained no frames.
        for leftover in (transcript_path, "video.wav", "temp_chunk.wav"):
            if leftover is None:
                continue
            try:
                os.remove(leftover)
            except FileNotFoundError:
                pass
# Seed the per-session list of URLs that list_manager() reads and updates.
if "url_list" not in st.session_state:
    st.session_state.url_list = []
def list_manager():
    """Render the widgets that build up the list of URLs kept in session state.

    Shows a text input with an "Add" button that appends the typed value to
    ``st.session_state["url_list"]`` (or warns when the input is empty), a
    "reset" button that empties the list, and the current list contents.
    The ``"url_list"`` key must already exist in the session state.
    """
    # On-screen notice (Italian): "There is a bug!!! Click add twice!"
    st.text("C'è un bug!!! Cliccare su add due volte!")

    with st.expander("Add urls"):
        user_input = st.text_input("Enter a url")
        add_clicked = st.button("Add", key="add_button")

        add_column, reset_column = st.columns(2)

        with add_column:
            if add_clicked:
                if user_input:
                    st.session_state["url_list"] += [user_input]
                else:
                    # Rendered inside the first column, next to the Add action.
                    st.warning("Enter text")

        with reset_column:
            if st.button("reset"):
                st.session_state["url_list"] = []

        st.write(st.session_state["url_list"])
def scrape_jina_ai(url: str, timeout: float = 30.0) -> str:
    """Fetch a page through the ``https://r.jina.ai/`` reader endpoint.

    The target ``url`` is appended verbatim to the reader prefix and the
    response body is returned as text, whatever the HTTP status code is.

    :param url: Address of the page to read.
    :param timeout: Seconds to wait for the server before giving up. Without
        a timeout ``requests`` can block forever on an unresponsive server.
    :return: Body of the reader response.
    :raises requests.exceptions.RequestException: on connection errors or
        when the timeout expires.
    """
    response = requests.get("https://r.jina.ai/" + url, timeout=timeout)
    return response.text
st.title("Manage collections")

# Offer the existing knowledge bases, by name, in alphabetical order.
kbs = sorted(kb.name for kb in get_current_knowledge_bases(client=client))
collection_name = st.selectbox("Select knowledge box", kbs)

info = {}
collection = None

# The table of splits is kept in session state so it survives script reruns.
if "df" not in st.session_state:
    st.session_state["df"] = pd.DataFrame()

col1, col2 = st.columns(2)

if st.button("Get All"):
    # Fetch every record of the selected knowledge base and remember the
    # collection handle and client for the removal actions further down.
    collection_info, coll, client = get_knowledge_base_information(
        client=client,
        embedding_function=default_embedding_function,
        kb_name=collection_name,
    )
    st.session_state["collection"] = coll
    st.session_state["client"] = client
    collection = coll

    df = pd.DataFrame.from_records(collection_info)
    # Lift "source" and "title" out of each record's metadata mapping,
    # falling back to the same placeholder when a key is absent.
    for meta_key in ("source", "title"):
        df[meta_key] = df["metadatas"].apply(
            lambda meta, key=meta_key: meta.get(key, "unkown")
        )
    df = df[["documents", "source", "title", "ids"]]
    st.session_state["df"] = df

stored_df = st.session_state["df"]
if len(stored_df) == 0:
    st.warning(f"{collection_name} KB is empty")
else:
    st.dataframe(stored_df, width=3_000)
    # Second table: the distinct sources found in the loaded splits.
    unique_df = stored_df["source"].unique()
    st.text(f"unique urls: {len(unique_df)}")
    st.dataframe(unique_df)

# One tab per management action; the order of labels matches the unpacking.
tab_labels = [
    "Remove",
    "Add URL",
    "Multiple URL",
    "Add PDF",
    "Add Youtube",
    "Notion and Jina",
    "Rename",
]
tab1, tab2, tab3, tab4, tab5, tab6, tab7 = st.tabs(tab_labels)
# Remove tab: delete a single split by id, or every split of a given source.
with tab1:
    # Both actions work on the collection handle stored by the "Get All"
    # button; without it there is nothing to delete from.
    collection_loaded = "collection" in st.session_state

    # remove a split
    st.header("Remove a split")
    split_id = st.text_input("Insert a split id")
    if st.button("Remove Id from collection"):
        if not collection_loaded:
            st.error('Load the collection with "Get All" first')
        else:
            try:
                # Only ids listed in the loaded table are accepted.
                known_ids = st.session_state["df"]["ids"].values.tolist()
                if split_id in known_ids:
                    st.session_state["collection"].delete(ids=[split_id])
                    st.success(f"id {split_id} deleted")
                else:
                    st.error(f"id {split_id} not in kb")
            except Exception as e:
                st.error(str(e))

    # REMOVE URL
    st.header("Remove url from collection")
    url = st.text_input("remove url")
    if st.button("Remove url from collection"):
        if not collection_loaded:
            st.error('Load the collection with "Get All" first')
        elif not url:
            st.warning("Enter a url")
        else:
            try:
                # Look up every split whose "source" metadata equals the URL.
                ids = st.session_state["collection"].get(where={"source": url})["ids"]
                if ids:
                    st.session_state["collection"].delete(ids=ids)
                    st.success("deleted")
                else:
                    # Do not call delete() with an empty id list and do not
                    # claim success when nothing matched.
                    st.warning(f"no documents found for url {url}")
            except Exception as e:
                st.error(str(e))
# Add URL tab: ingest a single web page into the selected knowledge base.
with tab2:
    st.header("Add url to existing collection")
    url_text = st.text_input(
        "Insert a url link",
        help="This should be text stored in a webpage like wikipedia. NB notion pages are not supported yet!",
    )

    add_url_clicked = st.button("add url to collection")
    if add_url_clicked:
        # The ingestion helper takes a list of links, so wrap the single URL.
        res = add_links_to_knowledge_base(
            client=client,
            kb_name=collection_name,
            urls=[url_text],
        )
        st.write(res)
# Multiple URL tab: ingest a hand-built list of URLs or the URLs of a CSV file.
with tab3:
    list_manager()
    if st.button("add urls to collection"):
        res = add_links_to_knowledge_base(
            client=client, kb_name=collection_name, urls=st.session_state["url_list"]
        )
        st.write(res)

    st.text('The CSV should have the column named "urls"')
    uploaded_file = st.file_uploader("Choose a file")
    if uploaded_file is not None:
        dataframe = pd.read_csv(uploaded_file)
        if "urls" not in dataframe.columns:
            # Report the problem instead of crashing the page with a KeyError.
            st.error('The CSV has no column named "urls"')
        else:
            # Empty cells are read as NaN; drop them so only real URLs are sent.
            unique_urls_list = dataframe["urls"].dropna().unique().tolist()
            if st.button("add CSV with urls to collection"):
                res = add_links_to_knowledge_base(
                    client=client,
                    kb_name=collection_name,
                    urls=unique_urls_list,
                )
                # Show the outcome like the other "add" actions do.
                st.write(res)
# Add PDF tab: ingest an uploaded PDF, optionally tied to a URL and a title.
with tab4:
    st.header("Add pdf to existing collection")
    st.write(
        "Trick: if you want to add a Notion page, "
        "download the page as pdf, and load the pdf here together with the notion url"
    )
    uploaded_file = st.file_uploader("Choose a PDF file", type="pdf")
    pdf_optional_link = st.text_input(
        "Insert a URL link you want to associate with the pdf"
    )
    pdf_title = st.text_input("This title will be displayed as a resource in ask brian")

    if st.button("add pdf"):
        if uploaded_file is None:
            # Without this guard the click fails on None.getvalue() and leaves
            # an empty temporary file behind.
            st.warning("Choose a PDF file first")
        else:
            # The ingestion helper works on paths, so spill the upload to disk.
            with NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
                tmp_file.write(uploaded_file.getvalue())
                tmp_path = tmp_file.name
            print("PATH: ", tmp_path)
            try:
                res = add_links_to_knowledge_base(
                    client=client,
                    kb_name=collection_name,
                    urls=[tmp_path],
                    pdf_optional_link=pdf_optional_link,
                    pdf_title=pdf_title,
                )
                st.write(res)
            finally:
                # Clean up the temporary file even when ingestion fails.
                os.remove(tmp_path)
# Add Youtube tab: transcribe a video and ingest the transcript.
with tab5:
    st.header("Add youtube video to collection")
    # NOTE(review): the image source is an empty string here; presumably the
    # original URL is missing, confirm what should be displayed.
    st.image("", width=200)
    video_url = st.text_input("Youtube video url")
    # On-screen notice (Italian): adding the video can take quite a while.
    st.text(
        "Aggiungere il video puo impiegare un bel pò. Avvia e vatti a fare una sigaretta"
    )

    add_video_clicked = st.button("Add video")
    if add_video_clicked:
        try:
            # Embed the video in the page, then run the download and
            # transcription pipeline; any failure is shown as an error box.
            st.video(video_url)
            download_and_transcribe_youtube(video_url)
            st.success("Video Added")
        except Exception as e:
            st.error(str(e))
### NOTION WITH JINA AI ###
# Scrape a page through the Jina reader, split it and add the chunks.
with tab6:
    st.header("Add Notion with JinaAI")
    url = st.text_input("Website url")

    # All the expensive work (HTTP scrape, knowledge-base lookup, splitting)
    # happens only on click, not on every rerun of the page script.
    if st.button("Click button", key="jina button"):
        if not url:
            st.warning("Enter a url")
        else:
            try:
                text = scrape_jina_ai(url=url)
                collection_info, coll, client = get_knowledge_base_information(
                    client=client,
                    embedding_function=default_embedding_function,
                    kb_name=collection_name,
                )
                # Chunks of at most 1000 characters, overlapping by 200.
                text_splitter = RecursiveCharacterTextSplitter(
                    chunk_size=1_000,
                    chunk_overlap=200,
                    length_function=len,
                    is_separator_regex=False,
                )
                doc_list = text_splitter.create_documents([text])
                text_list = [doc.page_content for doc in doc_list]
                if not text_list:
                    st.warning("No text found at this url")
                else:
                    # One random id per chunk; every chunk records the page
                    # URL as its source.
                    ids = [str(uuid.uuid4()) for _ in text_list]
                    coll.add(
                        documents=text_list,
                        metadatas=[{"source": f"{url}"} for _ in text_list],
                        ids=ids,
                    )
                    st.success("Added")
            except Exception as e:
                st.error(f"{str(e)}")
### RENAME COLLECTIONS ###
with tab7:
    st.header("Rename collection")
    new_name = st.text_input("New collection name")

    if st.button("rename"):
        if not new_name:
            st.warning("Enter a new collection name")
        else:
            try:
                # Look up the selected collection only when it is needed.
                collection_info, coll, client = get_knowledge_base_information(
                    client=client,
                    embedding_function=default_embedding_function,
                    kb_name=collection_name,
                )
                coll.modify(
                    name=new_name,
                )
            except Exception as e:
                # Keep the error on screen: no success message and no rerun.
                st.error(f"{str(e)}")
            else:
                st.success("Done")
                # Leave the message visible briefly, then rerun the script so
                # the collection selector shows the new name.
                time.sleep(1)
                # st.experimental_rerun is deprecated in favour of st.rerun;
                # fall back to it only on Streamlit versions without st.rerun.
                rerun = getattr(st, "rerun", None) or st.experimental_rerun
                rerun()