| print("start1") |
| import os |
| import sys |
| import subprocess |
| import gradio as gr |
| from PyPDF2 import PdfReader |
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| from langchain_community.vectorstores import FAISS |
| from langchain.prompts import PromptTemplate |
| from langchain.chains import LLMChain |
| from langchain_community.embeddings import HuggingFaceEmbeddings |
| from langchain.schema import Document |
| print("start2") |
|
|
|
|
# Make sure llama-cpp-python is importable; install it only when it is missing.
try:
    import llama_cpp  # noqa: F401  (imported only to probe availability)
    print("llama_cpp already installed.")
except ImportError:
    # The source build is requested for llama-cpp-python only. The earlier
    # ``--no-binary :all: --force-reinstall`` combination told pip to rebuild
    # and reinstall every dependency from source as well, which is very slow
    # and may replace packages this process has already imported.
    # --force-reinstall is dropped because this branch only runs when the
    # package could not be imported in the first place.
    print("Installing llama-cpp-python (building from source)...")
    subprocess.check_call([
        sys.executable, "-m", "pip", "install",
        "llama-cpp-python", "--no-binary", "llama-cpp-python",
    ])


from llama_cpp import Llama
print("start3")
import warnings
warnings.filterwarnings("ignore")
|
|
print("Start")
import subprocess


# Download the quantized Mistral weights into ./models with the Hugging Face
# CLI. check=True makes a failed download raise CalledProcessError, which
# stops the script before the model is loaded.
_download_cmd = ["huggingface-cli", "download"]
_download_cmd += [
    "TheBloke/Mistral-7B-Instruct-v0.1-GGUF",  # repository
    "mistral-7b-instruct-v0.1.Q2_K.gguf",  # file inside the repository
]
_download_cmd += ["--local-dir", "./models", "--local-dir-use-symlinks", "False"]
subprocess.run(_download_cmd, check=True)
|
|
# ------------------------------
# Device and Embedding Setup (CPU optimized)
# ------------------------------
modelPath = "sentence-transformers/all-mpnet-base-v2"
model_kwargs = {"device": "cpu"}  # Force CPU usage
# The sentence-transformers encode() option is spelled ``normalize_embeddings``
# (plural). The singular spelling used previously is not a recognised option.
encode_kwargs = {"normalize_embeddings": False}


# Shared embedding model: used to build the FAISS index and to query it.
embeddings = HuggingFaceEmbeddings(
    model_name=modelPath,
    model_kwargs=model_kwargs,
    encode_kwargs=encode_kwargs,
)
|
|
# ------------------------------
# Load Mistral GGUF via llama.cpp (CPU optimized)
# ------------------------------
# Only model/context options are passed to the constructor. Sampling settings
# (temperature, top_p, repeat_penalty) are per-call arguments in
# llama-cpp-python; handing them to Llama() does not apply them, so they are
# supplied on every completion call in mistral_llm() instead.
llm_cpp = Llama(
    model_path="./models/mistral-7b-instruct-v0.1.Q2_K.gguf",
    n_ctx=2048,
    n_threads=4,  # Adjust based on your CPU cores
    n_gpu_layers=0,  # Force CPU-only
)


# ------------------------------
# Text-generation wrapper around the llama.cpp model
# ------------------------------
def mistral_llm(prompt, *, max_tokens=512, temperature=0.7, top_p=0.9,
                repeat_penalty=1.1):
    """Generate a completion for ``prompt`` with the local Mistral model.

    Args:
        prompt: Fully formatted prompt text sent to the model.
        max_tokens: Upper bound on generated tokens (kept small for CPU).
        temperature: Sampling temperature applied to this call.
        top_p: Nucleus-sampling threshold applied to this call.
        repeat_penalty: Repetition penalty applied to this call.

    Returns:
        The text of the first completion choice, stripped of surrounding
        whitespace.
    """
    output = llm_cpp(
        prompt,
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        repeat_penalty=repeat_penalty,
        # Stop at the end-of-sequence marker or if the model starts a new
        # instruction block.
        stop=["</s>", "[INST]", "[/INST]"],
    )
    return output["choices"][0]["text"].strip()
|
|
# ------------------------------
# Prompt Template
# ------------------------------
def get_qa_prompt():
    """Build the question-answering prompt template.

    The template wraps the instructions in ``[INST] ... [/INST]`` markers and
    exposes three placeholders: ``chat_history``, ``context`` and
    ``question``.

    Returns:
        A ``PromptTemplate`` declaring those three input variables.
    """
    # The trailing backslash joins the first two lines, so the instructions
    # follow "[INST] " directly without a newline.
    qa_template = """<s>[INST] \
You are a helpful, knowledgeable AI assistant. Answer the user's question based on the provided context.


Guidelines:
- Respond in a natural, conversational tone
- Be detailed but concise
- Use paragraphs and bullet points when appropriate
- If you don't know, say so
- Maintain a friendly and professional demeanor


Conversation History:
{chat_history}


Relevant Context:
{context}


Current Question: {question}


Provide a helpful response: [/INST]"""
    variables = ["context", "question", "chat_history"]
    return PromptTemplate(template=qa_template, input_variables=variables)
|
|
# ------------------------------
# PDF and Chat Logic (optimized for CPU)
# ------------------------------
def pdf_text(pdf_docs):
    """Extract the text of every page of every PDF in ``pdf_docs``.

    Args:
        pdf_docs: Iterable of PDF sources accepted by ``PdfReader``.

    Returns:
        A single string holding each non-empty page's text followed by a
        newline, in document and page order. Pages for which
        ``extract_text()`` returns nothing are skipped.
    """
    pieces = []
    for pdf in pdf_docs:
        for page in PdfReader(pdf).pages:
            extracted = page.extract_text()
            if extracted:
                pieces.append(extracted + "\n")
    return "".join(pieces)
|
|
def get_chunks(text):
    """Split ``text`` into overlapping chunks wrapped as ``Document`` objects.

    Args:
        text: The full extracted text.

    Returns:
        A list with one ``Document`` per chunk, in splitter order.
    """
    # Smaller chunks keep embedding and prompt sizes manageable on CPU.
    splitter_options = {
        "chunk_size": 800,
        "chunk_overlap": 100,
        "length_function": len,
    }
    pieces = RecursiveCharacterTextSplitter(**splitter_options).split_text(text)
    return [Document(page_content=piece) for piece in pieces]
|
|
def get_vectorstore(documents):
    """Embed ``documents`` into a FAISS index and save it under ``faiss_index``.

    Args:
        documents: ``Document`` objects to index.

    Returns:
        None. The index is written to disk and read back when a query arrives.
    """
    FAISS.from_documents(documents, embedding=embeddings).save_local("faiss_index")
|
|
def format_chat_history(history):
    """Render the last two exchanges of ``history`` as a plain-text transcript.

    Args:
        history: Sequence of ``(question, answer)`` pairs, oldest first.

    Returns:
        A ``User:`` line and an ``Assistant:`` line per exchange, joined by
        newlines. An empty history yields an empty string.
    """
    recent_turns = history[-2:]  # Short history keeps the prompt small
    lines = []
    for question, answer in recent_turns:
        lines.append(f"User: {question}")
        lines.append(f"Assistant: {answer}")
    return "\n".join(lines)
|
|
def handle_pdf_upload(pdf_files):
    """Index the uploaded PDFs and report the outcome as a status string.

    Args:
        pdf_files: The uploaded files; may be empty or ``None``.

    Returns:
        A message for the status box: a warning when nothing was uploaded or
        no text could be extracted, a success summary otherwise, or the error
        text if any processing step raised.
    """
    if not pdf_files:
        return "⚠️ Upload at least one PDF"
    try:
        extracted = pdf_text(pdf_files)
        if extracted.strip():
            documents = get_chunks(extracted)
            get_vectorstore(documents)
            status = f"✅ Processed {len(pdf_files)} PDF(s) with {len(documents)} chunks"
        else:
            status = "⚠️ Could not extract text"
    except Exception as exc:
        # UI boundary: show the failure in the status box instead of raising.
        status = f"❌ Error: {str(exc)}"
    return status
|
|
def user_query(msg, chat_history):
    """Answer ``msg`` from the indexed PDFs and record the exchange.

    Args:
        msg: The user's question.
        chat_history: List of ``(question, answer)`` pairs; the new exchange
            is appended to it in place.

    Returns:
        ``("", chat_history)``. The empty string is the new value for the
        message input, and the list is the updated conversation.
    """
    if os.path.exists("faiss_index"):
        try:
            # NOTE(review): allow_dangerous_deserialization opts in to loading
            # a serialized index from disk; only safe for files this app wrote.
            index = FAISS.load_local(
                "faiss_index", embeddings, allow_dangerous_deserialization=True
            )
            retriever = index.as_retriever(search_kwargs={"k": 2})  # Few docs for CPU
            hits = retriever.get_relevant_documents(msg)
            passages = [hit.page_content for hit in hits]
            context = "\n\n".join(passages[:2])

            final_prompt = get_qa_prompt().format(
                context=context[:1500],  # Hard cap on context size
                question=msg,
                chat_history=format_chat_history(chat_history),
            )
            reply = mistral_llm(final_prompt)
        except Exception as exc:
            # UI boundary: report the failure as the assistant's reply.
            reply = f"Sorry, I encountered an error: {str(exc)}"
    else:
        reply = "Please upload PDF documents first."

    chat_history.append((msg, reply))
    return "", chat_history
|
|
# ------------------------------
# Gradio Interface
# ------------------------------
# NOTE(review): the component nesting below was reconstructed from source whose
# indentation had been lost -- confirm the layout against the running app.

# Static UI text, kept apart from the layout code.
_HEADER_MD = """
# 📚 PDF Chat Assistant
### Have natural conversations with your documents ((Note: This Space runs on CPU, so responses may take a few mins.))
"""
_INSTRUCTIONS_MD = """
**Instructions:**
1. Upload PDF documents
2. Click Process Documents
3. Start chatting in the right panel
"""
_EXAMPLE_QUESTIONS = [
    "Summarize the key points from the documents",
    "What are the main findings?",
    "Explain this in simpler terms",
]

with gr.Blocks(theme=gr.themes.Soft(), title="PDF Chat Assistant") as demo:
    # Title banner.
    with gr.Row():
        gr.Markdown(_HEADER_MD)

    with gr.Row():
        # Left column: upload controls and processing status.
        with gr.Column(scale=1, min_width=300):
            gr.Markdown("### Document Upload")
            pdf_input = gr.File(
                file_types=[".pdf"],
                file_count="multiple",
                label="Upload PDFs",
                height=100,
            )
            upload_btn = gr.Button("Process Documents", variant="primary")
            status_box = gr.Textbox(label="Status", interactive=False)
            gr.Markdown(_INSTRUCTIONS_MD)

        # Right column: conversation, input row, and helpers.
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(
                height=600,
                bubble_full_width=False,
                avatar_images=("user.png", "bot.png"),
            )

            with gr.Row():
                message = gr.Textbox(
                    placeholder="Type your question about the documents...",
                    show_label=False,
                    container=False,
                    scale=7,
                    autofocus=True,
                )
                submit_btn = gr.Button("Send", variant="primary", scale=1)

            with gr.Row():
                clear_chat = gr.Button("🧹 Clear Conversation")
                examples = gr.Examples(
                    examples=_EXAMPLE_QUESTIONS,
                    inputs=message,
                    label="Example Questions",
                )

    # Event wiring: the Send button and pressing Enter run the same handler.
    upload_btn.click(handle_pdf_upload, inputs=pdf_input, outputs=status_box)
    for _register in (submit_btn.click, message.submit):
        _register(user_query, inputs=[message, chatbot], outputs=[message, chatbot])
    clear_chat.click(lambda: [], None, chatbot, queue=False)


if __name__ == "__main__":
    # Default launch options; no public share link is requested.
    demo.launch()