import os import sys import sqlite3 from datasets import Dataset from transformers import AutoTokenizer, AutoModelForSequenceClassification, TFAutoModelForSequenceClassification, Trainer, TrainingArguments from bs4 import BeautifulSoup import xml.etree.ElementTree as ET import pyth.plugins.rtf15.reader as rtf15_reader import pyth.plugins.plaintext.writer as plaintext_writer SUPPORTED_FILE_TYPES = ['.sh', '.bat', '.ps1', '.cs', '.c', '.cpp', '.h', '.cmake', '.py', '.git', '.sql', '.csv', '.sqlite', '.lsl', '.html', '.xml', '.rtf'] def extrahiere_parameter(file_path): try: with open(file_path, 'r', encoding='utf-8') as file: lines = file.readlines() anzahl_zeilen = len(lines) anzahl_zeichen = sum(len(line) for line in lines) long_text_mode = anzahl_zeilen > 1000 dimensionalität = 1 # Beispielwert, kann angepasst werden return { "text": file_path, "anzahl_zeilen": anzahl_zeilen, "anzahl_zeichen": anzahl_zeichen, "long_text_mode": long_text_mode, "dimensionalität": dimensionalität } except UnicodeDecodeError as e: print(f"Fehler beim Lesen der Datei {file_path}: {e}") return None except Exception as e: print(f"Allgemeiner Fehler beim Lesen der Datei {file_path}: {e}") return None def extrahiere_parameter_html(file_path): try: with open(file_path, 'r', encoding='utf-8') as file: content = file.read() soup = BeautifulSoup(content, 'html.parser') text = soup.get_text() anzahl_zeilen = text.count('\n') anzahl_zeichen = len(text) long_text_mode = anzahl_zeilen > 1000 dimensionalität = 1 return { "text": text, "anzahl_zeilen": anzahl_zeilen, "anzahl_zeichen": anzahl_zeichen, "long_text_mode": long_text_mode, "dimensionalität": dimensionalität } except Exception as e: print(f"Fehler beim Lesen der HTML-Datei {file_path}: {e}") return None def extrahiere_parameter_xml(file_path): try: tree = ET.parse(file_path) root = tree.getroot() text = ET.tostring(root, encoding='unicode', method='text') anzahl_zeilen = text.count('\n') anzahl_zeichen = len(text) long_text_mode = anzahl_zeilen > 1000 dimensionalität = 1 return { "text": text, "anzahl_zeilen": anzahl_zeilen, "anzahl_zeichen": anzahl_zeichen, "long_text_mode": long_text_mode, "dimensionalität": dimensionalität } except Exception as e: print(f"Fehler beim Lesen der XML-Datei {file_path}: {e}") return None def extrahiere_parameter_rtf(file_path): try: with open(file_path, 'rb') as file: doc = rtf15_reader.read(file) text = plaintext_writer.write(doc).getvalue() anzahl_zeilen = text.count('\n') anzahl_zeichen = len(text) long_text_mode = anzahl_zeilen > 1000 dimensionalität = 1 return { "text": text, "anzahl_zeilen": anzahl_zeilen, "anzahl_zeichen": anzahl_zeichen, "long_text_mode": long_text_mode, "dimensionalität": dimensionalität } except Exception as e: print(f"Fehler beim Lesen der RTF-Datei {file_path}: {e}") return None def durchsuchen_und_extrahieren(root_dir, db_pfad): try: with sqlite3.connect(db_pfad) as conn: cursor = conn.cursor() cursor.execute('''CREATE TABLE IF NOT EXISTS dateiparameter (id INTEGER PRIMARY KEY, dateipfad TEXT, anzahl_zeilen INTEGER, anzahl_zeichen INTEGER, long_text_mode BOOLEAN, dimensionalität INTEGER)''') for subdir, _, files in os.walk(root_dir): for file in files: file_path = os.path.join(subdir, file) if file.endswith('.html'): parameter = extrahiere_parameter_html(file_path) elif file.endswith('.xml'): parameter = extrahiere_parameter_xml(file_path) elif file.endswith('.rtf'): parameter = extrahiere_parameter_rtf(file_path) elif any(file.endswith(ext) for ext in SUPPORTED_FILE_TYPES): parameter = extrahiere_parameter(file_path) else: continue if parameter: cursor.execute('''INSERT INTO dateiparameter (dateipfad, anzahl_zeilen, anzahl_zeichen, long_text_mode, dimensionalität) VALUES (?, ?, ?, ?, ?)''', (file_path, parameter["anzahl_zeilen"], parameter["anzahl_zeichen"], parameter["long_text_mode"], parameter["dimensionalität"])) conn.commit() print("Parameter erfolgreich extrahiert und in der Datenbank gespeichert.") except sqlite3.Error as e: print(f"SQLite Fehler: {e}") except Exception as e: print(f"Allgemeiner Fehler: {e}") def extrahiere_parameter_aus_db(db_pfad): try: with sqlite3.connect(db_pfad) as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM dateiparameter") daten = cursor.fetchall() return daten except sqlite3.Error as e: print(f"SQLite Fehler: {e}") return None except Exception as e: print(f"Allgemeiner Fehler: {e}") return None def konvertiere_zu_hf_dataset(daten): dataset_dict = { "text": [], "anzahl_zeilen": [], "anzahl_zeichen": [], "long_text_mode": [], "dimensionalität": [] } for eintrag in daten: dataset_dict["text"].append(eintrag[1]) # 'text' entspricht 'dateipfad' dataset_dict["anzahl_zeilen"].append(eintrag[2]) dataset_dict["anzahl_zeichen"].append(eintrag[3]) dataset_dict["long_text_mode"].append(eintrag[4]) dataset_dict["dimensionalität"].append(eintrag[5]) return Dataset.from_dict(dataset_dict) def trainiere_und_speichere_modell(hf_dataset, output_model_dir): try: tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True) def tokenize_function(examples): return tokenizer(examples["text"], padding="max_length", truncation=True) tokenized_datasets = hf_dataset.map(tokenize_function, batched=True) # Beispielhaftes Hinzufügen von Dummy-Labels für das Training tokenized_datasets = tokenized_datasets.map(lambda examples: {"label": [0.0] * len(examples["text"])}, batched=True) # Dummy labels as float # Aufteilen des Datensatzes in Training und Test train_test_split = tokenized_datasets.train_test_split(test_size=0.2) train_dataset = train_test_split["train"] eval_dataset = train_test_split["test"] num_labels = len(set(train_dataset["label"])) # PyTorch Modell model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=num_labels) training_args = TrainingArguments( output_dir=output_model_dir, evaluation_strategy="epoch", # Aktualisiert nach der Deprecation-Warnung per_device_train_batch_size=8, per_device_eval_batch_size=8, num_train_epochs=3, weight_decay=0.01, ) trainer = Trainer( model=model, args=training_args, train_dataset=train_dataset, eval_dataset=eval_dataset, ) trainer.train() model.save_pretrained(output_model_dir) tokenizer.save_pretrained(output_model_dir) # TensorFlow Modell tf_model = TFAutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=num_labels) tf_model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) # Dummy-Daten für das Speichern im TensorFlow-Format import tensorflow as tf dummy_input = tf.constant(tokenizer("This is a dummy input", return_tensors="tf")["input_ids"]) # Speichern des TensorFlow-Modells tf_model(dummy_input) # Modell einmal aufrufen, um es zu "bauen" tf_model.save_pretrained(output_model_dir) print(f"Das Modell wurde erfolgreich in {output_model_dir} gespeichert.") except Exception as e: print(f"Fehler beim Trainieren und Speichern des Modells: {e}") if __name__ == "__main__": # Verzeichnispfad als Argument übergeben, falls vorhanden if len(sys.argv) > 1: directory_path = sys.argv[1] else: directory_path = '.' # Standardverzeichnis, falls kein Argument übergeben wurde db_name = os.path.basename(os.path.normpath(directory_path)) + '.db' durchsuchen_und_extrahieren(directory_path, db_name) daten = extrahiere_parameter_aus_db(db_name) if daten: hf_dataset = konvertiere_zu_hf_dataset(daten) output_model = os.path.basename(os.path.normpath(directory_path)) + '_model' # Verzeichnisname Modell output_model_dir = os.path.join(os.path.dirname(db_name), output_model) trainiere_und_speichere_modell(hf_dataset, output_model_dir) else: print("Keine Daten gefunden, um ein HF-Dataset zu erstellen.")