Ab-Romia commited on
Commit
2d010fd
Β·
verified Β·
1 Parent(s): 4f57074

Update app/services.py

Browse files
Files changed (1) hide show
  1. app/services.py +122 -29
app/services.py CHANGED
@@ -5,6 +5,7 @@ import textwrap
5
  import time
6
  import rag_setup
7
  from schemas import ChatRequest, DocumentRequest, TaskRequest
 
8
  logging.basicConfig(
9
  level=logging.INFO,
10
  format='%(asctime)s [%(levelname)s] %(message)s',
@@ -20,14 +21,24 @@ CACHE_EXPIRATION_SECONDS = 600 # 10 minutes
20
 
21
 
22
  def index_document(request_data: DocumentRequest) -> int:
23
- logger.info("Starting document indexing process.")
 
 
 
 
 
 
 
 
24
 
25
  try:
26
  # Step 1: Clear any existing documents properly
27
  existing_ids = rag_setup.collection.get()["ids"]
28
  if existing_ids:
29
  rag_setup.collection.delete(ids=existing_ids)
30
- logger.info("Cleared existing documents from vector collection.")
 
 
31
 
32
  # Step 2: Chunk document
33
  text_chunks = textwrap.wrap(
@@ -38,23 +49,37 @@ def index_document(request_data: DocumentRequest) -> int:
38
  )
39
 
40
  if not text_chunks:
41
- logger.warning("No text chunks were generated.")
42
  return 0
43
 
 
 
 
 
 
 
 
44
  # Step 3: Add chunks to ChromaDB
45
  chunk_ids = [f"doc_chunk_{i}_{int(time.time())}" for i in range(len(text_chunks))]
46
- logger.info(f"Attempting to add {len(chunk_ids)} chunks to ChromaDB...")
 
47
  rag_setup.collection.add(documents=text_chunks, ids=chunk_ids)
48
-
 
 
 
 
49
  return len(text_chunks)
50
  except Exception as e:
51
- logger.error(f"Error during indexing: {str(e)}", exc_info=True)
52
  raise
53
 
 
54
  def clear_index():
55
  """Clears all documents from the vector database."""
 
56
  rag_setup.collection.delete(where={})
57
- logger.info("Successfully cleared the vector index.")
58
 
59
 
60
  async def get_rag_response(request_data: ChatRequest) -> str:
@@ -62,31 +87,52 @@ async def get_rag_response(request_data: ChatRequest) -> str:
62
  Performs the RAG pipeline: checks cache, retrieves context, generates a response.
63
  """
64
  start_total = time.time()
65
- logger.info(f"Processing query: '{request_data.prompt}'")
 
 
 
 
 
 
66
 
67
  try:
68
  # Step 1: Check cache for a recent, identical query
69
  cached_response = _get_cached_response(request_data.prompt)
70
  if cached_response:
71
- logger.info("Cache hit! Returning cached response.")
 
72
  return f"{cached_response}\n\n(This response was retrieved from cache)"
73
 
74
- logger.info("Cache miss. Proceeding with RAG pipeline.")
75
 
76
  # Step 2: Check if the vector database has any content
77
- if rag_setup.collection.count() == 0:
78
- logger.warning("Vector DB is empty. Cannot answer query.")
 
 
 
79
  return "The knowledge base is empty. Please provide some context in the left panel and click 'Index Context' before asking questions."
80
 
81
  # Step 3: Retrieve relevant chunks from ChromaDB
82
- logger.info("Retrieving relevant chunks from vector DB...")
83
  retrieved_chunks = await _retrieve_chunks_async(request_data.prompt)
84
 
85
  if not retrieved_chunks or not retrieved_chunks.get('documents') or not retrieved_chunks['documents'][0]:
86
- logger.warning("No relevant chunks found in the vector DB for this query.")
87
  return "I could not find any relevant information in the provided context to answer your question."
88
 
89
- context_for_prompt = "\n\n---\n\n".join(retrieved_chunks['documents'][0])
 
 
 
 
 
 
 
 
 
 
 
90
 
91
  # Step 4: Construct the final prompt for the LLM
92
  full_prompt = (
@@ -98,23 +144,36 @@ async def get_rag_response(request_data: ChatRequest) -> str:
98
  "--- CONTEXT END ---\n\n"
99
  f'User\'s Question: "{request_data.prompt}"'
100
  )
 
 
 
 
 
101
 
102
  # Step 5: Generate the response using the LLM
103
- logger.info("Generating response from OpenRouter...")
104
  response_text = await _generate_response_async(full_prompt)
 
 
 
 
105
 
106
  # Step 6: Cache the newly generated response
107
  _cache_response(request_data.prompt, response_text)
 
108
 
109
  total_time = time.time() - start_total
110
- logger.info(f"Total processing time: {total_time:.2f}s")
 
 
 
111
  return response_text
112
 
113
  except asyncio.TimeoutError:
114
- logger.error("Request timed out during retrieval or generation.")
115
  return "The request timed out. Please try again or simplify your question."
116
  except Exception as e:
117
- logger.error(f"An unexpected error occurred: {e}", exc_info=True)
118
  return f"An unexpected error occurred: {e}"
119
 
120
 
@@ -123,12 +182,23 @@ async def execute_task(request_data: TaskRequest) -> str:
123
  Executes a specific task on the given context.
124
  """
125
  start_total = time.time()
126
- logger.info(f"Executing task '{request_data.task_type}' with prompt: '{request_data.prompt}'")
 
 
 
 
 
 
 
 
 
 
127
 
128
  try:
129
  # For tasks, we use the full context, not just retrieved chunks
130
  context = request_data.context
131
  if not context:
 
132
  return "Context is empty. Please provide some text in the 'Knowledge Base' to perform a task."
133
 
134
  # Construct the prompt based on the task type
@@ -139,57 +209,80 @@ async def execute_task(request_data: TaskRequest) -> str:
139
  elif request_data.task_type == "creative":
140
  full_prompt = f"Use the following text as inspiration to write a creative piece (e.g., a poem, a short story, a metaphor). The user's prompt can guide the style or topic.\n\n--- INSPIRATION ---\n{context}\n\n--- PROMPT ---\n{request_data.prompt or 'Write a short poem'}"
141
  else:
 
142
  return "Invalid task type specified."
143
 
 
 
 
 
 
144
  # Generate the response
145
- logger.info("Generating task-based response from OpenRouter...")
146
  response_text = await _generate_response_async(full_prompt)
 
 
 
 
147
 
148
  total_time = time.time() - start_total
149
- logger.info(f"Task execution time: {total_time:.2f}s")
 
 
 
150
  return response_text
151
 
152
  except asyncio.TimeoutError:
153
- logger.error("Request timed out during task execution.")
154
  return "The request timed out. Please try again."
155
  except Exception as e:
156
- logger.error(f"An unexpected error occurred during task execution: {e}", exc_info=True)
157
  return f"An unexpected error occurred: {e}"
158
 
159
  # --- ASYNC WRAPPERS & CACHE HELPERS ---
160
 
161
  async def _retrieve_chunks_async(prompt: str):
162
  """Asynchronously queries the ChromaDB collection."""
 
163
  loop = asyncio.get_event_loop()
164
- return await loop.run_in_executor(
165
  None,
166
  functools.partial(rag_setup.collection.query, query_texts=[prompt], n_results=3)
167
  )
 
 
168
 
169
 
170
  async def _generate_response_async(full_prompt: str):
171
  """Asynchronously calls the LLM to generate content."""
172
-
173
-
 
174
  loop = asyncio.get_event_loop()
175
- return await loop.run_in_executor(
176
  None,
177
  rag_setup.generation_model.generate_content,
178
  full_prompt
179
  )
 
 
 
180
 
181
  def _get_cached_response(key: str):
182
  """Checks the cache for a valid (non-expired) entry."""
183
  if key in _response_cache:
184
  timestamp, response = _response_cache[key]
185
  if time.time() - timestamp < CACHE_EXPIRATION_SECONDS:
 
186
  return response
187
  else:
188
  # Expired, remove from cache
189
  del _response_cache[key]
 
190
  return None
191
 
192
 
193
  def _cache_response(key: str, response: str):
194
  """Adds a response to the cache with the current timestamp."""
195
- _response_cache[key] = (time.time(), response)
 
 
5
  import time
6
  import rag_setup
7
  from schemas import ChatRequest, DocumentRequest, TaskRequest
8
+
9
  logging.basicConfig(
10
  level=logging.INFO,
11
  format='%(asctime)s [%(levelname)s] %(message)s',
 
21
 
22
 
23
  def index_document(request_data: DocumentRequest) -> int:
24
+ logger.info("=" * 80)
25
+ logger.info("πŸ“š STARTING DOCUMENT INDEXING PROCESS")
26
+ logger.info("=" * 80)
27
+
28
+ # Log the incoming context
29
+ context_preview = request_data.context[:200] + "..." if len(request_data.context) > 200 else request_data.context
30
+ logger.info(f"πŸ“ CONTEXT TO INDEX (length: {len(request_data.context)} chars):")
31
+ logger.info(f" Preview: {context_preview}")
32
+ logger.info("-" * 60)
33
 
34
  try:
35
  # Step 1: Clear any existing documents properly
36
  existing_ids = rag_setup.collection.get()["ids"]
37
  if existing_ids:
38
  rag_setup.collection.delete(ids=existing_ids)
39
+ logger.info(f"πŸ—‘οΈ Cleared {len(existing_ids)} existing documents from vector collection.")
40
+ else:
41
+ logger.info("πŸ“‚ No existing documents to clear.")
42
 
43
  # Step 2: Chunk document
44
  text_chunks = textwrap.wrap(
 
49
  )
50
 
51
  if not text_chunks:
52
+ logger.warning("⚠️ No text chunks were generated.")
53
  return 0
54
 
55
+ logger.info(f"βœ‚οΈ Document split into {len(text_chunks)} chunks")
56
+
57
+ # Log each chunk for debugging
58
+ for i, chunk in enumerate(text_chunks):
59
+ chunk_preview = chunk[:100] + "..." if len(chunk) > 100 else chunk
60
+ logger.info(f" Chunk {i+1}: {chunk_preview} (length: {len(chunk)} chars)")
61
+
62
  # Step 3: Add chunks to ChromaDB
63
  chunk_ids = [f"doc_chunk_{i}_{int(time.time())}" for i in range(len(text_chunks))]
64
+ logger.info(f"πŸ’Ύ Adding {len(chunk_ids)} chunks to ChromaDB...")
65
+
66
  rag_setup.collection.add(documents=text_chunks, ids=chunk_ids)
67
+
68
+ logger.info("βœ… DOCUMENT INDEXING COMPLETED SUCCESSFULLY")
69
+ logger.info(f"πŸ“Š Total chunks indexed: {len(text_chunks)}")
70
+ logger.info("=" * 80)
71
+
72
  return len(text_chunks)
73
  except Exception as e:
74
+ logger.error(f"❌ Error during indexing: {str(e)}", exc_info=True)
75
  raise
76
 
77
+
78
  def clear_index():
79
  """Clears all documents from the vector database."""
80
+ logger.info("πŸ—‘οΈ Clearing vector index...")
81
  rag_setup.collection.delete(where={})
82
+ logger.info("βœ… Successfully cleared the vector index.")
83
 
84
 
85
  async def get_rag_response(request_data: ChatRequest) -> str:
 
87
  Performs the RAG pipeline: checks cache, retrieves context, generates a response.
88
  """
89
  start_total = time.time()
90
+
91
+ logger.info("=" * 80)
92
+ logger.info("πŸ€– STARTING RAG PIPELINE")
93
+ logger.info("=" * 80)
94
+ logger.info(f"❓ USER PROMPT: '{request_data.prompt}'")
95
+ logger.info(f"πŸ“ Prompt length: {len(request_data.prompt)} characters")
96
+ logger.info("-" * 60)
97
 
98
  try:
99
  # Step 1: Check cache for a recent, identical query
100
  cached_response = _get_cached_response(request_data.prompt)
101
  if cached_response:
102
+ logger.info("πŸ’Ύ CACHE HIT! Returning cached response.")
103
+ logger.info(f"πŸ“€ CACHED RESPONSE: {cached_response[:200]}...")
104
  return f"{cached_response}\n\n(This response was retrieved from cache)"
105
 
106
+ logger.info("πŸ” Cache miss. Proceeding with RAG pipeline.")
107
 
108
  # Step 2: Check if the vector database has any content
109
+ doc_count = rag_setup.collection.count()
110
+ logger.info(f"πŸ“š Vector DB contains {doc_count} documents")
111
+
112
+ if doc_count == 0:
113
+ logger.warning("⚠️ Vector DB is empty. Cannot answer query.")
114
  return "The knowledge base is empty. Please provide some context in the left panel and click 'Index Context' before asking questions."
115
 
116
  # Step 3: Retrieve relevant chunks from ChromaDB
117
+ logger.info("πŸ”Ž Retrieving relevant chunks from vector DB...")
118
  retrieved_chunks = await _retrieve_chunks_async(request_data.prompt)
119
 
120
  if not retrieved_chunks or not retrieved_chunks.get('documents') or not retrieved_chunks['documents'][0]:
121
+ logger.warning("❌ No relevant chunks found in the vector DB for this query.")
122
  return "I could not find any relevant information in the provided context to answer your question."
123
 
124
+ # Log retrieved chunks
125
+ chunks = retrieved_chunks['documents'][0]
126
+ logger.info(f"πŸ“‹ Retrieved {len(chunks)} relevant chunks:")
127
+ for i, chunk in enumerate(chunks):
128
+ chunk_preview = chunk[:150] + "..." if len(chunk) > 150 else chunk
129
+ logger.info(f" Chunk {i+1}: {chunk_preview}")
130
+
131
+ context_for_prompt = "\n\n---\n\n".join(chunks)
132
+ logger.info(f"πŸ“ CONTEXT FOR LLM (total length: {len(context_for_prompt)} chars):")
133
+ context_preview = context_for_prompt[:300] + "..." if len(context_for_prompt) > 300 else context_for_prompt
134
+ logger.info(f" Context preview: {context_preview}")
135
+ logger.info("-" * 60)
136
 
137
  # Step 4: Construct the final prompt for the LLM
138
  full_prompt = (
 
144
  "--- CONTEXT END ---\n\n"
145
  f'User\'s Question: "{request_data.prompt}"'
146
  )
147
+
148
+ logger.info(f"πŸ”§ FULL PROMPT TO LLM (length: {len(full_prompt)} chars):")
149
+ prompt_preview = full_prompt[:400] + "..." if len(full_prompt) > 400 else full_prompt
150
+ logger.info(f" Prompt preview: {prompt_preview}")
151
+ logger.info("-" * 60)
152
 
153
  # Step 5: Generate the response using the LLM
154
+ logger.info("🧠 Generating response from OpenRouter...")
155
  response_text = await _generate_response_async(full_prompt)
156
+
157
+ logger.info(f"πŸ“€ LLM RESPONSE (length: {len(response_text)} chars):")
158
+ response_preview = response_text[:300] + "..." if len(response_text) > 300 else response_text
159
+ logger.info(f" Response preview: {response_preview}")
160
 
161
  # Step 6: Cache the newly generated response
162
  _cache_response(request_data.prompt, response_text)
163
+ logger.info("πŸ’Ύ Response cached for future use")
164
 
165
  total_time = time.time() - start_total
166
+ logger.info(f"⏱️ Total processing time: {total_time:.2f}s")
167
+ logger.info("βœ… RAG PIPELINE COMPLETED SUCCESSFULLY")
168
+ logger.info("=" * 80)
169
+
170
  return response_text
171
 
172
  except asyncio.TimeoutError:
173
+ logger.error("⏱️ Request timed out during retrieval or generation.")
174
  return "The request timed out. Please try again or simplify your question."
175
  except Exception as e:
176
+ logger.error(f"❌ An unexpected error occurred: {e}", exc_info=True)
177
  return f"An unexpected error occurred: {e}"
178
 
179
 
 
182
  Executes a specific task on the given context.
183
  """
184
  start_total = time.time()
185
+
186
+ logger.info("=" * 80)
187
+ logger.info("🎯 STARTING TASK EXECUTION")
188
+ logger.info("=" * 80)
189
+ logger.info(f"πŸ“‹ TASK TYPE: {request_data.task_type}")
190
+ logger.info(f"❓ TASK PROMPT: '{request_data.prompt}'")
191
+ logger.info(f"πŸ“ Context length: {len(request_data.context)} characters")
192
+
193
+ context_preview = request_data.context[:200] + "..." if len(request_data.context) > 200 else request_data.context
194
+ logger.info(f"πŸ“ CONTEXT PREVIEW: {context_preview}")
195
+ logger.info("-" * 60)
196
 
197
  try:
198
  # For tasks, we use the full context, not just retrieved chunks
199
  context = request_data.context
200
  if not context:
201
+ logger.warning("⚠️ Context is empty for task execution")
202
  return "Context is empty. Please provide some text in the 'Knowledge Base' to perform a task."
203
 
204
  # Construct the prompt based on the task type
 
209
  elif request_data.task_type == "creative":
210
  full_prompt = f"Use the following text as inspiration to write a creative piece (e.g., a poem, a short story, a metaphor). The user's prompt can guide the style or topic.\n\n--- INSPIRATION ---\n{context}\n\n--- PROMPT ---\n{request_data.prompt or 'Write a short poem'}"
211
  else:
212
+ logger.error(f"❌ Invalid task type: {request_data.task_type}")
213
  return "Invalid task type specified."
214
 
215
+ logger.info(f"πŸ”§ FULL TASK PROMPT (length: {len(full_prompt)} chars):")
216
+ prompt_preview = full_prompt[:400] + "..." if len(full_prompt) > 400 else full_prompt
217
+ logger.info(f" Prompt preview: {prompt_preview}")
218
+ logger.info("-" * 60)
219
+
220
  # Generate the response
221
+ logger.info("🧠 Generating task-based response from OpenRouter...")
222
  response_text = await _generate_response_async(full_prompt)
223
+
224
+ logger.info(f"πŸ“€ TASK RESPONSE (length: {len(response_text)} chars):")
225
+ response_preview = response_text[:300] + "..." if len(response_text) > 300 else response_text
226
+ logger.info(f" Response preview: {response_preview}")
227
 
228
  total_time = time.time() - start_total
229
+ logger.info(f"⏱️ Task execution time: {total_time:.2f}s")
230
+ logger.info("βœ… TASK EXECUTION COMPLETED SUCCESSFULLY")
231
+ logger.info("=" * 80)
232
+
233
  return response_text
234
 
235
  except asyncio.TimeoutError:
236
+ logger.error("⏱️ Request timed out during task execution.")
237
  return "The request timed out. Please try again."
238
  except Exception as e:
239
+ logger.error(f"❌ An unexpected error occurred during task execution: {e}", exc_info=True)
240
  return f"An unexpected error occurred: {e}"
241
 
242
  # --- ASYNC WRAPPERS & CACHE HELPERS ---
243
 
244
  async def _retrieve_chunks_async(prompt: str):
245
  """Asynchronously queries the ChromaDB collection."""
246
+ logger.info(f"πŸ” Querying ChromaDB for prompt: '{prompt}'")
247
  loop = asyncio.get_event_loop()
248
+ result = await loop.run_in_executor(
249
  None,
250
  functools.partial(rag_setup.collection.query, query_texts=[prompt], n_results=3)
251
  )
252
+ logger.info(f"πŸ“Š ChromaDB query returned {len(result.get('documents', [[]])[0])} chunks")
253
+ return result
254
 
255
 
256
  async def _generate_response_async(full_prompt: str):
257
  """Asynchronously calls the LLM to generate content."""
258
+ logger.info("πŸ€– Calling LLM for content generation...")
259
+ logger.info(f"πŸ“ Prompt length sent to LLM: {len(full_prompt)} characters")
260
+
261
  loop = asyncio.get_event_loop()
262
+ response = await loop.run_in_executor(
263
  None,
264
  rag_setup.generation_model.generate_content,
265
  full_prompt
266
  )
267
+
268
+ logger.info(f"βœ… LLM response received (length: {len(response)} chars)")
269
+ return response
270
 
271
  def _get_cached_response(key: str):
272
  """Checks the cache for a valid (non-expired) entry."""
273
  if key in _response_cache:
274
  timestamp, response = _response_cache[key]
275
  if time.time() - timestamp < CACHE_EXPIRATION_SECONDS:
276
+ logger.info(f"πŸ’Ύ Cache hit for key: '{key[:50]}...'")
277
  return response
278
  else:
279
  # Expired, remove from cache
280
  del _response_cache[key]
281
+ logger.info(f"πŸ—‘οΈ Expired cache entry removed for key: '{key[:50]}...'")
282
  return None
283
 
284
 
285
  def _cache_response(key: str, response: str):
286
  """Adds a response to the cache with the current timestamp."""
287
+ _response_cache[key] = (time.time(), response)
288
+ logger.info(f"πŸ’Ύ Response cached for key: '{key[:50]}...' (response length: {len(response)} chars)")