| | import os |
| |
|
| | import tiktoken |
| | from application.vectorstore.vector_creator import VectorCreator |
| | from application.core.settings import settings |
| | from retry import retry |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def num_tokens_from_string(string: str, encoding_name: str) -> "tuple[int, float]":
    """Count the tokens in ``string`` and estimate the price of processing them.

    Args:
        string: Text to tokenize.
        encoding_name: Name of the tiktoken encoding to load, for example
            ``"cl100k_base"``.

    Returns:
        A ``(num_tokens, total_price)`` pair, where ``num_tokens`` is the
        length of the encoded text and ``total_price`` is that count billed
        at a flat rate of 0.0004 per 1000 tokens.
    """
    # NOTE: the return annotation is quoted so it is never evaluated at
    # definition time; it documents the tuple actually returned (the previous
    # ``-> int`` annotation was wrong).
    encoding = tiktoken.get_encoding(encoding_name)
    num_tokens = len(encoding.encode(string))
    # Flat rate: 0.0004 per 1000 tokens.
    total_price = (num_tokens / 1000) * 0.0004
    return num_tokens, total_price
| |
|
| |
|
@retry(tries=10, delay=60)
def store_add_texts_with_retry(store, i):
    """Add one document to ``store``; the ``retry`` decorator re-invokes on failure.

    The decorator is configured with ``tries=10`` and ``delay=60``.

    Args:
        store: Vector store exposing ``add_texts(texts, metadatas=...)``.
        i: Document object providing ``page_content`` and ``metadata``.
    """
    texts = [i.page_content]
    metadatas = [i.metadata]
    store.add_texts(texts, metadatas=metadatas)
| | |
| |
|
| |
|
def call_openai_api(docs, folder_name, task_status=None):
    """Embed ``docs`` into the configured vector store, reporting progress.

    The store backend is chosen by ``settings.VECTOR_STORE``. For ``"faiss"``
    the store is created from the first document and the remaining documents
    are added one by one; for any other backend the store is created empty
    and every document is added. Documents are added through
    ``store_add_texts_with_retry``.

    Args:
        docs: Sequence of documents exposing ``page_content`` and
            ``metadata``. The sequence is not modified.
        folder_name: Directory used as the store path; created if missing.
        task_status: Optional object exposing
            ``update_state(state=..., meta=...)`` (presumably a Celery task,
            confirm against callers). When ``None``, progress reporting is
            skipped.

    Raises:
        IndexError: If ``docs`` is empty and the backend is ``"faiss"``,
            because that backend is seeded with ``docs[0]``.
    """
    # Imported locally, as in the original implementation.
    from tqdm import tqdm

    folder_path = f"{folder_name}"
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(folder_path, exist_ok=True)

    is_faiss = settings.VECTOR_STORE == "faiss"
    if is_faiss:
        # Seed the store with the first document. Slice instead of pop(0) so
        # the caller's list is left untouched.
        docs_init = [docs[0]]
        pending = docs[1:]
        store = VectorCreator.create_vectorstore(
            settings.VECTOR_STORE,
            docs_init=docs_init,
            path=folder_path,
            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
        )
    else:
        pending = docs
        store = VectorCreator.create_vectorstore(
            settings.VECTOR_STORE,
            path=folder_path,
            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
        )

    total = len(pending)
    progress_bar = tqdm(
        pending,
        desc="Embedding 🦖",
        unit="docs",
        total=total,
        bar_format='{l_bar}{bar}| Time Left: {remaining}',
    )
    for done, doc in enumerate(progress_bar):
        try:
            if task_status is not None:
                # Percentage of documents completed before this one.
                task_status.update_state(
                    state='PROGRESS',
                    meta={'current': int((done / total) * 100)},
                )
            store_add_texts_with_retry(store, doc)
        except Exception as e:
            # Best effort: report the failure and stop; whatever was embedded
            # so far is persisted by the save below.
            print(e)
            print("Error on ", doc)
            print("Saving progress")
            print(f"stopped at {done} out of {total}")
            break

    # Runs after both normal completion and an early break, so progress is
    # saved exactly once. Only the faiss backend is saved locally; other
    # backends are not assumed to implement save_local.
    if is_faiss:
        store.save_local(folder_path)
| |
|
| |
|
class _NoOpTaskStatus:
    """Stand-in progress reporter whose ``update_state`` does nothing."""

    def update_state(self, *args, **kwargs):
        """Accept and ignore any progress update."""


def get_user_permission(docs, folder_name, task_status=None):
    """Show the estimated embedding cost and embed ``docs`` if the user agrees.

    The token count and approximate cost of all document contents are printed
    (using the ``cl100k_base`` encoding), then the user is prompted. Answering
    ``y``/``Y`` or just pressing Enter starts the embedding; any other answer
    skips it.

    Args:
        docs: Iterable of documents exposing ``page_content``.
        folder_name: Directory passed on to ``call_openai_api``.
        task_status: Optional progress reporter forwarded to
            ``call_openai_api``. When ``None``, a no-op reporter is used so
            the call always receives the three arguments it expects.
    """
    # join() avoids the quadratic cost of repeated string concatenation.
    docs_content = "".join(doc.page_content for doc in docs)

    tokens, total_price = num_tokens_from_string(
        string=docs_content, encoding_name="cl100k_base"
    )

    print(f"Number of Tokens = {tokens:,d}")
    print(f"Approx Cost = ${total_price:,.2f}")

    user_input = input("Price Okay? (Y/N) \n").lower()
    # An empty answer counts as approval, same as "y".
    if user_input in ("y", ""):
        if task_status is None:
            task_status = _NoOpTaskStatus()
        call_openai_api(docs, folder_name, task_status)
    else:
        print("The API was not called. No money was spent.")
| |
|