Spaces:
Running
Running
"""Page to manage kbs""" | |
from __future__ import unicode_literals # this should always be the first import | |
import streamlit as st | |
import pandas as pd | |
from tempfile import NamedTemporaryFile | |
import os | |
import yt_dlp as youtube_dl | |
from openai import OpenAI | |
import wave | |
from dotenv import load_dotenv | |
import requests | |
import uuid | |
import time | |
from retrieve_kb import get_current_knowledge_bases, get_knowledge_base_information | |
from generate_kb import add_links_to_knowledge_base | |
from app import client, default_embedding_function, show_sidebar | |
from langchain_text_splitters import RecursiveCharacterTextSplitter | |
# Load variables from a .env file into the process environment.
load_dotenv()
# API key later passed to transcribe_audio; os.getenv returns None if unset.
openai_key = os.getenv("OPENAI_API_KEY")
# Helper imported from `app`; presumably renders the shared sidebar
# (its implementation is not visible in this file).
show_sidebar()
def transcribe_audio(audio_path, openai_key, chunk_length=10000):
    """
    Transcribe a WAV file with Whisper and let GPT-4o clean up the result.

    The file is read with the standard ``wave`` module in fixed-length
    chunks. Each chunk is written to ``temp_chunk.wav`` in the current
    working directory and sent to the ``whisper-1`` transcription endpoint;
    the partial transcripts are joined with single spaces. The joined text
    is then sent to ``gpt-4o`` with a prompt asking it to fix transcription
    errors.

    :param audio_path: Path to the WAV audio file (e.g., "video.wav").
    :param openai_key: API key used to build the OpenAI client.
    :param chunk_length: Length of each audio chunk in milliseconds.
    :return: The corrected transcription, or an empty string when nothing
        was transcribed (no audio frames or only empty transcripts).
    """
    openai_client = OpenAI(api_key=openai_key)
    # The chunk file name is fixed and the last chunk is left on disk:
    # download_and_transcribe_youtube removes "temp_chunk.wav" afterwards.
    chunk_path = "temp_chunk.wav"
    pieces = []

    with wave.open(audio_path, "rb") as audio:
        frame_rate = audio.getframerate()
        n_channels = audio.getnchannels()
        sample_width = audio.getsampwidth()
        # Number of frames that make up chunk_length milliseconds of audio.
        num_frames_per_chunk = int(frame_rate * (chunk_length / 1000.0))

        while True:
            frames = audio.readframes(num_frames_per_chunk)
            if not frames:
                break
            # Re-wrap the raw frames in a WAV container with the same format
            # as the source so the API receives a valid audio file.
            with wave.open(chunk_path, "wb") as chunk_audio:
                chunk_audio.setnchannels(n_channels)
                chunk_audio.setsampwidth(sample_width)
                chunk_audio.setframerate(frame_rate)
                chunk_audio.writeframes(frames)
            with open(chunk_path, "rb") as audio_file:
                response = openai_client.audio.transcriptions.create(
                    model="whisper-1", file=audio_file
                )
            pieces.append(response.text)

    full_transcription = " ".join(pieces).strip()
    if not full_transcription:
        # Nothing to correct; skip the chat call rather than asking the
        # model to "fix" an empty transcript.
        return ""

    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "The following is the transcription of a youube video made by whisper. "
                    "I want you to adjust the transcription if theres is some error so that I can then insert this transcription to a vector database for retrieval for question answering. Please adkust the text if needed: "
                ),
            },
            {"role": "user", "content": f"{full_transcription}"},
        ],
    )
    return response.choices[0].message.content
def download_and_transcribe_youtube(youtube_url):
    """
    Download a video's audio, transcribe it and add the text to the selected KB.

    yt-dlp extracts the best audio stream to ``./video.wav``;
    ``transcribe_audio`` turns it into text; the text is written to a
    ``<title>.txt`` file whose path is passed to
    ``add_links_to_knowledge_base`` together with the video URL and title.
    The intermediate files are removed afterwards, also when a step fails.

    Reads the module-level ``client``, ``collection_name`` and ``openai_key``.

    :param youtube_url: URL of the video to ingest.
    :return: None.
    """
    audio_path = "video.wav"
    ydl_opts = {
        "format": "bestaudio/best",
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "wav",
                "preferredquality": "192",
            }
        ],
        "outtmpl": "./video.%(ext)s",
    }
    transcript_path = None
    try:
        with youtube_dl.YoutubeDL(ydl_opts) as ydl:
            # extract_info(download=True) downloads the audio AND returns the
            # metadata; a separate ydl.download() call fetched it twice.
            info_dict = ydl.extract_info(youtube_url, download=True)
        video_title = info_dict.get("title", None)

        text = transcribe_audio(audio_path=audio_path, openai_key=openai_key)

        # Titles can contain path separators, which would make open() fail
        # or write outside the working directory.
        safe_title = str(video_title).replace("/", "_").replace("\\", "_")
        transcript_path = f"{safe_title}.txt"
        with open(transcript_path, "w") as f_out:
            f_out.write(text)

        add_links_to_knowledge_base(
            client=client,
            kb_name=collection_name,
            urls=[transcript_path],
            youtube_optional_link=youtube_url,
            video_title=video_title,
        )
    finally:
        # Only remove what actually exists: a failed download or an empty
        # audio file leaves some of these paths uncreated.
        for path in (transcript_path, audio_path, "temp_chunk.wav"):
            if path and os.path.exists(path):
                os.remove(path)
# Create the per-session URL list once; list_manager appends to it and the
# "add urls to collection" button reads it.
if "url_list" not in st.session_state:
    st.session_state["url_list"] = []
def list_manager():
    """Render the widgets used to build the session's list of URLs.

    Shows a text input with an "Add" and a "reset" button inside an
    expander and writes the current ``st.session_state["url_list"]`` to the
    page. The "url_list" entry must already exist in the session state.
    """

    def add_element():
        # `user_input` comes from the enclosing scope; it is assigned below,
        # before this helper can be called.
        if len(user_input) > 0:
            st.session_state["url_list"] += [user_input]
        else:
            st.warning("Enter text")

    # User-facing notice (Italian): "There is a bug!!! Click on add twice!"
    st.text("C'è un bug!!! Cliccare su add due volte!")
    with st.expander("Add urls"):
        user_input = st.text_input("Enter a url")
        add_button = st.button("Add", key="add_button")
        col1, col2 = st.columns((2))
        with col1:
            # Append the typed URL when "Add" was clicked on this run.
            if add_button:
                add_element()
        with col2:
            # Clear the whole list.
            if st.button("reset"):
                st.session_state["url_list"] = []
        # Show the list as it stands after the handlers above ran.
        st.write(st.session_state["url_list"])
def scrape_jina_ai(url: str, timeout: float = 60.0) -> str:
    """Fetch *url* through the ``https://r.jina.ai/`` endpoint and return the body.

    The HTTP status code is not checked, so an error page is returned as
    text just like a successful response.

    :param url: Address appended verbatim to ``https://r.jina.ai/``.
    :param timeout: Seconds to wait for the server before giving up; without
        a timeout ``requests`` may block the whole page indefinitely.
    :return: The response body as text.
    :raises requests.exceptions.Timeout: If the server does not answer in time.
    """
    response = requests.get("https://r.jina.ai/" + url, timeout=timeout)
    return response.text
st.title("Manage collections")
# Sorted names of the existing knowledge bases, offered in the dropdown.
kbs = get_current_knowledge_bases(client=client)
kbs = sorted(kb.name for kb in kbs)
# Knowledge base every action on this page operates on.
collection_name = st.selectbox("Select knowledge box", kbs)
info = {}  # not referenced again in the visible code
collection = None
# Keep the table of loaded splits across reruns; starts out empty.
if "df" not in st.session_state:
    st.session_state["df"] = pd.DataFrame()
col1, col2 = st.columns(2)  # not referenced again at module level
if st.button("Get All"):
    # Note: rebinds the module-level `client` to the one returned here.
    collection_info, coll, client = get_knowledge_base_information(
        client=client,
        embedding_function=default_embedding_function,
        kb_name=collection_name,
    )
    # The "Remove" tab reads the collection handle back from the session.
    st.session_state["collection"] = coll
    st.session_state["client"] = client
    collection = coll
    # Expects records carrying "documents", "metadatas" and "ids" entries.
    df = pd.DataFrame.from_records(collection_info)
    # Pull source/title out of each metadata dict, with a fallback label.
    df["source"] = df["metadatas"].apply(lambda x: x.get("source", "unkown"))
    df["title"] = df["metadatas"].apply(lambda x: x.get("title", "unkown"))
    df = df[["documents", "source", "title", "ids"]]
    st.session_state["df"] = df
if len(st.session_state["df"]) != 0:
    st.dataframe(st.session_state["df"], width=3_000)
    # Distinct source values among the loaded splits.
    unique_df = st.session_state["df"]["source"].unique()
    st.text(f"unique urls: {len(unique_df)}")
    st.dataframe(unique_df)
else:
    # Also shown before "Get All" has been clicked, because the stored
    # table starts out empty.
    st.warning(f"{collection_name} KB is empty")
# One tab per management action; labels appear in the tab bar in this order.
tab_labels = [
    "Remove",
    "Add URL",
    "Multiple URL",
    "Add PDF",
    "Add Youtube",
    "Notion and Jina",
    "Rename",
]
tab1, tab2, tab3, tab4, tab5, tab6, tab7 = st.tabs(tab_labels)
# remove stuff tab
with tab1:
    # The collection handle is stored in the session only after "Get All"
    # was clicked, and belongs to the KB that was selected at that moment.
    loaded_collection = st.session_state.get("collection")

    # Remove a single split by its id
    st.header("Remove a split")
    split_id = st.text_input("Insert a split id")
    if st.button("Remove Id from collection"):
        if loaded_collection is None:
            st.error('Click "Get All" first to load the collection')
        else:
            try:
                # Only ids present in the loaded table can be removed.
                if split_id in st.session_state["df"]["ids"].values.tolist():
                    loaded_collection.delete(ids=[f"{split_id}"])
                    st.success(f"id {split_id} deleted")
                else:
                    st.error(f"id {split_id} not in kb")
            except Exception as e:
                st.error(f"{str(e)}")

    # Remove every split whose "source" metadata equals the given url
    st.header("Remove url from collection")
    url = st.text_input("remove url")
    if st.button("Remove url from collection"):
        if loaded_collection is None:
            st.error('Click "Get All" first to load the collection')
        else:
            try:
                ids = loaded_collection.get(where={"source": url})["ids"]
                if ids:
                    loaded_collection.delete(ids=ids)
                    st.success("deleted")
                else:
                    # Never call delete() with an empty id list, and do not
                    # report success when nothing matched.
                    st.warning(f"no splits found with source {url}")
            except Exception as e:
                st.error(str(e))
# ADD URL
with tab2:
    st.header("Add url to existing collection")
    url_text = st.text_input(
        "Insert a url link",
        help="This should be text stored in a webpage like wikipedia. NB notion pages are not supported yet!",
    )
    if st.button("add url to collection"):
        cleaned_url = url_text.strip()
        if cleaned_url:
            # The helper expects a list, even for a single url.
            res = add_links_to_knowledge_base(
                client=client, kb_name=collection_name, urls=[cleaned_url]
            )
            st.write(res)
        else:
            # Do not send an empty url to the ingestion helper.
            st.warning("Enter a url")
# ADD MULTIPLE URLS (typed list or CSV upload)
with tab3:
    # Typed list: list_manager maintains st.session_state["url_list"].
    list_manager()
    if st.button("add urls to collection"):
        if st.session_state["url_list"]:
            res = add_links_to_knowledge_base(
                client=client,
                kb_name=collection_name,
                urls=st.session_state["url_list"],
            )
            st.write(res)
        else:
            st.warning("The url list is empty")

    # CSV upload: urls are read from the "urls" column.
    st.text('The CSV should have the column named "urls"')
    uploaded_file = st.file_uploader("Choose a file")
    if uploaded_file is not None:
        dataframe = pd.read_csv(uploaded_file)
        if "urls" not in dataframe.columns:
            # Report the problem instead of crashing the page with a KeyError.
            st.error('The CSV has no column named "urls"')
        else:
            # Drop empty cells (NaN) and duplicates before ingesting.
            unique_urls_list = dataframe["urls"].dropna().unique().tolist()
            if st.button("add CSV with urls to collection"):
                res = add_links_to_knowledge_base(
                    client=client,
                    kb_name=collection_name,
                    urls=unique_urls_list,
                )
                # Show the result like the other "add" actions do.
                st.write(res)
# Add PDF
with tab4:
    st.header("Add pdf to existing collection")
    st.write(
        "Trick: if you want to add a Notion page, "
        "download the page as pdf, and load the pdf here together with the notion url"
    )
    uploaded_file = st.file_uploader("Choose a PDF file", type="pdf")
    pdf_optional_link = st.text_input(
        "Insert a URL link you want to associate with the pdf"
    )
    pdf_title = st.text_input("This title will be displayed as a resource in ask brian")
    if st.button("add pdf"):
        if uploaded_file is None:
            # Without this guard, clicking the button before uploading
            # crashed on None.getvalue().
            st.warning("Choose a PDF file first")
        else:
            # The ingestion helper takes a path, so persist the upload to a
            # temporary file; delete=False keeps it after the handle closes.
            with NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
                tmp_file.write(uploaded_file.getvalue())
                tmp_path = tmp_file.name
            try:
                res = add_links_to_knowledge_base(
                    client=client,
                    kb_name=collection_name,
                    urls=[tmp_path],
                    pdf_optional_link=pdf_optional_link,
                    pdf_title=pdf_title,
                )
                st.write(res)
            finally:
                # Remove the temporary file even when ingestion fails.
                os.remove(tmp_path)
# Add YOUTUBE | |
with tab5: | |
st.header("Add youtube video to collection") | |
st.image( | |
"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAVsAAACRCAMAAABaFeu5AAAAwFBMVEX////NIB8AAADKAADMGBf//PzNHRzWUVH23d3LCwjjlZUlJSXLDw2np6fi4uLvx8ZycnLPKSfghoY9PT3no6OPj49oaGicnJzZZmXvwsFPT0/66+uvr6/pqanaamr99vbz0tLprq755+fsurrSPDseHh7WVVTu7u5/f3/hjY3fhIPQMzLX19fAwMDbc3K3t7fQ0NB3d3fXW1rUSEguLi5cXFxERETUSUnlnJsUFBSSkpLdeHj119fSNzdeXl41NTXZRimFAAAOiklEQVR4nO2dfXvSPBTGwZRCeTqRjSnIgPIiE3BMQR0K0+//rZ4WaJs7OUlbXtZ2cv/hddmGEH5L05OTk5NC4VWp1HPVdjWoVDaO49RqtVarNdyq76njquH9s/3f7oZbwi3nlt5UKgPvw14lpbR/SkpyCQ4qLrdhv7GqTq6n8+aivBzN6kXTsC0WqThFbNso1uuz0XLc7M6nk8mq0R+6/CsDl/qrwz7YtDqrybpZXj7XbQ6CS8Hl4Mrcqngq7arzKjbcL7C4v4dlzpbl5nqy6rQ2g7SxHCun0Z0FvWkL8WQED8S+Bb7DXe+unLQBHara3P0NRso0NTINt33dWtqYDlB/xuzscvVl2qzeSBtVQrXqLPtgdzKZ2U8bVwL1Frkh68lk5XbayOLKsey0cSWUwVppQ4unfq467U4my8Wou2JpgzpIbJI2uGjlFG0e4LbyitaFm3FzoZ3DsTYQy/Y8eJE3C4GX8ZQ2Pp1q+R0RPLFh2gA1Ghlp4zlKZj1tgGrl+EW2U4Y7bjnf3dYdcUdpI1Spkvdu63bcTdoQFZpYaaM5WvY6bYgKZdgPHlemnc0lNSf/Q4I7KOzXIUog6udGlzihJvy8wVsGjKskv93kP3iG58QfFN6AbuVfe4Ulzgz3mfupxmI6nV7H0zSBdWHOJuEHJ7PTwzWLux9zC+Ru5F/7Bwq8Py/aNj8kJPJ7JBhLjDH/wXPYfKyyrfoLdsue1Gi8/3gUukgNgW0n/gdLSdiW+U+ehe2+V/wCdg9iowX2Zx4Spvy4mV+29nxX92dg91ls9Ae4/fE4dJECX0J+2foD7leA95/Y6J9w++44dFHqAaH8si2y/ZovPvT3Qpu1d0+tzath6+wqx0FB6JnYqwkz4qQavhq2+6YjPsHIQhPtw7HwIrSyqAbGUcbYWtV97fjYY5u/w723R7KLEpgJeWZrd/e1vwN+X/gvvodb34+GF6EF/M4csw2mJw/q5/4RbhEz4tPqGeafOWZrzvx2AUAwYb+pu/Q5hA7GJHPerLEN3IzoMuC/+JNmKD690Lwt2lNvJ0goLDuEe8MEjrAXYFtkvvMAn/yv4fe+hRvSnO3Uagudz2YgmG9XhO0fCX73i7Ct7KvHNxY3qt6poJ9H2rUys1iKXVavF2Hr+PXDxJabIKgHi7NIG/WRM7bBCIb+73BiC5ffnZ3t8BWxDWwcHBQCHy36FyX348nVSYPt01nYBhMzYfr1zb+K/sXzL15Wdevn+WJrXwdfABQ/+Vc/8ld/nB1tYa0zpHLGdh58ARpbvtuAHinOp67uZ74AW9M04my6jLO7lV+T+4/HuPczooOMcN2W7r98/frldD7d8cFsTV5Rl2m2JmPmaPlsM+0sxHBNaaNe9/YUW9rWckFh4MT9s7sG/sWfIoiHb3/9eze3tOl7+/NjoJ9hv397E17+yA81I11v0LHd72reKaSouEyxNdisMfC+oLSZMhU0F/+44bS9Yr12bfXE1O8HPlIU++ju2l/+0hVQK6Ff9w3t2n1HVwAD0CeufPFAtkLIa3AnNOA9Bc5hgi175rbktsv0eMPsFS6DtyfKTm6aXDkAtXXKoGEGrlucr+0lD8hx2PILdNoh9BC2sIe5qmRrsipUUJgTLZEKeeqtFXszICYMPF7bTgheBnDdosM3k
BQXkhO2huWILZeNB6NYEQtt1VLAZVwXByfuR5r2XmCa8RKX0xKy7aXD1jbljbii16hoL+WomH1DDHJ4ZnylQMn7GRAUwg0J71VopRdeQraDVNguDYqaEAZsLBVkXbUtqufCXihA9qAmgJM1QeiGTMhWb7KejS25IQw7rllU9VpPG6rdjB9CYHi9Fd5XITOcZUgCYywXbBV77ZoQ4aOPsm8QDQcbBcyC74J/MUSmHGyDD4ZKyJb8+5+drUJ9rn4rapfuUh5y8buB2j2Ov0EhIfROFu8tS8hWHzP+wmy5wd+0dSOCouX43eDEfQSIgWdMWJwkxC9lJmSr3xD5wmy5ddHIbkvtN8REFTAofAbSYWcUSH6+u7vFIFPe75CQrdY1fla2retmtyF0zjBWgqGN5ky6zXkDr/Wlpgs7+G74n/yDBxAUwanxfiUC579cQFlO2A5NZhs2Yzg5CHZeCLufu8wyvNLQL2XTXGD7+41C4YQLMAaP/zu6cFK28h//Rdh29zMrYT9j0BprxV/202aYDPq5NCgIwRXK91ToKYD3XRAKgp6HnLFtKlw7weiPl4OJAroXrkWnjRi4omJLL05y5tYPunQu2IZrkRbQCr4A4iLCr8VapPFMDLjCSNxA4asfX3fhB2GuFkY2JWSrXYp8AbZGl7/uG2FYecjQNFTNIdnimyrQHV2Ac9/AnC4cQfLGFpwG/usJiXNTMOjP0stMChSk2YYPOTjLOIMAmIcQc8aWrsYG65aLvkbTrC44bCS2pGOWc26Bj0HF9pauLgdsDaoaTKhWVbEV16MkthiJuxf37MO4eqX4XDiJyxtbm2Q7JCsR2YrhORJbYVfOTpzrFizg3+F16M9hbNPrYNsiKxHZLqLYCsF1W/GLhTB1UMU7hpOHf4ltM5ItscjIB+L/k2zpSkS2YtyKzBZnWFvxgfgXtkewBX/NVr/4uxe2VRbE6SRmK62GwQrYhe0xbKXlMFgAu7BVppeJwRaD7t4IgfgXtkrFYSs4cTEQ/8JWqThsBScuBuJf2CoVh62QYwXvXdgqFYstWmF478JWqQvbf5Zt2ms6F7Z7ZZ4ttYn+whareR1s04n9SMa2v2iS6oppHcnM2Be2OrZVZtAS45szxlafxf2Me0mSsI2bVpo8FCo9tunEiP4bbNOJbT4XW+oVmB7bdGLyz8XWubA9G1tqO1p6bNPZA3U4WzigWrITqD0q6bFNae/eoWytaqcRqLMU7VvqZM702Ka2n/cwthHxCdT+kwtbrOZAttZZ2B4cs6Q/kyQ1tgfFLMEe/0A6trCYpoq/PZztc47ZCjlL6FPMdGwhekEVIxpGNCRlq03V9QJs6RhRcAwo42+FrADmc4GQji30TxXbsD8nZXtoLqBTsSWrwfhbPm5cF39L7w/UsX2gYamuJ2V7aA4rJVv6cVaxFTaS7avhMn0VwMeMbAUfI0by+9KxhQV21bssXHZXsIVRm2c7OS1bEzMjRLLlcnoVuP0OC/4q508CK6skvIfxL+JLxxZin7kMSbDBJ4yFptmqD5Sg9sofwVbYmR/JlsEmPT+DAo7CYQ5j/3yMncR5j0Ukt9GzxYim8DKkIA4RkGyFMFSerXZRJzFbcVVFwTbc0Yh/i2AGrrC18HQ9cb5OHzuvZQvJQYKe+KCA9Y0ofS9srebZap3jOrZoTDKiFxaUbB0/HZiBr/bg6UdDIbwMDgOx6fRhp1q2mMx1//RjT+SCSomEbm8FtMBW68DV5V5j0Mim7T7I8hq2aj9ve8y8wdIQvILBChMOw96qjinXL74q6BVMLVvhgd5u03tAXFxQ6W+x8L2c7IZnq3WExWc7eGZsIbv41Hv8a02DsScHLwaDv4nJuAutJ4tZZYGdtL2MzHilZSvFld/8FC7wnxB3T3wSywpstY4wLVth8t6mJvPa3BTyJ0KDUHq+e20xX630xJFusAi25A40XrdJCgtsC7rDG7Vs6bxoqIR5P7g5BTnHAs2lbehktmA9W+EQG
Fl8VtGInEwyW93ZjVq21MKfqGRs+QMjIg+akAYzxaHoEWwV+6l9YcpLugy8EIGtLkmrjq0l2gT7MnHmZYo8P/yic2QyIKndwhTPVwTbgpRBlJeQ2I5MePUIL0Rgq5v06tji6Zq+2rH8CQuaGzTFJusPJM95jCZZMIotJqJA/RJGmSuizJ3an6Bd6dPmFqa8/O16vLga8mj4Hi6AsSnJaiciPxQ95Y1mq06/9ktM4UxsBXzU+Gq0EzM9Wzkis+1arHHYjtkT8d6ZivaqOo/VhsgkSk/LYrBVZbu8kTuPlF7Is37VbHWRNRE5sSWryDLi+RjHhj2TDCa5IWwultlrSCVpVbxcY7ClcwuTJ3BhbX+3HVvNVmfg6nO524JPr+P94rixdoaIokLwsmZUUsbSmmwyuYIuZMJWnfNyL6W2+aM4k4+fW+yNCGALe1p1Rhj6naRAEca7TgaL7V0VW3gzee94k3X5rkt2RfcvMBeJlTom+YYwWYHU1QdedJmCN33lLN1fv9WnHd75U+Lf/mh8/5v7AsxjLtngvBq8JF8vG/kkK+u9+8We8hEEwUqsOeIv7xYMbLZ29h+vjRWpmN1C5U6It1eb2oqSCjMhge4fPnx+//7bh8eIUwjePl5dPcY7alK7nQTiWOQ/gsGMebVRnRfDe3BIF3eOAxUQ45ZdrK/XC6ZMk1/cHUHw1F1Pp+tm3W2D6imTfHBZ0BEnYnjMbItZunlzhAzbtqNPKTG8YrZ2vZ8MtEtd2ibnRaaVNkZS2uXIvOj44fYs0sfl50T0okPqSnJeaWZFxtllQForLB/K6JAQtaEkF4rlTU5FOv94LiSElWVJ+t3oOVAmJw47lY4w/rMgM6tvMk/ayKXsizqKKzsS1/pzJcEVmjXl2lSImRIgNU3yC1e7sJYJNfMK14oVVpKuFvmEmwe03mEhaXM6QGwR/cOyoIbO/Z9JmYwOSsigKs+qZatsyrYy60Yg1JF3d2dWBptneDpGqTNTr/dlSKbNmnGiVDMmZ+qtp2Z4Fc00LFa8VhzomXlV+tPRdvnc5s6LT1veMfb2tlXjqpM2oSPV3gwb6/EsDFGwLNveJebydE6IpsfRWzi3uAAJczTurlf9Wl47LKneoOK0hv3Gqjpddxfj8nI0q2OMB4ff+wPsxCVJ27MK5ZexLEuuxDbrs9FTedGcT6urRn9YcyoDabvDa1ap1Ov12oNKZeM4tVrNZd/vNFz81cnk+vp6ul7Pu82F+4coPy2XI0/L0XL5VB6PF4tmtztfT91Sk8nEhdfp94cttwrH2bgMez0xZftFmdD/D12BwpeL+0AAAAAASUVORK5CYII=", | |
width=200, # Manually Adjust the width of the image as per requirement | |
) | |
# (These statements belong to the body of the `with tab5:` block.)
video_url = st.text_input("Youtube video url")
# User-facing notice (Italian): "Adding the video can take quite a while.
# Start it and go have a cigarette."
st.text(
    "Aggiungere il video puo impiegare un bel pò. Avvia e vatti a fare una sigaretta"
)
if st.button("Add video"):
    if not video_url.strip():
        # Nothing to download; avoid handing an empty url to the player
        # and to the downloader.
        st.warning("Enter a Youtube video url")
    else:
        try:
            # Show the video, then download, transcribe and ingest it.
            st.video(video_url)
            download_and_transcribe_youtube(video_url)
            st.success("Video Added")
        except Exception as e:
            st.error(f"{str(e)}")
### NOTION WITH JINA AI ###
with tab6:
    st.header("Add Notion with JinaAI")
    url = st.text_input("Website url")
    # All network and database work happens only after the button is
    # clicked; running it at tab level repeated the scrape and the KB lookup
    # on every rerun of the page, even with an empty url.
    if st.button("Click button", key="jina button"):
        if not url.strip():
            st.warning("Enter a website url")
        else:
            try:
                text = scrape_jina_ai(url=url)
                _, coll, _ = get_knowledge_base_information(
                    client=client,
                    embedding_function=default_embedding_function,
                    kb_name=collection_name,
                )
                # Split the scraped text into overlapping chunks.
                text_splitter = RecursiveCharacterTextSplitter(
                    chunk_size=1_000,
                    chunk_overlap=200,
                    length_function=len,
                    is_separator_regex=False,
                )
                doc_list = text_splitter.create_documents([text])
                text_list = [doc.page_content for doc in doc_list]
                # One random id per chunk; every chunk records the url as
                # its source.
                ids = [str(uuid.uuid4()) for _ in text_list]
                coll.add(
                    documents=text_list,
                    metadatas=[{"source": f"{url}"} for _ in text_list],
                    ids=ids,
                )
                st.success("Added")
            except Exception as e:
                st.error(f"{str(e)}")
### RENAME COLLECTIONS ###
with tab7:
    st.header("Rename collection")
    new_name = st.text_input("New collection name")
    if st.button("rename"):
        if not new_name.strip():
            st.warning("Enter a new collection name")
        else:
            try:
                # Look the collection up only when a rename is requested.
                _, coll, _ = get_knowledge_base_information(
                    client=client,
                    embedding_function=default_embedding_function,
                    kb_name=collection_name,
                )
                coll.modify(
                    name=new_name,
                )
            except Exception as e:
                # Keep the error on screen: no success message, no rerun.
                st.error(f"{str(e)}")
            else:
                st.success("Done")
                # Give the user a moment to read the message, then rerun so
                # the knowledge-base dropdown picks up the new name.
                time.sleep(1)
                # st.experimental_rerun was replaced by st.rerun in newer
                # Streamlit releases; use whichever this version provides.
                rerun = getattr(st, "rerun", None) or st.experimental_rerun
                rerun()