| import logging |
| from pathlib import Path |
|
|
# Module-wide logger; every message emitted by this file goes through the
# logger registered under the name "semantic".
_logger = logging.getLogger(name="semantic")
|
|
| from operator import itemgetter |
| from langchain_core.prompts import ChatPromptTemplate |
| from langchain_core.runnables.base import RunnableSequence |
| from langchain_core.vectorstores import VectorStore |
| from langchain.retrievers.multi_query import MultiQueryRetriever |
| from langchain_community.vectorstores import Qdrant |
| from langchain.schema.output_parser import StrOutputParser |
| from langchain.schema.runnable import RunnablePassthrough |
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| from langchain_community.document_loaders import PyMuPDFLoader |
| from langchain_experimental.text_splitter import SemanticChunker |
|
|
| from globals import ( |
| embeddings, |
| gpt35_model, |
| gpt4_model, |
| META_10K_FILE_PATH, |
| META_SEMANTIC_COLLECTION, |
| VECTOR_STORE_PATH |
| ) |
|
|
|
|
# Kept next to the set-up code that is its only consumer in this block.
from qdrant_client import QdrantClient

# Storage switch for the whole module:
#   True  -> Qdrant is opened with the ":memory:" location,
#   False -> Qdrant is opened on the local folder VECTOR_STORE_PATH.
USE_MEMORY = True

# Module-level Qdrant client, created once at import time according to
# USE_MEMORY.
qclient: QdrantClient
if USE_MEMORY:
    qclient = QdrantClient(":memory:")
else:
    qclient = QdrantClient(path=VECTOR_STORE_PATH)
|
|
|
|
# Text of the answer-generation prompt.  It carries two placeholders that are
# filled in at run time: {context} and {question}.  The instructions ask the
# model to answer only when the context relates to the query, to reply
# "I don't know" when unsure, and to format the answer as markdown.
RAG_PROMPT = """
Reply the user's query thoughtfully and clearly.
You should only respond to user's query if the context is related to the query.
If you are not sure how to answer, please reply "I don't know".
Respond with structure in markdown.

CONTEXT:
{context}

QUERY:
{question}

YOUR REPLY: """


# Chat prompt object built from RAG_PROMPT with ChatPromptTemplate.from_template.
rag_prompt = ChatPromptTemplate.from_template(RAG_PROMPT)
|
|
|
|
class SemanticStoreFactory:
    """Build, load and cache the vector store that holds the semantic chunks.

    The store is produced at most once and then memoised on the class.  With
    ``USE_MEMORY`` set, the PDF at ``META_10K_FILE_PATH`` is chunked and
    indexed into an in-memory Qdrant instance.  Otherwise the collection
    ``META_SEMANTIC_COLLECTION`` lives in the local storage opened by the
    module-level ``qclient`` and is always reached through that client.
    """

    # Cached store; stays None until get_semantic_store() first succeeds.
    _semantic_vectorstore: VectorStore = None

    @classmethod
    def __collection_exists(cls) -> bool:
        """Return whether ``qclient`` already holds the semantic collection."""
        known = qclient.get_collections().collections
        return any(c.name == META_SEMANTIC_COLLECTION for c in known)

    @classmethod
    def __load_semantic_store(cls) -> VectorStore:
        """Open the persisted collection, building it first when it is missing.

        Returns:
            A ``Qdrant`` store bound to ``qclient`` and
            ``META_SEMANTIC_COLLECTION``.
        """
        # Opening ``qclient`` on VECTOR_STORE_PATH already puts files into
        # that folder, so "the directory is not empty" says nothing about
        # whether the collection was ever built; ask the client instead.
        if cls.__collection_exists():
            _logger.info("\tQdrant loading ...")
            return Qdrant(
                client=qclient,
                embeddings=embeddings,
                collection_name=META_SEMANTIC_COLLECTION,
            )

        _logger.info("\tQdrant creating ...")
        return cls.__create_semantic_store()

    @classmethod
    def __create_semantic_store(cls) -> VectorStore:
        """Chunk the source PDF semantically and index the chunks in Qdrant.

        Returns:
            The freshly populated vector store (any previous collection of
            the same name is replaced).

        Raises:
            ValueError: if chunking the PDF yields no chunks at all.
        """
        if USE_MEMORY:
            _logger.info("creating semantic vector store: %s", USE_MEMORY)
        else:
            _logger.info("creating semantic vector store: %s", VECTOR_STORE_PATH)
            path = Path(VECTOR_STORE_PATH)
            if not path.exists():
                path.mkdir(parents=True, exist_ok=True)
                _logger.info("Directory '%s' created.", path)

        _logger.info("loading %s", META_10K_FILE_PATH)
        documents = PyMuPDFLoader(META_10K_FILE_PATH).load()
        _logger.info("\tload %s docs", len(documents))

        semantic_chunker = SemanticChunker(
            embeddings=embeddings,
            breakpoint_threshold_type="percentile",
        )
        semantic_chunks = semantic_chunker.create_documents(
            [d.page_content for d in documents]
        )
        _logger.info("created semantic_chunks: %s", len(semantic_chunks))

        # Indexing needs at least one chunk to size the vectors; fail with a
        # clear message instead of an obscure error further down.
        if not semantic_chunks:
            raise ValueError(
                f"no chunks could be produced from {META_10K_FILE_PATH}"
            )

        if USE_MEMORY:
            _logger.info("\t==> creating memory vectorstore ...")
            store = Qdrant.from_documents(
                semantic_chunks,
                embeddings,
                location=":memory:",
                collection_name=META_SEMANTIC_COLLECTION,
                force_recreate=True,
            )
            _logger.info("\t==> finished constructing vectorstore")
            return store

        # Local Qdrant storage accepts a single client per folder and
        # ``qclient`` already has VECTOR_STORE_PATH open, so the collection is
        # built through that client rather than via
        # ``Qdrant.from_documents(path=...)``, which would open a second one.
        from qdrant_client.http import models as qdrant_models

        probe = embeddings.embed_documents([semantic_chunks[0].page_content])
        vector_size = len(probe[0])

        # Same effect as force_recreate=True: drop any stale collection first.
        if cls.__collection_exists():
            qclient.delete_collection(collection_name=META_SEMANTIC_COLLECTION)
        qclient.create_collection(
            collection_name=META_SEMANTIC_COLLECTION,
            vectors_config=qdrant_models.VectorParams(
                size=vector_size,
                distance=qdrant_models.Distance.COSINE,
            ),
        )

        store = Qdrant(
            client=qclient,
            embeddings=embeddings,
            collection_name=META_SEMANTIC_COLLECTION,
        )
        store.add_documents(semantic_chunks)
        _logger.info("\t==> return vectorstore")
        return store

    @classmethod
    def get_semantic_store(cls) -> VectorStore:
        """Return the cached semantic vector store, creating it on first use.

        In memory mode the store is always built from the PDF.  In persistent
        mode an existing collection is opened when possible; if that attempt
        raises, the failure is logged and the store is rebuilt from the PDF.
        """
        _logger.info("get_semantic_store")
        if cls._semantic_vectorstore is None:
            if USE_MEMORY:
                cls._semantic_vectorstore = cls.__create_semantic_store()
                _logger.info("received semantic_vectorstore")
            else:
                _logger.info(
                    "Loading semantic vectorstore %s from: %s",
                    META_SEMANTIC_COLLECTION,
                    VECTOR_STORE_PATH,
                )
                try:
                    cls._semantic_vectorstore = cls.__load_semantic_store()
                except Exception as e:
                    # Deliberate best-effort: any load failure falls back to
                    # a full rebuild instead of aborting the caller.
                    _logger.warning("cannot load: %s", e)
                    cls._semantic_vectorstore = cls.__create_semantic_store()

        _logger.info("RETURNING get_semantic_store")
        return cls._semantic_vectorstore
|
|
class SemanticRAGChainFactory:
    """Assemble the multi-query RAG chain once and hand out the cached copy."""

    # Cached chain; None until get_semantic_rag_chain() has built it.
    _chain: RunnableSequence = None

    @classmethod
    def get_semantic_rag_chain(cls) -> RunnableSequence:
        """Return the RAG chain, constructing it on the first call.

        The chain expects a mapping with a ``"question"`` key and produces a
        mapping with ``"response"`` (output of ``rag_prompt | gpt4_model``)
        and ``"context"`` (whatever the retriever returned).  When the store
        factory hands back ``None`` nothing is built and the still-unset
        cache value is returned.
        """
        if cls._chain is not None:
            return cls._chain

        _logger.info("creating SemanticRAGChainFactory")
        store = SemanticStoreFactory.get_semantic_store()
        _logger.info("\treceived semantic_store")
        if store is None:
            return cls._chain

        base_retriever = store.as_retriever()
        multi_query_retriever = MultiQueryRetriever.from_llm(
            retriever=base_retriever,
            llm=gpt4_model,
        )

        # Stage 1: send the question to the retriever while keeping the
        # question itself available downstream.
        retrieve_stage = {
            "context": itemgetter("question") | multi_query_retriever,
            "question": itemgetter("question"),
        }
        # Stage 2: pass everything through, re-assigning "context" to itself.
        passthrough_stage = RunnablePassthrough.assign(
            context=itemgetter("context")
        )
        # Stage 3: generate the answer and return it next to the context.
        answer_stage = {
            "response": rag_prompt | gpt4_model,
            "context": itemgetter("context"),
        }

        cls._chain = retrieve_stage | passthrough_stage | answer_stage
        _logger.info("\t_chain constructed")
        return cls._chain