DeceptivePatternDetector / py_files /gemini_analysis.py
Asmit Nayak
Refactor Gemini analysis to improve CSV response parsing and validation
72d1624
"""
Gemini AI analysis module for deceptive pattern detection.
Updated to match gemini_prompting_to_make_dp_csvs_genai.py structure.
"""
import pandas as pd
import os
import time
import csv
from io import StringIO
import json
from glob import glob
from tqdm.auto import tqdm
import gradio as gr
try:
from google import genai
from google.genai.errors import ServerError
GENAI_AVAILABLE = True
except ImportError:
GENAI_AVAILABLE = False
def check_csv_format(df: pd.DataFrame) -> str:
"""
Check if the csv file generated is in the correct format as is expected.
Expectation is that the csv file has 10 columns and the index is integer.
It is also expected that all the cells in the csv file are strings and not null.
If the csv file has only one column, it is considered as a bad file.
Args:
df: pandas DataFrame object that is read from the csv file.
Returns:
str: A string that indicates the status of the csv
"""
if 1 < len(df.columns) < 10:
return "The CSV file has less than 10 columns."
elif len(df.columns) > 10:
return "The CSV file has more than 10 columns."
elif not isinstance(df.index, pd.core.indexes.range.RangeIndex):
return "The CSV file has an incorrect index. Probably issue with the PIPE (|) separation variable."
elif len(df.columns) == 1:
return "The CSV file has only one column."
elif 'Text' in df.columns and not isinstance(df.Text.dtype, object):
return "The CSV file has non-string values in the Text column."
else:
return "The CSV file is in the correct format."
# analyze_with_gemini function removed - using few_shots_generator instead
REQUIRED_GEMINI_COLS = ["Deceptive Design Category", "Deceptive Design Subtype", "Reasoning"]
def _parse_response_to_df(response_text, original_csv_path, drop_existing=False):
"""
Parse a Gemini CSV response and validate it can be merged with the source CSV.
Tries pipe then comma separators. Checks required columns, row-count match, and nulls.
Returns:
(final_df, None) on success, or (None, error_message) on failure.
The error_message is user-readable and safe to send back to the LLM for correction.
"""
cleaned = response_text.replace("```csv", '').replace("```", '').strip()
parsed = None
sep_errors = []
for sep in ['|', ',']:
try:
candidate = pd.read_csv(StringIO(cleaned), sep=sep)
missing = [c for c in REQUIRED_GEMINI_COLS if c not in candidate.columns]
if missing:
sep_errors.append(f"separator '{sep}': missing required columns {missing}; got {list(candidate.columns)}")
continue
parsed = candidate
break
except Exception as e:
sep_errors.append(f"separator '{sep}': {e}")
if parsed is None:
return None, "Failed to parse CSV. " + " | ".join(sep_errors)
try:
csv_with_yolo = pd.read_csv(original_csv_path, index_col=0)
if drop_existing:
csv_with_yolo = csv_with_yolo.drop(columns=REQUIRED_GEMINI_COLS, errors='ignore')
except Exception as e:
return None, f"Could not read source CSV {original_csv_path}: {e}"
if len(parsed) != len(csv_with_yolo):
return None, (
f"Row count mismatch: response has {len(parsed)} rows but input has "
f"{len(csv_with_yolo)} rows. Output must contain exactly one row per input row."
)
gemini_cols = parsed[REQUIRED_GEMINI_COLS]
if gemini_cols.isnull().any().any():
null_rows = gemini_cols[gemini_cols.isnull().any(axis=1)].index.tolist()
return None, f"Null values found in required columns at row indices: {null_rows[:10]}"
csv_with_yolo.reset_index(inplace=True)
final_df = pd.concat([csv_with_yolo, gemini_cols], axis=1)
return final_df, None
def _build_correction_request(error_message):
"""
Build the follow-up instruction asking the model to correct its prior bad response.
Pair with `previous_interaction_id=<prior interaction id>` so the server provides history.
"""
return (
f"Your previous response could not be parsed into a valid DataFrame.\n"
f"Validation error:\n{error_message}\n\n"
f"Please regenerate ONLY the corrected pipe-separated CSV output. "
f"Maintain exactly the same number of rows as the input. Include the required columns "
f"({', '.join(REQUIRED_GEMINI_COLS)}). "
f"Output only the CSV — no markdown code fences, no explanations."
)
def _extract_model_text(interaction):
"""Concatenate text from all model_output steps in an Interactions response."""
chunks = []
for step in interaction.steps:
if getattr(step, "type", None) == "model_output":
for block in getattr(step, "content", []) or []:
if getattr(block, "type", None) == "text":
chunks.append(getattr(block, "text", "") or "")
return "".join(chunks)
def _extract_thought_text(interaction):
"""Concatenate thought summary text from all thought steps in an Interactions response."""
chunks = []
for step in interaction.steps:
if getattr(step, "type", None) == "thought":
summary = getattr(step, "summary", None)
if not summary:
continue
for block in summary:
if getattr(block, "type", None) == "text":
chunks.append(getattr(block, "text", "") or "")
return "".join(chunks)
def few_shots_generator(eval_dir='./eval', files=None, api_key=None):
"""
Generator version of few_shots that yields notifications in real-time.
Yields:
tuple: (status, message) where status is 'notification' or 'result'
"""
print(f"[CONSOLE] few_shots_generator: Starting analysis...")
print(f"[CONSOLE] eval_dir: {eval_dir}")
print(f"[CONSOLE] files: {files}")
print(f"[CONSOLE] API key provided: {'Yes' if api_key else 'No'}")
if not api_key:
print(f"[CONSOLE] No API key provided, returning None")
yield ('notification', "❌ No API key provided for analysis")
raise gr.Error("No API key provided for analysis")
# Read system prompt from gradio-demo directory
try:
system_prompt_path = os.path.join(os.path.dirname(__file__), '..', 'system_prompt.txt')
with open(system_prompt_path, 'r', encoding='utf-8') as f:
textsi_1 = f.read()
print(f"[CONSOLE] System prompt loaded from: {system_prompt_path}")
except Exception as e:
print(f"[CONSOLE] Failed to load system prompt: {e}")
yield ('notification', "❌ Failed to load system prompt")
raise gr.Error(f"Failed to load system prompt: {str(e)}")
os.makedirs(f"{eval_dir}/gemini_fs", exist_ok=True)
print(f"[CONSOLE] Created gemini_fs directory: {eval_dir}/gemini_fs")
try:
client = genai.Client(api_key=api_key)
print(f"[CONSOLE] Gemini client initialized")
except Exception as e:
error_msg = f"❌ Failed to initialize Gemini client: {str(e)}"
yield ('notification', error_msg)
print(f"[CONSOLE] Client initialization failed: {e}")
raise gr.Error(f"Failed to initialize Gemini client: {str(e)}")
if files is None:
files = glob(os.path.join(f"{eval_dir}/csv_with_yolo", "*.csv"))
if not isinstance(files, list):
files = [files]
print(f"[CONSOLE] Processing {len(files)} files")
for f in files:
print(f"[CONSOLE] Processing file: {f}")
try:
data = pd.read_csv(f, index_col=0)
data.index = data.index.str.replace('|', '', regex=False)
data = data.to_csv(sep="|", quoting=csv.QUOTE_ALL)
print(f"[CONSOLE] Data loaded and converted to CSV format")
except Exception as e:
print(f"[CONSOLE] Failed to read the file: {f}, error: {e}")
raise gr.Error(f"Failed to read input file: {str(e)}")
try_cnt = 0
while try_cnt < 2:
try:
try_cnt += 1
yield ('notification', f"🤖 Calling Gemini AI for pattern analysis (attempt {try_cnt})...")
if try_cnt == 1:
gr.Info("🤖 Starting Gemini analysis...")
print(f"[CONSOLE] Attempt {try_cnt} - Calling Gemini Interactions API...")
interaction = client.interactions.create(
model='gemini-3-flash-preview',
input=data,
system_instruction=textsi_1,
generation_config={
'temperature': 1,
'top_p': 0.1,
'max_output_tokens': 45 * 1024,
'thinking_level': 'high',
},
)
response_text = _extract_model_text(interaction)
yield ('notification', f"✅ Gemini API call successful! Processing results...")
gr.Info("✅ Gemini analysis successful!")
print(f"[CONSOLE] Gemini API call successful")
break
except ServerError as e:
if try_cnt > 3:
error_msg = f"❌ Failed to get response after {try_cnt} attempts"
yield ('notification', error_msg)
print(f"[CONSOLE] Failed to get response for {f} after {try_cnt} attempts")
raise gr.Error(f"Analysis failed after {try_cnt} attempts")
wait_msg = f"⚠️ Server error occurred. Retrying attempt {try_cnt + 1}/2 in 60 seconds..."
yield ('notification', wait_msg)
gr.Warning(f"⚠️ Server error. Retrying in 60 seconds... (attempt {try_cnt + 1}/2)")
print(f"[CONSOLE] Server error: {e.message}, sleeping for 60 seconds")
print(e)
time.sleep(60)
continue
except Exception as e:
# Handle non-server errors (API key issues, quota errors, etc.)
error_msg = f"❌ Gemini API error: {str(e.message)}"
print(f"[CONSOLE] Non-server error in Gemini API call: {e}")
yield 'notification', error_msg
raise gr.Error(f"Gemini API error: {str(e.message)}")
_f = os.path.join(f"{eval_dir}", "gemini_fs", os.path.basename(f))
# Parse and validate; on failure, ask the model to self-correct once.
final_df, parse_error = _parse_response_to_df(response_text, f, drop_existing=False)
if final_df is None:
yield ('notification', f"⚠️ Output validation failed: {parse_error[:200]}. Asking Gemini to correct (1 retry)...")
gr.Info("⚠️ Output invalid — asking Gemini to correct")
print(f"[CONSOLE] FEW_SHOT parse/validation failed: {parse_error}")
try:
correction_interaction = client.interactions.create(
model='gemini-3-flash-preview',
input=_build_correction_request(parse_error),
previous_interaction_id=interaction.id,
system_instruction=textsi_1,
generation_config={
'temperature': 1,
'top_p': 0.1,
'max_output_tokens': 45 * 1024,
'thinking_level': 'high',
},
)
interaction = correction_interaction
response_text = _extract_model_text(interaction)
final_df, parse_error = _parse_response_to_df(response_text, f, drop_existing=False)
except Exception as e_corr:
print(f"[CONSOLE] FEW_SHOT correction call failed: {e_corr}")
parse_error = f"Correction API call failed: {e_corr}"
if final_df is None:
try:
error_file = _f.replace(".csv", "_parse_error.txt")
with open(error_file, 'w', encoding='utf-8') as _fs:
_fs.write(response_text)
print(f"[CONSOLE] Failed response saved to: {error_file}")
except Exception as e3:
print(f"[CONSOLE] Failed to save error response: {e3}")
error_msg = f"❌ Gemini output could not be parsed even after correction: {parse_error}"
yield ('notification', error_msg)
raise gr.Error(f"Failed to parse response after correction: {parse_error}")
else:
yield ('notification', "✅ Gemini correction succeeded — output parsed successfully")
gr.Info("✅ Corrected output parsed successfully!")
final_df.to_csv(_f, index=False, quoting=csv.QUOTE_ALL)
print(f"[CONSOLE] Results saved to: {_f}")
# Check if thinking is needed (if any deceptive patterns found)
if set(final_df['Deceptive Design Category'].tolist()) != {'non-deceptive'}:
yield ('notification', "🧠 Deceptive patterns detected! Running advanced thinking analysis...")
gr.Info("🧠 Deceptive patterns found! Running advanced analysis...")
print(f"[CONSOLE] Deceptive patterns found, running thinking analysis...")
thinking_result = None
for thinking_status, thinking_data in thinking_generator(eval_dir, files=[_f], api_key=api_key):
if thinking_status == 'notification':
yield ('notification', thinking_data)
elif thinking_status == 'result':
thinking_result = thinking_data
break
if thinking_result is not None:
yield ('notification', "✅ Advanced thinking analysis completed successfully!")
gr.Info("✅ Advanced analysis completed!")
print(f"[CONSOLE] Thinking analysis completed, using refined results")
final_df = thinking_result
else:
yield ('notification', "⚠️ Advanced thinking analysis failed, using original results")
gr.Warning("⚠️ Advanced analysis failed, using basic results")
print(f"[CONSOLE] Thinking analysis failed, using original results")
else:
yield ('notification', "✅ No deceptive patterns found, analysis complete!")
gr.Info("✅ No deceptive patterns detected!")
print(f"[CONSOLE] No deceptive patterns found, skipping thinking analysis")
yield ('result', final_df)
return
yield ('result', None)
def thinking_generator(eval_dir="./eval", files=None, api_key=None):
"""
Generator version of thinking that yields notifications in real-time.
"""
print(f"[CONSOLE] thinking_generator: Starting thinking analysis...")
print(f"[CONSOLE] eval_dir: {eval_dir}")
print(f"[CONSOLE] files: {files}")
if not api_key:
print(f"[CONSOLE] No API key provided for thinking analysis")
raise gr.Error("No API key provided for thinking analysis")
# Read thinking system prompt from gradio-demo directory
try:
thinking_prompt_path = os.path.join(os.path.dirname(__file__), '..', 'system_prompt_thinking.txt')
with open(thinking_prompt_path, 'r', encoding='utf-8') as f:
textsi_1 = f.read()
print(f"[CONSOLE] Thinking system prompt loaded from: {thinking_prompt_path}")
except Exception as e:
print(f"[CONSOLE] Failed to load thinking system prompt: {e}")
raise gr.Error(f"Failed to load thinking system prompt: {str(e)}")
os.makedirs(f"{eval_dir}/gemini_fs", exist_ok=True)
try:
client = genai.Client(api_key=api_key)
print(f"[CONSOLE] Thinking client initialized")
except Exception as e:
error_msg = f"❌ Failed to initialize thinking client: {str(e)}"
print(f"[CONSOLE] Thinking client initialization failed: {e}")
raise gr.Error(f"Failed to initialize thinking client: {str(e)}")
if files is None:
files = glob(os.path.join(f"{eval_dir}/gemini_fs", "*.csv"))
if not isinstance(files, list):
files = [files]
print(f"[CONSOLE] Processing {len(files)} files for thinking analysis")
for f in files:
print(f"[CONSOLE] Thinking analysis for file: {f}")
try:
data = pd.read_csv(f, index_col=0)
data.index = data.index.str.replace('|', '', regex=False)
data = data.to_csv(sep="|", quoting=csv.QUOTE_ALL)
print(f"[CONSOLE] Data prepared for thinking analysis")
# Make API call to Gemini with retry logic for thinking analysis
try_cnt = 0
interaction = None
while try_cnt < 2:
try:
try_cnt += 1
yield ('notification', f"🧠 Running advanced thinking analysis (attempt {try_cnt})...")
print(f"[CONSOLE] Attempt {try_cnt} - Calling Gemini Interactions API for thinking...")
interaction = client.interactions.create(
model='gemini-3-flash-preview',
input=data,
system_instruction=textsi_1,
generation_config={
'temperature': 1,
'top_p': 0.1,
'max_output_tokens': 45 * 1024,
'thinking_level': 'high',
'thinking_summaries': 'auto',
},
)
yield ('notification', f"✅ Advanced thinking analysis API call successful!")
print(f"[CONSOLE] Thinking API call successful")
break
except ServerError as e:
if try_cnt > 3:
error_msg = f"❌ Failed to complete thinking analysis after {try_cnt} attempts"
yield ('notification', error_msg)
print(f"[CONSOLE] Failed to get thinking response after {try_cnt} attempts")
raise gr.Error(f"Advanced analysis failed after {try_cnt} attempts")
wait_msg = f"⚠️ Server error in thinking analysis. Retrying attempt {try_cnt + 1}/2 in 60 seconds..."
yield ('notification', wait_msg)
gr.Warning(f"⚠️ Thinking server error. Retrying in 60s... (attempt {try_cnt + 1}/2)")
print(f"[CONSOLE] Server error in thinking analysis: {e.message}, sleeping for 60 seconds")
print(e)
time.sleep(60)
continue
except Exception as e:
# Handle non-server errors in thinking analysis
error_msg = f"❌ Thinking analysis API error: {str(e)}"
yield ('notification', error_msg)
print(f"[CONSOLE] Non-server error in thinking API call: {e}")
raise gr.Error(f"Thinking analysis API error: {str(e)}")
output_csv = _extract_model_text(interaction)
thought_txt = _extract_thought_text(interaction)
print(f"[CONSOLE] Extracted output CSV ({len(output_csv)} chars)")
print(f"[CONSOLE] Extracted thought text ({len(thought_txt)} chars)")
_f = os.path.join(f"{eval_dir}", "gemini_fs", os.path.basename(f))
_f_thought = os.path.join(f"{eval_dir}", "gemini_fs", os.path.basename(f).replace(".csv", "_thinking.txt"))
# Save thinking text
with open(_f_thought, 'w', encoding='utf-8') as _f_thought_file:
_f_thought_file.write(thought_txt)
print(f"[CONSOLE] Thinking text saved to: {_f_thought}")
# Parse and validate; on failure, ask the model to self-correct once.
final_df, parse_error = _parse_response_to_df(output_csv, f, drop_existing=True)
if final_df is None:
yield ('notification', f"⚠️ Thinking output validation failed: {parse_error[:200]}. Asking Gemini to correct (1 retry)...")
gr.Info("⚠️ Thinking output invalid — asking Gemini to correct")
print(f"[CONSOLE] THINKING parse/validation failed: {parse_error}")
try:
correction_interaction = client.interactions.create(
model='gemini-3-flash-preview',
input=_build_correction_request(parse_error),
previous_interaction_id=interaction.id,
system_instruction=textsi_1,
generation_config={
'temperature': 1,
'top_p': 0.1,
'max_output_tokens': 45 * 1024,
'thinking_level': 'high',
'thinking_summaries': 'auto',
},
)
corrected_csv = _extract_model_text(correction_interaction)
final_df, parse_error = _parse_response_to_df(corrected_csv, f, drop_existing=True)
if final_df is not None:
output_csv = corrected_csv
interaction = correction_interaction
except Exception as e_corr:
print(f"[CONSOLE] THINKING correction call failed: {e_corr}")
parse_error = f"Correction API call failed: {e_corr}"
if final_df is None:
try:
error_file = _f.replace(".csv", "_thinking_parse_error.txt")
with open(error_file, 'w', encoding='utf-8') as _fs:
_fs.write(output_csv)
print(f"[CONSOLE] Thinking failed response saved to: {error_file}")
except Exception as e3:
print(f"[CONSOLE] Failed to save thinking error response: {e3}")
error_msg = f"❌ Thinking output could not be parsed even after correction: {parse_error}"
yield ('notification', error_msg)
raise gr.Error(f"Failed to parse thinking response after correction: {parse_error}")
else:
yield ('notification', "✅ Gemini thinking correction succeeded — output parsed successfully")
gr.Info("✅ Corrected thinking output parsed successfully!")
final_df.to_csv(_f, index=False, quoting=csv.QUOTE_ALL)
print(f"[CONSOLE] Thinking results saved to: {_f}")
yield ('result', final_df)
return
except Exception as e:
error_msg = f"❌ Error in thinking analysis: {str(e)}"
yield ('notification', error_msg)
print(f"[CONSOLE] Error in thinking analysis for {f}: {e}")
raise gr.Error(f"Thinking analysis error: {str(e)}")
yield ('result', None) # Return None if no files processed