|
import os |
|
|
|
import tiktoken |
|
from application.vectorstore.vector_creator import VectorCreator |
|
from application.core.settings import settings |
|
from retry import retry |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def num_tokens_from_string(string: str, encoding_name: str,
                           price_per_1k_tokens: float = 0.0004) -> "tuple[int, float]":
    """Count the tokens in ``string`` and estimate the cost of processing them.

    Args:
        string: Text to tokenize.
        encoding_name: Name of the tiktoken encoding to use
            (e.g. ``"cl100k_base"``).
        price_per_1k_tokens: Price charged per 1,000 tokens. Defaults to
            0.0004, the rate previously hard-coded here.
            NOTE(review): presumably USD for the embedding model in use --
            confirm against current pricing.

    Returns:
        A ``(num_tokens, total_price)`` tuple: the token count as an int and
        the estimated price as a float.
    """
    encoding = tiktoken.get_encoding(encoding_name)
    num_tokens = len(encoding.encode(string))
    # Pricing is quoted per 1,000 tokens, hence the division.
    total_price = (num_tokens / 1000) * price_per_1k_tokens
    return num_tokens, total_price
|
|
|
|
|
@retry(tries=10, delay=60)
def store_add_texts_with_retry(store, i):
    """Add a single document to ``store``, wrapped in the ``retry`` decorator.

    Args:
        store: Vector store exposing ``add_texts(texts, metadatas=...)``.
        i: Document-like object providing ``page_content`` and ``metadata``.

    Any exception from ``add_texts`` is handled by ``@retry(tries=10,
    delay=60)``; once the decorator gives up, the exception propagates to
    the caller.
    """
    # add_texts takes parallel lists, so wrap the single document's fields.
    texts = [i.page_content]
    metadatas = [i.metadata]
    store.add_texts(texts, metadatas=metadatas)
|
|
|
|
|
|
|
def call_openai_api(docs, folder_name, task_status=None):
    """Embed ``docs`` into the configured vector store, reporting progress.

    The store type is selected by ``settings.VECTOR_STORE``. For ``"faiss"``
    the store is created from the first document and the remaining documents
    are added one at a time; for any other store type every document is
    added one at a time. The faiss store is saved to ``folder_name`` when the
    loop finishes.

    Embedding is best-effort: if adding a document fails (after the retries
    performed by ``store_add_texts_with_retry``), the error is printed, the
    store is saved, and the loop stops without raising.

    Args:
        docs: Sequence of document-like objects with ``page_content`` and
            ``metadata``. The caller's sequence is not modified.
        folder_name: Directory used as the store path; created if missing.
        task_status: Optional object exposing
            ``update_state(state=..., meta=...)``. When provided, it receives
            a ``'PROGRESS'`` update with an integer percentage before each
            document is added. When ``None``, no progress is reported.

    Raises:
        IndexError: If ``docs`` is empty and the vector store is ``"faiss"``
            (a first document is required to create the store).
    """
    from tqdm import tqdm

    folder_path = f"{folder_name}"
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(folder_path, exist_ok=True)

    if settings.VECTOR_STORE == "faiss":
        first_doc = docs[0]
        # Slice instead of pop(0) so the caller's list is left untouched.
        pending = docs[1:]
        store = VectorCreator.create_vectorstore(
            settings.VECTOR_STORE,
            docs_init=[first_doc],
            path=folder_path,
            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
        )
    else:
        pending = docs
        store = VectorCreator.create_vectorstore(
            settings.VECTOR_STORE,
            path=folder_path,
            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
        )

    total = len(pending)
    processed = 0
    # NOTE(review): the progress-bar label below looks mis-encoded (it was
    # probably an emoji originally) -- confirm the file's encoding.
    for doc in tqdm(pending, desc="Embedding π¦", unit="docs", total=total,
                    bar_format='{l_bar}{bar}| Time Left: {remaining}'):
        try:
            if task_status is not None:
                task_status.update_state(
                    state='PROGRESS',
                    meta={'current': int((processed / total) * 100)},
                )
            store_add_texts_with_retry(store, doc)
        except Exception as e:
            # Deliberate best-effort: keep what has been embedded so far
            # instead of losing it by propagating the failure.
            print(e)
            print("Error on ", doc)
            print("Saving progress")
            print(f"stopped at {processed} out of {total}")
            store.save_local(folder_path)
            break
        processed += 1

    if settings.VECTOR_STORE == "faiss":
        store.save_local(folder_path)
|
|
|
|
|
class _NoOpTaskStatus:
    """Progress reporter that ignores every update (used when no task exists)."""

    def update_state(self, *args, **kwargs):
        """Accept and discard a progress update."""


def get_user_permission(docs, folder_name, task_status=None):
    """Show the estimated embedding cost and embed ``docs`` if the user agrees.

    The token count and approximate price for the concatenated
    ``page_content`` of all documents are printed, then the user is prompted.
    Answering ``y`` (any case) or just pressing Enter runs
    ``call_openai_api``; any other answer skips it.

    Args:
        docs: Iterable of document-like objects with ``page_content``.
        folder_name: Directory passed through to ``call_openai_api``.
        task_status: Optional progress reporter forwarded to
            ``call_openai_api``. When ``None``, a no-op reporter is passed,
            because ``call_openai_api`` is declared with a ``task_status``
            parameter and calls ``update_state`` on it.
    """
    # join() builds the text in one pass instead of repeated += copies.
    docs_content = "".join(doc.page_content for doc in docs)

    tokens, total_price = num_tokens_from_string(string=docs_content, encoding_name="cl100k_base")

    print(f"Number of Tokens = {tokens:,d}")
    print(f"Approx Cost = ${total_price:,.2f}")

    user_input = input("Price Okay? (Y/N) \n").lower()
    # An empty answer (just Enter) counts as consent, same as "y".
    if user_input in ("y", ""):
        if task_status is None:
            task_status = _NoOpTaskStatus()
        call_openai_api(docs, folder_name, task_status)
    else:
        print("The API was not called. No money was spent.")
|
|