import os
import sys
import sqlite3
from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TFAutoModelForSequenceClassification, Trainer, TrainingArguments
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET
from striprtf.striprtf import rtf_to_text  # replaces the Python-2-only `pyth` package


SUPPORTED_FILE_TYPES = ['.sh', '.bat', '.ps1', '.cs', '.c', '.cpp', '.h', '.cmake', '.py', '.git', '.sql', '.csv', '.sqlite', '.lsl', '.html', '.xml', '.rtf']

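# Routing: .html, .xml and .rtf files are handled by the dedicated extractors
# below; every other listed extension is read as plain UTF-8 text. Binary
# formats in the list (e.g. .sqlite) will usually fail that decode and are
# skipped with an error message.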
def extrahiere_parameter(file_path):
    """Read a file as UTF-8 text and collect simple size metrics."""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            lines = file.readlines()
        anzahl_zeilen = len(lines)
        anzahl_zeichen = sum(len(line) for line in lines)
        long_text_mode = anzahl_zeilen > 1000
        dimensionalität = 1
        return {
            "text": "".join(lines),
            "anzahl_zeilen": anzahl_zeilen,
            "anzahl_zeichen": anzahl_zeichen,
            "long_text_mode": long_text_mode,
            "dimensionalität": dimensionalität
        }
    except UnicodeDecodeError as e:
        print(f"Error reading file {file_path}: {e}")
        return None
    except Exception as e:
        print(f"General error reading file {file_path}: {e}")
        return None


def extrahiere_parameter_html(file_path):
    """Extract the visible text from an HTML file and collect size metrics."""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
        soup = BeautifulSoup(content, 'html.parser')
        text = soup.get_text()
        anzahl_zeilen = text.count('\n')
        anzahl_zeichen = len(text)
        long_text_mode = anzahl_zeilen > 1000
        dimensionalität = 1
        return {
            "text": text,
            "anzahl_zeilen": anzahl_zeilen,
            "anzahl_zeichen": anzahl_zeichen,
            "long_text_mode": long_text_mode,
            "dimensionalität": dimensionalität
        }
    except Exception as e:
        print(f"Error reading HTML file {file_path}: {e}")
        return None


def extrahiere_parameter_xml(file_path):
    """Extract the text content of an XML file (tags dropped) and collect size metrics."""
    try:
        tree = ET.parse(file_path)
        root = tree.getroot()
        text = ET.tostring(root, encoding='unicode', method='text')
        anzahl_zeilen = text.count('\n')
        anzahl_zeichen = len(text)
        long_text_mode = anzahl_zeilen > 1000
        dimensionalität = 1
        return {
            "text": text,
            "anzahl_zeilen": anzahl_zeilen,
            "anzahl_zeichen": anzahl_zeichen,
            "long_text_mode": long_text_mode,
            "dimensionalität": dimensionalität
        }
    except Exception as e:
        print(f"Error reading XML file {file_path}: {e}")
        return None


def extrahiere_parameter_rtf(file_path):
    """Extract the plain text from an RTF file and collect size metrics."""
    try:
        # striprtf expects the raw RTF markup as a string.
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
            text = rtf_to_text(file.read())
        anzahl_zeilen = text.count('\n')
        anzahl_zeichen = len(text)
        long_text_mode = anzahl_zeilen > 1000
        dimensionalität = 1
        return {
            "text": text,
            "anzahl_zeilen": anzahl_zeilen,
            "anzahl_zeichen": anzahl_zeichen,
            "long_text_mode": long_text_mode,
            "dimensionalität": dimensionalität
        }
    except Exception as e:
        print(f"Error reading RTF file {file_path}: {e}")
        return None


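# The table below mirrors the dicts returned by the extractors, minus "text":
# only the file path and the numeric metrics are persisted. SQLite has no
# native BOOLEAN type, so long_text_mode is stored as 0/1.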
def durchsuchen_und_extrahieren(root_dir, db_pfad):
    try:
        with sqlite3.connect(db_pfad) as conn:
            cursor = conn.cursor()
            cursor.execute('''CREATE TABLE IF NOT EXISTS dateiparameter
                              (id INTEGER PRIMARY KEY,
                               dateipfad TEXT,
                               anzahl_zeilen INTEGER,
                               anzahl_zeichen INTEGER,
                               long_text_mode BOOLEAN,
                               dimensionalität INTEGER)''')

            for subdir, _, files in os.walk(root_dir):
                for file in files:
                    file_path = os.path.join(subdir, file)
                    if file.endswith('.html'):
                        parameter = extrahiere_parameter_html(file_path)
                    elif file.endswith('.xml'):
                        parameter = extrahiere_parameter_xml(file_path)
                    elif file.endswith('.rtf'):
                        parameter = extrahiere_parameter_rtf(file_path)
                    elif any(file.endswith(ext) for ext in SUPPORTED_FILE_TYPES):
                        parameter = extrahiere_parameter(file_path)
                    else:
                        continue

                    if parameter:
                        cursor.execute('''INSERT INTO dateiparameter (dateipfad, anzahl_zeilen, anzahl_zeichen, long_text_mode, dimensionalität)
                                          VALUES (?, ?, ?, ?, ?)''', (file_path, parameter["anzahl_zeilen"], parameter["anzahl_zeichen"], parameter["long_text_mode"], parameter["dimensionalität"]))
            conn.commit()
            print("Parameters successfully extracted and stored in the database.")
    except sqlite3.Error as e:
        print(f"SQLite error: {e}")
    except Exception as e:
        print(f"General error: {e}")


def extrahiere_parameter_aus_db(db_pfad):
    try:
        with sqlite3.connect(db_pfad) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM dateiparameter")
            daten = cursor.fetchall()
            return daten
    except sqlite3.Error as e:
        print(f"SQLite error: {e}")
        return None
    except Exception as e:
        print(f"General error: {e}")
        return None


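# Each fetched row is a positional tuple in CREATE TABLE column order
# (values here are illustrative):
#   (1, '/pfad/zur/datei.py', 120, 4567, 0, 1)
#   i.e. (id, dateipfad, anzahl_zeilen, anzahl_zeichen, long_text_mode, dimensionalität)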
def konvertiere_zu_hf_dataset(daten):
    dataset_dict = {
        "text": [],
        "anzahl_zeilen": [],
        "anzahl_zeichen": [],
        "long_text_mode": [],
        "dimensionalität": []
    }

    for eintrag in daten:
        dataset_dict["text"].append(eintrag[1])
        dataset_dict["anzahl_zeilen"].append(eintrag[2])
        dataset_dict["anzahl_zeichen"].append(eintrag[3])
        dataset_dict["long_text_mode"].append(eintrag[4])
        dataset_dict["dimensionalität"].append(eintrag[5])

    return Dataset.from_dict(dataset_dict)


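# Caveat: "text" above is populated from the dateipfad column, so the model in
# trainiere_und_speichere_modell is tokenized and trained on file paths with
# uniform placeholder labels; substitute real document text and labels before
# expecting meaningful predictions.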
def trainiere_und_speichere_modell(hf_dataset, output_model_dir):
    try:
        tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True)

        def tokenize_function(examples):
            return tokenizer(examples["text"], padding="max_length", truncation=True)

        tokenized_datasets = hf_dataset.map(tokenize_function, batched=True)

        # Placeholder labels: every example gets label 0.0 until real labels exist.
        tokenized_datasets = tokenized_datasets.map(lambda examples: {"label": [0.0] * len(examples["text"])}, batched=True)

        train_test_split = tokenized_datasets.train_test_split(test_size=0.2)
        train_dataset = train_test_split["train"]
        eval_dataset = train_test_split["test"]

        # With the uniform placeholder labels this is 1, so the model is built
        # with a single-output (regression-style) head.
        num_labels = len(set(train_dataset["label"]))

        model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=num_labels)

        training_args = TrainingArguments(
            output_dir=output_model_dir,
            evaluation_strategy="epoch",
            per_device_train_batch_size=8,
            per_device_eval_batch_size=8,
            num_train_epochs=3,
            weight_decay=0.01,
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
        )

        trainer.train()
        model.save_pretrained(output_model_dir)
        tokenizer.save_pretrained(output_model_dir)

        # Additionally export a TensorFlow version of the fine-tuned model by
        # converting the PyTorch weights that were just saved.
        tf_model = TFAutoModelForSequenceClassification.from_pretrained(output_model_dir, from_pt=True)
        tf_model.save_pretrained(output_model_dir)

        print(f"The model was successfully saved to {output_model_dir}.")

    except Exception as e:
        print(f"Error while training and saving the model: {e}")


if __name__ == "__main__":
    # The directory to scan can be passed as the first CLI argument;
    # the current directory is the default.
    if len(sys.argv) > 1:
        directory_path = sys.argv[1]
    else:
        directory_path = '.'

    # Name the database after the scanned directory; abspath ensures a usable
    # name even for the default '.'.
    base_name = os.path.basename(os.path.abspath(directory_path))
    db_name = base_name + '.db'

    durchsuchen_und_extrahieren(directory_path, db_name)

    daten = extrahiere_parameter_aus_db(db_name)
    if daten:
        hf_dataset = konvertiere_zu_hf_dataset(daten)

        output_model = base_name + '_model'
        output_model_dir = os.path.join(os.path.dirname(db_name), output_model)

        trainiere_und_speichere_modell(hf_dataset, output_model_dir)
    else:
        print("No data found to build an HF dataset.")
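# Example invocation (assuming this script is saved as, say, train_dateiparameter.py):
#   python train_dateiparameter.py /pfad/zum/projekt
# This writes projekt.db in the current directory and saves the fine-tuned
# model to projekt_model/.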