| |
| """ |
| Comprehensive Data Processor |
| ============================ |
| Processes all available data sources: PDFs, documents, existing training data, |
| and generates comprehensive training datasets for the enhanced tokenizer system. |
| """ |
|
|
import json
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
|
|
| |
| try: |
| import PyPDF2 |
| PDF_AVAILABLE = True |
| except ImportError: |
| PDF_AVAILABLE = False |
|
|
| try: |
| import pdfplumber |
| PDFPLUMBER_AVAILABLE = True |
| except ImportError: |
| PDFPLUMBER_AVAILABLE = False |
|
|
class ComprehensiveDataProcessor:
    """Gather JSONL, text/markdown and PDF sources into one training dataset.

    Typical use is ``process_all_data_sources()`` followed by
    ``save_comprehensive_training_data()`` and ``print_processing_summary()``.

    Attributes:
        all_training_data: List created empty in ``__init__``; no method of
            this class writes to it.
        processing_stats: Running totals (``files_processed``,
            ``total_entries`` and per-file ``sources`` counts) updated by
            ``process_all_data_sources()``.

    NOTE(review): the status glyphs in the console messages were
    reconstructed from mis-decoded UTF-8 (several had split their string
    literal across two lines); confirm the exact emoji against a clean copy.
    """

    # Default inputs, resolved relative to the current working directory.
    # NOTE(review): "comprehensive_training_data.jsonl" is also the file that
    # save_comprehensive_training_data() writes, so a second run re-reads the
    # previous run's output - confirm that this is intended.
    DEFAULT_JSONL_FILES = (
        "matrix_training_data.jsonl",
        "training_data_emergent.jsonl",
        "comprehensive_training_data.jsonl",
    )
    DEFAULT_TEXT_FILES = (
        "README.md",
        "COMPLETE_INTEGRATION_SUMMARY.md",
        "THE_BLOOM_IS_COMPLETE.md",
        "COMPLETE_ACHIEVEMENT_REPORT.md",
        "BENCHMARK_ANALYSIS.md",
    )
    DEFAULT_PDF_FILES = (
        "LOOM_OF_EMERGENCE.pdf",
    )

    # Heuristics used by analyze_content_type(); compiled once per class.
    _WHITESPACE = re.compile(r'\s+')
    _CODE_KEYWORDS = ('def ', 'class ', 'import ', 'function', 'var ', 'const ')
    _ACADEMIC_KEYWORDS = ('research', 'study', 'analysis', 'methodology', 'results')
    _MATH_PATTERN = re.compile(r'[\$\^\+\-\*\/\=\<\>\(\)]')
    # Either the substring "sql" or a "select ... from" statement shape.
    _SQL_PATTERN = re.compile(r'sql|\bselect\b.{1,300}?\bfrom\b', re.DOTALL)

    def __init__(self):
        self.all_training_data = []
        self.processing_stats = {
            "files_processed": 0,
            "total_entries": 0,
            "sources": {}
        }

    def extract_pdf_text(self, pdf_path: str) -> str:
        """Extract the text of a PDF, preferring pdfplumber over PyPDF2.

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            The page texts joined by newlines and stripped, or ``""`` when
            neither PDF library is installed or extraction fails.
        """
        try:
            if PDFPLUMBER_AVAILABLE:
                with pdfplumber.open(pdf_path) as pdf:
                    # Pages that yield no text are skipped entirely.
                    page_texts = [
                        page_text
                        for page_text in (page.extract_text() for page in pdf.pages)
                        if page_text
                    ]
            elif PDF_AVAILABLE:
                with open(pdf_path, 'rb') as file:
                    # Empty pages stay as blank lines; `or ""` keeps a page
                    # that yields None from aborting the whole document.
                    page_texts = [
                        page.extract_text() or ""
                        for page in PyPDF2.PdfReader(file).pages
                    ]
            else:
                print(f"⚠️ No PDF library available; skipping {pdf_path}")
                return ""
        except Exception as e:
            # Best effort: one unreadable PDF must not stop the batch.
            print(f"❌ PDF extraction failed for {pdf_path}: {e}")
            return ""

        return "\n".join(page_texts).strip()

    def process_existing_jsonl(self, file_path: str) -> List[Dict[str, Any]]:
        """Turn each JSON object line of a JSONL file into a training entry.

        Blank lines are ignored. Lines that are not valid JSON, or that
        decode to something other than an object, are reported and skipped
        without affecting the remaining lines.

        Args:
            file_path: Path of the JSONL file.

        Returns:
            One entry per accepted line; ``id`` is ``<file stem>_<line no>``
            and ``content`` is ``"<prompt> <completion>"``.
        """
        entries: List[Dict[str, Any]] = []
        stem = Path(file_path).stem
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        data = json.loads(line)
                    except json.JSONDecodeError as e:
                        print(f"⚠️ JSON decode error in {file_path} line {line_num}: {e}")
                        continue
                    if not isinstance(data, dict):
                        # e.g. a bare list or number: valid JSON, but it has
                        # no prompt/completion fields to read.
                        print(f"⚠️ Skipping non-object JSON in {file_path} line {line_num}")
                        continue

                    prompt = data.get("prompt", "")
                    completion = data.get("completion", "")
                    entries.append({
                        "id": f"{stem}_{line_num}",
                        "source": "existing_jsonl",
                        "source_file": file_path,
                        "prompt": prompt,
                        "completion": completion,
                        "content": f"{prompt} {completion}",
                        "metadata": data.get("metadata", {}),
                        "processed_at": datetime.now().isoformat()
                    })
        except Exception as e:
            # Best effort: keep whatever was read before the failure.
            print(f"❌ Error processing {file_path}: {e}")

        print(f"✅ Processed {len(entries)} entries from {file_path}")
        return entries

    def _build_chunk_entries(
        self,
        file_path: str,
        text: str,
        source: str,
        file_type: str,
        extra_metadata: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """Chunk *text* (512 words per chunk) and wrap each chunk as an entry."""
        chunks = self.chunk_text(text, chunk_size=512)
        stem = Path(file_path).stem
        total_chunks = len(chunks)

        entries = []
        for chunk_id, chunk in enumerate(chunks, 1):
            metadata = {
                "file_type": file_type,
                "chunk_id": chunk_id,
                "total_chunks": total_chunks
            }
            if extra_metadata:
                metadata.update(extra_metadata)
            entries.append({
                "id": f"{stem}_{chunk_id}",
                "source": source,
                "source_file": file_path,
                "content": chunk,
                "metadata": metadata,
                "processed_at": datetime.now().isoformat()
            })
        return entries

    def process_text_file(self, file_path: str) -> List[Dict[str, Any]]:
        """Split a UTF-8 text/markdown file into chunk entries.

        Args:
            file_path: Path of the file to read.

        Returns:
            One entry per retained chunk; empty if the file cannot be read.
        """
        entries: List[Dict[str, Any]] = []
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Collapse every whitespace run to a single space.
            content = self._WHITESPACE.sub(' ', content).strip()
            entries = self._build_chunk_entries(
                file_path, content, "text_file", Path(file_path).suffix
            )
        except Exception as e:
            print(f"❌ Error processing {file_path}: {e}")

        print(f"✅ Processed {len(entries)} entries from {file_path}")
        return entries

    def process_pdf_file(self, file_path: str) -> List[Dict[str, Any]]:
        """Extract a PDF's text and split it into chunk entries.

        Args:
            file_path: Path of the PDF file.

        Returns:
            One entry per retained chunk; empty if no text was extracted.
            Each entry's metadata carries ``extracted_length``, the length
            of the whitespace-normalised text.
        """
        entries: List[Dict[str, Any]] = []
        try:
            text = self.extract_pdf_text(file_path)
            if text:
                text = self._WHITESPACE.sub(' ', text).strip()
                entries = self._build_chunk_entries(
                    file_path, text, "pdf_file", "pdf",
                    {"extracted_length": len(text)},
                )
        except Exception as e:
            print(f"❌ Error processing {file_path}: {e}")

        print(f"✅ Processed {len(entries)} entries from {file_path}")
        return entries

    def chunk_text(self, text: str, chunk_size: int = 512, min_chars: int = 50) -> List[str]:
        """Split text into chunks of at most ``chunk_size`` words.

        Args:
            text: Text to split; words are separated on any whitespace.
            chunk_size: Maximum number of words per chunk (at least 1).
            min_chars: Chunks whose length does not exceed this many
                characters are discarded (this includes a short tail).

        Returns:
            The retained chunks, each with its words joined by single spaces.

        Raises:
            ValueError: If ``chunk_size`` is smaller than 1.
        """
        if chunk_size < 1:
            raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")

        words = text.split()
        candidates = (
            ' '.join(words[start:start + chunk_size])
            for start in range(0, len(words), chunk_size)
        )
        return [chunk for chunk in candidates if len(chunk) > min_chars]

    def analyze_content_type(self, content: str) -> str:
        """Classify content with keyword heuristics.

        Checks run in this order and the first hit wins: ``"code"``,
        ``"sql"``, ``"mathematical"``, ``"academic"``, else ``"general"``.
        SQL is tested before the math symbols because SQL statements
        normally contain ``*``, ``=`` or parentheses, and it requires either
        the substring ``sql`` or a ``select ... from`` shape so that ordinary
        prose containing "from" or "where" is not labelled SQL.

        Args:
            content: Text to classify.

        Returns:
            One of the five labels listed above.
        """
        content_lower = content.lower()

        if any(keyword in content_lower for keyword in self._CODE_KEYWORDS):
            return "code"

        if self._SQL_PATTERN.search(content_lower):
            return "sql"

        # Deliberately permissive: any of $ ^ + - * / = < > ( ) counts.
        if self._MATH_PATTERN.search(content):
            return "mathematical"

        if any(keyword in content_lower for keyword in self._ACADEMIC_KEYWORDS):
            return "academic"

        return "general"

    def enhance_training_entries(self, entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Attach an ``enhanced_metadata`` dict to a copy of every entry.

        Args:
            entries: Entries to enrich; a missing or ``None`` ``content`` is
                treated as an empty string.

        Returns:
            Shallow copies of the entries (the inputs are not modified),
            each with content type, word/char counts, type flags, a
            complexity score (word count / 100), unique-word count and
            average word length.
        """
        enhanced_entries = []

        for entry in entries:
            content = entry.get("content") or ""
            words = content.split()
            word_count = len(words)
            content_type = self.analyze_content_type(content)

            enhanced_entry = entry.copy()
            enhanced_entry["enhanced_metadata"] = {
                "content_type": content_type,
                "word_count": word_count,
                "char_count": len(content),
                "has_code": content_type == "code",
                "has_math": content_type == "mathematical" or "$" in content,
                "has_sql": content_type == "sql",
                "complexity_score": word_count / 100.0,
                "unique_words": len(set(content.lower().split())),
                "avg_word_length": sum(len(word) for word in words) / word_count if words else 0
            }
            enhanced_entries.append(enhanced_entry)

        return enhanced_entries

    def _process_group(
        self,
        header: str,
        file_paths: Iterable[str],
        handler: Callable[[str], List[Dict[str, Any]]],
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Run *handler* on each existing path; return (entries, files found)."""
        print(header)
        collected: List[Dict[str, Any]] = []
        found = 0
        for file_path in file_paths:
            file_path = str(file_path)  # stats keys must be JSON-serialisable
            if not Path(file_path).exists():
                print(f"⚠️ File not found: {file_path}")
                continue
            entries = handler(file_path)
            collected.extend(entries)
            self.processing_stats["sources"][file_path] = len(entries)
            self.processing_stats["files_processed"] += 1
            found += 1
        return collected, found

    def process_all_data_sources(
        self,
        jsonl_files: Optional[Iterable[str]] = None,
        text_files: Optional[Iterable[str]] = None,
        pdf_files: Optional[Iterable[str]] = None,
    ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        """Process every source file and enrich the resulting entries.

        Args:
            jsonl_files: JSONL files to read; ``None`` selects
                ``DEFAULT_JSONL_FILES``.
            text_files: Text/markdown files to read; ``None`` selects
                ``DEFAULT_TEXT_FILES``.
            pdf_files: PDF files to read; ``None`` selects
                ``DEFAULT_PDF_FILES``.

        Returns:
            A ``(results, entries)`` tuple: ``results`` summarises the run
            (stats, content-type distribution, timestamp, number of files
            found per kind) and ``entries`` is the enriched entry list.
            Missing files are reported and skipped.
        """
        if jsonl_files is None:
            jsonl_files = self.DEFAULT_JSONL_FILES
        if text_files is None:
            text_files = self.DEFAULT_TEXT_FILES
        if pdf_files is None:
            pdf_files = self.DEFAULT_PDF_FILES

        print("🚀 Comprehensive Data Processing")
        print("=" * 40)

        all_entries: List[Dict[str, Any]] = []

        jsonl_entries, jsonl_found = self._process_group(
            "\n📚 Processing JSONL training files...", jsonl_files, self.process_existing_jsonl
        )
        all_entries.extend(jsonl_entries)

        text_entries, text_found = self._process_group(
            "\n📄 Processing text/markdown files...", text_files, self.process_text_file
        )
        all_entries.extend(text_entries)

        pdf_entries, pdf_found = self._process_group(
            "\n📖 Processing PDF files...", pdf_files, self.process_pdf_file
        )
        all_entries.extend(pdf_entries)

        print("\n🔧 Enhancing training entries...")
        enhanced_entries = self.enhance_training_entries(all_entries)

        self.processing_stats["total_entries"] = len(enhanced_entries)

        content_types: Dict[str, int] = {}
        for entry in enhanced_entries:
            content_type = entry["enhanced_metadata"]["content_type"]
            content_types[content_type] = content_types.get(content_type, 0) + 1

        results = {
            "processing_stats": self.processing_stats,
            "content_type_distribution": content_types,
            "total_entries": len(enhanced_entries),
            "timestamp": datetime.now().isoformat(),
            "sources_summary": {
                "jsonl_files": jsonl_found,
                "text_files": text_found,
                "pdf_files": pdf_found
            }
        }

        return results, enhanced_entries

    def save_comprehensive_training_data(
        self,
        entries: List[Dict[str, Any]],
        results: Dict[str, Any],
        output_dir: str = ".",
    ) -> None:
        """Write the dataset, the full results and a short summary to disk.

        Creates ``comprehensive_training_data.jsonl`` (one entry per line),
        ``comprehensive_processing_results.json`` and
        ``training_data_summary.json`` inside ``output_dir``.

        Args:
            entries: Entries to write.
            results: Results dict as returned by
                ``process_all_data_sources()``.
            output_dir: Target directory, created if missing; defaults to
                the current working directory.
        """
        print(f"\n💾 Saving {len(entries)} training entries...")

        target = Path(output_dir)
        target.mkdir(parents=True, exist_ok=True)

        with open(target / "comprehensive_training_data.jsonl", 'w', encoding='utf-8') as f:
            f.writelines(json.dumps(entry, ensure_ascii=False) + '\n' for entry in entries)

        with open(target / "comprehensive_processing_results.json", 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)

        summary = {
            "total_entries": len(entries),
            "content_types": results["content_type_distribution"],
            "sources": results["processing_stats"]["sources"],
            "files_processed": results["processing_stats"]["files_processed"],
            "timestamp": results["timestamp"]
        }

        with open(target / "training_data_summary.json", 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)

        print("✅ Training data saved:")
        print(" 📁 comprehensive_training_data.jsonl")
        print(" 📁 comprehensive_processing_results.json")
        print(" 📁 training_data_summary.json")

    def print_processing_summary(self, results: Dict[str, Any], entries: List[Dict[str, Any]]) -> None:
        """Print file counts, content-type shares and per-source counts.

        Args:
            results: Results dict as returned by
                ``process_all_data_sources()``.
            entries: The entry list the percentages are computed against.
        """
        total = len(entries)

        print("\n📊 Processing Summary")
        print("=" * 30)
        print(f"✅ Files processed: {results['processing_stats']['files_processed']}")
        print(f"📊 Total entries: {total}")

        print("\n📈 Content Type Distribution:")
        for content_type, count in results["content_type_distribution"].items():
            # Guard against an empty entry list paired with a stale distribution.
            percentage = (count / total) * 100 if total else 0.0
            print(f" {content_type}: {count} entries ({percentage:.1f}%)")

        print("\n📁 Sources:")
        for source, count in results["processing_stats"]["sources"].items():
            print(f" {Path(source).name}: {count} entries")

        print(f"\n🎯 Ready for training with {total} comprehensive entries!")
|
|
def main():
    """Run the pipeline end to end and hand back what it produced.

    Builds a processor, processes every default source, saves the dataset
    files and prints the console summary.

    Returns:
        The ``(results, entries)`` pair from ``process_all_data_sources()``.
    """
    pipeline = ComprehensiveDataProcessor()

    # Gather and enrich everything first; saving and reporting both need it.
    outcome = pipeline.process_all_data_sources()
    report, dataset = outcome

    pipeline.save_comprehensive_training_data(dataset, report)
    pipeline.print_processing_summary(report, dataset)

    return report, dataset


if __name__ == "__main__":
    main()
|
|