from __future__ import unicode_literals import streamlit as st from retrieve_kb import get_current_knowledge_bases, get_knowledge_base_information from generate_kb import add_links_to_knowledge_base from app import client, default_embedding_function import pandas as pd from tempfile import NamedTemporaryFile import os import yt_dlp as youtube_dl import requests from transformers import pipeline from openai import OpenAI import wave from dotenv import load_dotenv load_dotenv() openai_key = os.getenv("OPENAI_API_KEY") openai_key = st.secrets["OPENAI_API_KEY"] st.title("Get knowledge boxes") if st.button("Get current knowledge bases"): kbs = get_current_knowledge_bases(client=client) st.json(kbs) collection_name = st.text_input(label="knowledge base name") info = {} collection = None if "df" not in st.session_state: st.session_state["df"] = pd.DataFrame() col1, col2 = st.columns(2) if st.button("Get All"): collection_info, coll, client = get_knowledge_base_information( client=client, embedding_function=default_embedding_function, kb_name=collection_name, ) st.session_state["collection"] = coll st.session_state["client"] = client collection = coll # st.write(collection_info) df = pd.DataFrame.from_records(collection_info) df["source"] = df["metadatas"].apply(lambda x: x.get("source", "unkown")) df["title"] = df["metadatas"].apply(lambda x: x.get("title", "unkown")) df = df[["documents", "source", "title", "ids"]] st.session_state["df"] = df if len(st.session_state["df"]) != 0: st.dataframe(st.session_state["df"], width=3_000) unique_df = st.session_state["df"]["source"].unique() st.text(f"unique urls: {len(unique_df)}") st.dataframe(unique_df) ############################# #### REMOVE A SPLIT ######### ############################# st.header("Remove a split") id = st.text_input("Insert a split id") if st.button("Remove Id from collection"): if id in st.session_state["df"]["ids"].values.tolist(): res = st.session_state["collection"].delete(ids=[f"{id}"]) st.success(f"id {id} deleted") else: st.error(f"id {id} not in kb") ############################# #### REMOVE URL ############ ############################# st.header("Remove url from collection") url = st.text_input("remove url") if st.button("Remove url from collection"): try: ids = st.session_state["collection"].get(where={"source": url})["ids"] st.session_state["collection"].delete(ids=ids) st.success("deleted") except Exception as e: st.error(str(e)) ############################# ########### ADD URL ######### ############################# st.header("Add url to existing collection") url_text = st.text_input("Insert a url link") if st.button("add url to collection"): urls = [url_text] # put in a list even if only one res = add_links_to_knowledge_base(client=client, kb_name=collection_name, urls=urls) st.write(res) st.header("Add pdf to existing collection") uploaded_file = st.file_uploader("Choose a PDF file", type="pdf") pdf_optional_link = st.text_input( "Insert a URL link you want to associate with the pdf" ) pdf_title = st.text_input("This title will be displayed as a resource in ask brian") if st.button("add pdf"): # Create a temporary file with NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file: # Write the uploaded PDF to the temporary file tmp_file.write(uploaded_file.getvalue()) tmp_path = tmp_file.name print("PATH: ", tmp_path) urls = [tmp_path] res = add_links_to_knowledge_base( client=client, kb_name=collection_name, urls=urls, pdf_optional_link=pdf_optional_link, pdf_title=pdf_title, ) st.write(res) # Clean up: delete the temporary file os.remove(tmp_path) ############################# ########### ADD CSV ######### ############################# st.header("Add csv to existing collection") uploaded_file = st.file_uploader("Choose a CSV file", type=["csv"]) df = None if uploaded_file is not None: try: new_df = pd.read_csv(uploaded_file) st.write("DataFrame:") st.write(new_df) except Exception as e: st.error(str(e)) if st.button("add csv urls to collection"): urls = new_df.values.tolist() st.write(urls) res = add_links_to_knowledge_base( client=client, kb_name=collection_name, urls=urls ) st.write(res) ############################# ########## YOUTUBE ########## ############################# def transcribe_audio(audio_path, chunk_length=10000): """ Transcribe audio by breaking it into chunks using wave and numpy. :param audio_path: Path to the audio file (e.g., "video.wav"). :param chunk_length: Length of each audio chunk in milliseconds. :return: Full transcription of the audio file. """ # Open the wave file client = OpenAI(api_key=open_ai_key) with wave.open(audio_path, "rb") as audio: frame_rate = audio.getframerate() n_channels = audio.getnchannels() sample_width = audio.getsampwidth() # Calculate the number of frames that make up the chunk_length in time num_frames_per_chunk = int(frame_rate * (chunk_length / 1000.0)) # Initialize an empty string to hold the full transcription full_transcription = "" # Read and process each chunk while True: # Read frames for the chunk frames = audio.readframes(num_frames_per_chunk) if not frames: break # Export chunk to a temporary WAV file with wave.open("temp_chunk.wav", "wb") as chunk_audio: chunk_audio.setnchannels(n_channels) chunk_audio.setsampwidth(sample_width) chunk_audio.setframerate(frame_rate) chunk_audio.writeframes(frames) # Open the temporary file and send it to the API with open("temp_chunk.wav", "rb") as audio_file: response = client.audio.transcriptions.create( model="whisper-1", file=audio_file ) # Append the transcription to the full transcription full_transcription += response.text + " " full_transcription = full_transcription.strip() response = client.chat.completions.create( model="gpt-4o", messages=[ { "role": "system", "content": "The following is the transcription of a youube video made by whisper. \ I want you to adjust the transcription if theres is some error so that I can then insert this transcription to a vector database for retrieval for question answering. Please adkust the text if needed: ", }, {"role": "user", "content": f"{full_transcription}"}, ], ) text = response.choices[0].message.content return text def download_and_transcribe_youtube(youtube_url): ydl_opts = { "format": "bestaudio/best", "postprocessors": [ { "key": "FFmpegExtractAudio", "preferredcodec": "wav", "preferredquality": "192", } ], "outtmpl": "." + "/video.%(ext)s", } with youtube_dl.YoutubeDL(ydl_opts) as ydl: ydl.download([youtube_url]) info_dict = ydl.extract_info(youtube_url, download=True) video_title = info_dict.get("title", None) # audio_file = open("video.wav", "rb") text = transcribe_audio("video.wav") f_out_path = f"{video_title}.txt" with open(f"{video_title}.txt", "w") as f_out: f_out.write(text) urls = [f_out_path] add_links_to_knowledge_base( client=client, kb_name=collection_name, urls=urls, youtube_optional_link=youtube_url, video_title=video_title, ) os.remove(f"{video_title}.txt") os.remove("video.wav") os.remove("temp_chunk.wav") st.header("Add youtube video to collection") st.image( "", width=200, # Manually Adjust the width of the image as per requirement ) video_url = st.text_input("Youtube video url") st.text("Aggiungere il video puo impiegare un bel pò. Avvia e vatti a fare una canna") if st.button("Add video"): # Create a temporary file # Write the uploaded PDF to the temporary file try: download_and_transcribe_youtube(video_url) st.success("Video Added") except Exception as e: st.error(f"{str(e)}")