Spaces:
Running on Zero
Running on Zero
| """ | |
| Gemini AI analysis module for deceptive pattern detection. | |
| Updated to match gemini_prompting_to_make_dp_csvs_genai.py structure. | |
| """ | |
| import pandas as pd | |
| import os | |
| import time | |
| import csv | |
| from io import StringIO | |
| import json | |
| from glob import glob | |
| from tqdm.auto import tqdm | |
| import gradio as gr | |
| try: | |
| from google import genai | |
| from google.genai.errors import ServerError | |
| GENAI_AVAILABLE = True | |
| except ImportError: | |
| GENAI_AVAILABLE = False | |
| def check_csv_format(df: pd.DataFrame) -> str: | |
| """ | |
| Check if the csv file generated is in the correct format as is expected. | |
| Expectation is that the csv file has 10 columns and the index is integer. | |
| It is also expected that all the cells in the csv file are strings and not null. | |
| If the csv file has only one column, it is considered as a bad file. | |
| Args: | |
| df: pandas DataFrame object that is read from the csv file. | |
| Returns: | |
| str: A string that indicates the status of the csv | |
| """ | |
| if 1 < len(df.columns) < 10: | |
| return "The CSV file has less than 10 columns." | |
| elif len(df.columns) > 10: | |
| return "The CSV file has more than 10 columns." | |
| elif not isinstance(df.index, pd.core.indexes.range.RangeIndex): | |
| return "The CSV file has an incorrect index. Probably issue with the PIPE (|) separation variable." | |
| elif len(df.columns) == 1: | |
| return "The CSV file has only one column." | |
| elif 'Text' in df.columns and not isinstance(df.Text.dtype, object): | |
| return "The CSV file has non-string values in the Text column." | |
| else: | |
| return "The CSV file is in the correct format." | |
| # analyze_with_gemini function removed - using few_shots_generator instead | |
| REQUIRED_GEMINI_COLS = ["Deceptive Design Category", "Deceptive Design Subtype", "Reasoning"] | |
| def _parse_response_to_df(response_text, original_csv_path, drop_existing=False): | |
| """ | |
| Parse a Gemini CSV response and validate it can be merged with the source CSV. | |
| Tries pipe then comma separators. Checks required columns, row-count match, and nulls. | |
| Returns: | |
| (final_df, None) on success, or (None, error_message) on failure. | |
| The error_message is user-readable and safe to send back to the LLM for correction. | |
| """ | |
| cleaned = response_text.replace("```csv", '').replace("```", '').strip() | |
| parsed = None | |
| sep_errors = [] | |
| for sep in ['|', ',']: | |
| try: | |
| candidate = pd.read_csv(StringIO(cleaned), sep=sep) | |
| missing = [c for c in REQUIRED_GEMINI_COLS if c not in candidate.columns] | |
| if missing: | |
| sep_errors.append(f"separator '{sep}': missing required columns {missing}; got {list(candidate.columns)}") | |
| continue | |
| parsed = candidate | |
| break | |
| except Exception as e: | |
| sep_errors.append(f"separator '{sep}': {e}") | |
| if parsed is None: | |
| return None, "Failed to parse CSV. " + " | ".join(sep_errors) | |
| try: | |
| csv_with_yolo = pd.read_csv(original_csv_path, index_col=0) | |
| if drop_existing: | |
| csv_with_yolo = csv_with_yolo.drop(columns=REQUIRED_GEMINI_COLS, errors='ignore') | |
| except Exception as e: | |
| return None, f"Could not read source CSV {original_csv_path}: {e}" | |
| if len(parsed) != len(csv_with_yolo): | |
| return None, ( | |
| f"Row count mismatch: response has {len(parsed)} rows but input has " | |
| f"{len(csv_with_yolo)} rows. Output must contain exactly one row per input row." | |
| ) | |
| gemini_cols = parsed[REQUIRED_GEMINI_COLS] | |
| if gemini_cols.isnull().any().any(): | |
| null_rows = gemini_cols[gemini_cols.isnull().any(axis=1)].index.tolist() | |
| return None, f"Null values found in required columns at row indices: {null_rows[:10]}" | |
| csv_with_yolo.reset_index(inplace=True) | |
| final_df = pd.concat([csv_with_yolo, gemini_cols], axis=1) | |
| return final_df, None | |
| def _build_correction_request(error_message): | |
| """ | |
| Build the follow-up instruction asking the model to correct its prior bad response. | |
| Pair with `previous_interaction_id=<prior interaction id>` so the server provides history. | |
| """ | |
| return ( | |
| f"Your previous response could not be parsed into a valid DataFrame.\n" | |
| f"Validation error:\n{error_message}\n\n" | |
| f"Please regenerate ONLY the corrected pipe-separated CSV output. " | |
| f"Maintain exactly the same number of rows as the input. Include the required columns " | |
| f"({', '.join(REQUIRED_GEMINI_COLS)}). " | |
| f"Output only the CSV — no markdown code fences, no explanations." | |
| ) | |
| def _extract_model_text(interaction): | |
| """Concatenate text from all model_output steps in an Interactions response.""" | |
| chunks = [] | |
| for step in interaction.steps: | |
| if getattr(step, "type", None) == "model_output": | |
| for block in getattr(step, "content", []) or []: | |
| if getattr(block, "type", None) == "text": | |
| chunks.append(getattr(block, "text", "") or "") | |
| return "".join(chunks) | |
| def _extract_thought_text(interaction): | |
| """Concatenate thought summary text from all thought steps in an Interactions response.""" | |
| chunks = [] | |
| for step in interaction.steps: | |
| if getattr(step, "type", None) == "thought": | |
| summary = getattr(step, "summary", None) | |
| if not summary: | |
| continue | |
| for block in summary: | |
| if getattr(block, "type", None) == "text": | |
| chunks.append(getattr(block, "text", "") or "") | |
| return "".join(chunks) | |
| def few_shots_generator(eval_dir='./eval', files=None, api_key=None): | |
| """ | |
| Generator version of few_shots that yields notifications in real-time. | |
| Yields: | |
| tuple: (status, message) where status is 'notification' or 'result' | |
| """ | |
| print(f"[CONSOLE] few_shots_generator: Starting analysis...") | |
| print(f"[CONSOLE] eval_dir: {eval_dir}") | |
| print(f"[CONSOLE] files: {files}") | |
| print(f"[CONSOLE] API key provided: {'Yes' if api_key else 'No'}") | |
| if not api_key: | |
| print(f"[CONSOLE] No API key provided, returning None") | |
| yield ('notification', "❌ No API key provided for analysis") | |
| raise gr.Error("No API key provided for analysis") | |
| # Read system prompt from gradio-demo directory | |
| try: | |
| system_prompt_path = os.path.join(os.path.dirname(__file__), '..', 'system_prompt.txt') | |
| with open(system_prompt_path, 'r', encoding='utf-8') as f: | |
| textsi_1 = f.read() | |
| print(f"[CONSOLE] System prompt loaded from: {system_prompt_path}") | |
| except Exception as e: | |
| print(f"[CONSOLE] Failed to load system prompt: {e}") | |
| yield ('notification', "❌ Failed to load system prompt") | |
| raise gr.Error(f"Failed to load system prompt: {str(e)}") | |
| os.makedirs(f"{eval_dir}/gemini_fs", exist_ok=True) | |
| print(f"[CONSOLE] Created gemini_fs directory: {eval_dir}/gemini_fs") | |
| try: | |
| client = genai.Client(api_key=api_key) | |
| print(f"[CONSOLE] Gemini client initialized") | |
| except Exception as e: | |
| error_msg = f"❌ Failed to initialize Gemini client: {str(e)}" | |
| yield ('notification', error_msg) | |
| print(f"[CONSOLE] Client initialization failed: {e}") | |
| raise gr.Error(f"Failed to initialize Gemini client: {str(e)}") | |
| if files is None: | |
| files = glob(os.path.join(f"{eval_dir}/csv_with_yolo", "*.csv")) | |
| if not isinstance(files, list): | |
| files = [files] | |
| print(f"[CONSOLE] Processing {len(files)} files") | |
| for f in files: | |
| print(f"[CONSOLE] Processing file: {f}") | |
| try: | |
| data = pd.read_csv(f, index_col=0) | |
| data.index = data.index.str.replace('|', '', regex=False) | |
| data = data.to_csv(sep="|", quoting=csv.QUOTE_ALL) | |
| print(f"[CONSOLE] Data loaded and converted to CSV format") | |
| except Exception as e: | |
| print(f"[CONSOLE] Failed to read the file: {f}, error: {e}") | |
| raise gr.Error(f"Failed to read input file: {str(e)}") | |
| try_cnt = 0 | |
| while try_cnt < 2: | |
| try: | |
| try_cnt += 1 | |
| yield ('notification', f"🤖 Calling Gemini AI for pattern analysis (attempt {try_cnt})...") | |
| if try_cnt == 1: | |
| gr.Info("🤖 Starting Gemini analysis...") | |
| print(f"[CONSOLE] Attempt {try_cnt} - Calling Gemini Interactions API...") | |
| interaction = client.interactions.create( | |
| model='gemini-3-flash-preview', | |
| input=data, | |
| system_instruction=textsi_1, | |
| generation_config={ | |
| 'temperature': 1, | |
| 'top_p': 0.1, | |
| 'max_output_tokens': 45 * 1024, | |
| 'thinking_level': 'high', | |
| }, | |
| ) | |
| response_text = _extract_model_text(interaction) | |
| yield ('notification', f"✅ Gemini API call successful! Processing results...") | |
| gr.Info("✅ Gemini analysis successful!") | |
| print(f"[CONSOLE] Gemini API call successful") | |
| break | |
| except ServerError as e: | |
| if try_cnt > 3: | |
| error_msg = f"❌ Failed to get response after {try_cnt} attempts" | |
| yield ('notification', error_msg) | |
| print(f"[CONSOLE] Failed to get response for {f} after {try_cnt} attempts") | |
| raise gr.Error(f"Analysis failed after {try_cnt} attempts") | |
| wait_msg = f"⚠️ Server error occurred. Retrying attempt {try_cnt + 1}/2 in 60 seconds..." | |
| yield ('notification', wait_msg) | |
| gr.Warning(f"⚠️ Server error. Retrying in 60 seconds... (attempt {try_cnt + 1}/2)") | |
| print(f"[CONSOLE] Server error: {e.message}, sleeping for 60 seconds") | |
| print(e) | |
| time.sleep(60) | |
| continue | |
| except Exception as e: | |
| # Handle non-server errors (API key issues, quota errors, etc.) | |
| error_msg = f"❌ Gemini API error: {str(e.message)}" | |
| print(f"[CONSOLE] Non-server error in Gemini API call: {e}") | |
| yield 'notification', error_msg | |
| raise gr.Error(f"Gemini API error: {str(e.message)}") | |
| _f = os.path.join(f"{eval_dir}", "gemini_fs", os.path.basename(f)) | |
| # Parse and validate; on failure, ask the model to self-correct once. | |
| final_df, parse_error = _parse_response_to_df(response_text, f, drop_existing=False) | |
| if final_df is None: | |
| yield ('notification', f"⚠️ Output validation failed: {parse_error[:200]}. Asking Gemini to correct (1 retry)...") | |
| gr.Info("⚠️ Output invalid — asking Gemini to correct") | |
| print(f"[CONSOLE] FEW_SHOT parse/validation failed: {parse_error}") | |
| try: | |
| correction_interaction = client.interactions.create( | |
| model='gemini-3-flash-preview', | |
| input=_build_correction_request(parse_error), | |
| previous_interaction_id=interaction.id, | |
| system_instruction=textsi_1, | |
| generation_config={ | |
| 'temperature': 1, | |
| 'top_p': 0.1, | |
| 'max_output_tokens': 45 * 1024, | |
| 'thinking_level': 'high', | |
| }, | |
| ) | |
| interaction = correction_interaction | |
| response_text = _extract_model_text(interaction) | |
| final_df, parse_error = _parse_response_to_df(response_text, f, drop_existing=False) | |
| except Exception as e_corr: | |
| print(f"[CONSOLE] FEW_SHOT correction call failed: {e_corr}") | |
| parse_error = f"Correction API call failed: {e_corr}" | |
| if final_df is None: | |
| try: | |
| error_file = _f.replace(".csv", "_parse_error.txt") | |
| with open(error_file, 'w', encoding='utf-8') as _fs: | |
| _fs.write(response_text) | |
| print(f"[CONSOLE] Failed response saved to: {error_file}") | |
| except Exception as e3: | |
| print(f"[CONSOLE] Failed to save error response: {e3}") | |
| error_msg = f"❌ Gemini output could not be parsed even after correction: {parse_error}" | |
| yield ('notification', error_msg) | |
| raise gr.Error(f"Failed to parse response after correction: {parse_error}") | |
| else: | |
| yield ('notification', "✅ Gemini correction succeeded — output parsed successfully") | |
| gr.Info("✅ Corrected output parsed successfully!") | |
| final_df.to_csv(_f, index=False, quoting=csv.QUOTE_ALL) | |
| print(f"[CONSOLE] Results saved to: {_f}") | |
| # Check if thinking is needed (if any deceptive patterns found) | |
| if set(final_df['Deceptive Design Category'].tolist()) != {'non-deceptive'}: | |
| yield ('notification', "🧠 Deceptive patterns detected! Running advanced thinking analysis...") | |
| gr.Info("🧠 Deceptive patterns found! Running advanced analysis...") | |
| print(f"[CONSOLE] Deceptive patterns found, running thinking analysis...") | |
| thinking_result = None | |
| for thinking_status, thinking_data in thinking_generator(eval_dir, files=[_f], api_key=api_key): | |
| if thinking_status == 'notification': | |
| yield ('notification', thinking_data) | |
| elif thinking_status == 'result': | |
| thinking_result = thinking_data | |
| break | |
| if thinking_result is not None: | |
| yield ('notification', "✅ Advanced thinking analysis completed successfully!") | |
| gr.Info("✅ Advanced analysis completed!") | |
| print(f"[CONSOLE] Thinking analysis completed, using refined results") | |
| final_df = thinking_result | |
| else: | |
| yield ('notification', "⚠️ Advanced thinking analysis failed, using original results") | |
| gr.Warning("⚠️ Advanced analysis failed, using basic results") | |
| print(f"[CONSOLE] Thinking analysis failed, using original results") | |
| else: | |
| yield ('notification', "✅ No deceptive patterns found, analysis complete!") | |
| gr.Info("✅ No deceptive patterns detected!") | |
| print(f"[CONSOLE] No deceptive patterns found, skipping thinking analysis") | |
| yield ('result', final_df) | |
| return | |
| yield ('result', None) | |
| def thinking_generator(eval_dir="./eval", files=None, api_key=None): | |
| """ | |
| Generator version of thinking that yields notifications in real-time. | |
| """ | |
| print(f"[CONSOLE] thinking_generator: Starting thinking analysis...") | |
| print(f"[CONSOLE] eval_dir: {eval_dir}") | |
| print(f"[CONSOLE] files: {files}") | |
| if not api_key: | |
| print(f"[CONSOLE] No API key provided for thinking analysis") | |
| raise gr.Error("No API key provided for thinking analysis") | |
| # Read thinking system prompt from gradio-demo directory | |
| try: | |
| thinking_prompt_path = os.path.join(os.path.dirname(__file__), '..', 'system_prompt_thinking.txt') | |
| with open(thinking_prompt_path, 'r', encoding='utf-8') as f: | |
| textsi_1 = f.read() | |
| print(f"[CONSOLE] Thinking system prompt loaded from: {thinking_prompt_path}") | |
| except Exception as e: | |
| print(f"[CONSOLE] Failed to load thinking system prompt: {e}") | |
| raise gr.Error(f"Failed to load thinking system prompt: {str(e)}") | |
| os.makedirs(f"{eval_dir}/gemini_fs", exist_ok=True) | |
| try: | |
| client = genai.Client(api_key=api_key) | |
| print(f"[CONSOLE] Thinking client initialized") | |
| except Exception as e: | |
| error_msg = f"❌ Failed to initialize thinking client: {str(e)}" | |
| print(f"[CONSOLE] Thinking client initialization failed: {e}") | |
| raise gr.Error(f"Failed to initialize thinking client: {str(e)}") | |
| if files is None: | |
| files = glob(os.path.join(f"{eval_dir}/gemini_fs", "*.csv")) | |
| if not isinstance(files, list): | |
| files = [files] | |
| print(f"[CONSOLE] Processing {len(files)} files for thinking analysis") | |
| for f in files: | |
| print(f"[CONSOLE] Thinking analysis for file: {f}") | |
| try: | |
| data = pd.read_csv(f, index_col=0) | |
| data.index = data.index.str.replace('|', '', regex=False) | |
| data = data.to_csv(sep="|", quoting=csv.QUOTE_ALL) | |
| print(f"[CONSOLE] Data prepared for thinking analysis") | |
| # Make API call to Gemini with retry logic for thinking analysis | |
| try_cnt = 0 | |
| interaction = None | |
| while try_cnt < 2: | |
| try: | |
| try_cnt += 1 | |
| yield ('notification', f"🧠 Running advanced thinking analysis (attempt {try_cnt})...") | |
| print(f"[CONSOLE] Attempt {try_cnt} - Calling Gemini Interactions API for thinking...") | |
| interaction = client.interactions.create( | |
| model='gemini-3-flash-preview', | |
| input=data, | |
| system_instruction=textsi_1, | |
| generation_config={ | |
| 'temperature': 1, | |
| 'top_p': 0.1, | |
| 'max_output_tokens': 45 * 1024, | |
| 'thinking_level': 'high', | |
| 'thinking_summaries': 'auto', | |
| }, | |
| ) | |
| yield ('notification', f"✅ Advanced thinking analysis API call successful!") | |
| print(f"[CONSOLE] Thinking API call successful") | |
| break | |
| except ServerError as e: | |
| if try_cnt > 3: | |
| error_msg = f"❌ Failed to complete thinking analysis after {try_cnt} attempts" | |
| yield ('notification', error_msg) | |
| print(f"[CONSOLE] Failed to get thinking response after {try_cnt} attempts") | |
| raise gr.Error(f"Advanced analysis failed after {try_cnt} attempts") | |
| wait_msg = f"⚠️ Server error in thinking analysis. Retrying attempt {try_cnt + 1}/2 in 60 seconds..." | |
| yield ('notification', wait_msg) | |
| gr.Warning(f"⚠️ Thinking server error. Retrying in 60s... (attempt {try_cnt + 1}/2)") | |
| print(f"[CONSOLE] Server error in thinking analysis: {e.message}, sleeping for 60 seconds") | |
| print(e) | |
| time.sleep(60) | |
| continue | |
| except Exception as e: | |
| # Handle non-server errors in thinking analysis | |
| error_msg = f"❌ Thinking analysis API error: {str(e)}" | |
| yield ('notification', error_msg) | |
| print(f"[CONSOLE] Non-server error in thinking API call: {e}") | |
| raise gr.Error(f"Thinking analysis API error: {str(e)}") | |
| output_csv = _extract_model_text(interaction) | |
| thought_txt = _extract_thought_text(interaction) | |
| print(f"[CONSOLE] Extracted output CSV ({len(output_csv)} chars)") | |
| print(f"[CONSOLE] Extracted thought text ({len(thought_txt)} chars)") | |
| _f = os.path.join(f"{eval_dir}", "gemini_fs", os.path.basename(f)) | |
| _f_thought = os.path.join(f"{eval_dir}", "gemini_fs", os.path.basename(f).replace(".csv", "_thinking.txt")) | |
| # Save thinking text | |
| with open(_f_thought, 'w', encoding='utf-8') as _f_thought_file: | |
| _f_thought_file.write(thought_txt) | |
| print(f"[CONSOLE] Thinking text saved to: {_f_thought}") | |
| # Parse and validate; on failure, ask the model to self-correct once. | |
| final_df, parse_error = _parse_response_to_df(output_csv, f, drop_existing=True) | |
| if final_df is None: | |
| yield ('notification', f"⚠️ Thinking output validation failed: {parse_error[:200]}. Asking Gemini to correct (1 retry)...") | |
| gr.Info("⚠️ Thinking output invalid — asking Gemini to correct") | |
| print(f"[CONSOLE] THINKING parse/validation failed: {parse_error}") | |
| try: | |
| correction_interaction = client.interactions.create( | |
| model='gemini-3-flash-preview', | |
| input=_build_correction_request(parse_error), | |
| previous_interaction_id=interaction.id, | |
| system_instruction=textsi_1, | |
| generation_config={ | |
| 'temperature': 1, | |
| 'top_p': 0.1, | |
| 'max_output_tokens': 45 * 1024, | |
| 'thinking_level': 'high', | |
| 'thinking_summaries': 'auto', | |
| }, | |
| ) | |
| corrected_csv = _extract_model_text(correction_interaction) | |
| final_df, parse_error = _parse_response_to_df(corrected_csv, f, drop_existing=True) | |
| if final_df is not None: | |
| output_csv = corrected_csv | |
| interaction = correction_interaction | |
| except Exception as e_corr: | |
| print(f"[CONSOLE] THINKING correction call failed: {e_corr}") | |
| parse_error = f"Correction API call failed: {e_corr}" | |
| if final_df is None: | |
| try: | |
| error_file = _f.replace(".csv", "_thinking_parse_error.txt") | |
| with open(error_file, 'w', encoding='utf-8') as _fs: | |
| _fs.write(output_csv) | |
| print(f"[CONSOLE] Thinking failed response saved to: {error_file}") | |
| except Exception as e3: | |
| print(f"[CONSOLE] Failed to save thinking error response: {e3}") | |
| error_msg = f"❌ Thinking output could not be parsed even after correction: {parse_error}" | |
| yield ('notification', error_msg) | |
| raise gr.Error(f"Failed to parse thinking response after correction: {parse_error}") | |
| else: | |
| yield ('notification', "✅ Gemini thinking correction succeeded — output parsed successfully") | |
| gr.Info("✅ Corrected thinking output parsed successfully!") | |
| final_df.to_csv(_f, index=False, quoting=csv.QUOTE_ALL) | |
| print(f"[CONSOLE] Thinking results saved to: {_f}") | |
| yield ('result', final_df) | |
| return | |
| except Exception as e: | |
| error_msg = f"❌ Error in thinking analysis: {str(e)}" | |
| yield ('notification', error_msg) | |
| print(f"[CONSOLE] Error in thinking analysis for {f}: {e}") | |
| raise gr.Error(f"Thinking analysis error: {str(e)}") | |
| yield ('result', None) # Return None if no files processed |