import os import sys import sqlite3 import torch from datasets import Dataset, DatasetDict from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments SUPPORTED_FILE_TYPES = ['.sh', '.bat', '.ps1', '.cs', '.c', '.cpp', '.h', '.cmake', '.py', '.git', '.sql', '.csv', '.sqlite', '.lsl'] def extrahiere_parameter(file_path): try: with open(file_path, 'r', encoding='utf-8') as file: lines = file.readlines() anzahl_zeilen = len(lines) anzahl_zeichen = sum(len(line) for line in lines) long_text_mode = anzahl_zeilen > 1000 dimensionalität = 1 # Beispielwert, kann angepasst werden return { "text": file_path, "anzahl_zeilen": anzahl_zeilen, "anzahl_zeichen": anzahl_zeichen, "long_text_mode": long_text_mode, "dimensionalität": dimensionalität } except UnicodeDecodeError as e: print(f"Fehler beim Lesen der Datei {file_path}: {e}") return None except Exception as e: print(f"Allgemeiner Fehler beim Lesen der Datei {file_path}: {e}") return None def durchsuchen_und_extrahieren(root_dir, db_pfad): try: with sqlite3.connect(db_pfad) as conn: cursor = conn.cursor() cursor.execute('''CREATE TABLE IF NOT EXISTS dateiparameter (id INTEGER PRIMARY KEY, dateipfad TEXT, anzahl_zeilen INTEGER, anzahl_zeichen INTEGER, long_text_mode BOOLEAN, dimensionalität INTEGER)''') for subdir, _, files in os.walk(root_dir): for file in files: if any(file.endswith(ext) for ext in SUPPORTED_FILE_TYPES): file_path = os.path.join(subdir, file) parameter = extrahiere_parameter(file_path) if parameter: cursor.execute('''INSERT INTO dateiparameter (dateipfad, anzahl_zeilen, anzahl_zeichen, long_text_mode, dimensionalität) VALUES (?, ?, ?, ?, ?)''', (file_path, parameter["anzahl_zeilen"], parameter["anzahl_zeichen"], parameter["long_text_mode"], parameter["dimensionalität"])) conn.commit() print("Parameter erfolgreich extrahiert und in der Datenbank gespeichert.") except sqlite3.Error as e: print(f"SQLite Fehler: {e}") except Exception as e: print(f"Allgemeiner Fehler: {e}") def extrahiere_parameter_aus_db(db_pfad): try: with sqlite3.connect(db_pfad) as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM dateiparameter") daten = cursor.fetchall() return daten except sqlite3.Error as e: print(f"SQLite Fehler: {e}") return None except Exception as e: print(f"Allgemeiner Fehler: {e}") return None def konvertiere_zu_hf_dataset(daten): dataset_dict = { "text": [], "anzahl_zeilen": [], "anzahl_zeichen": [], "long_text_mode": [], "dimensionalität": [] } for eintrag in daten: dataset_dict["text"].append(eintrag[1]) # 'text' entspricht 'dateipfad' dataset_dict["anzahl_zeilen"].append(eintrag[2]) dataset_dict["anzahl_zeichen"].append(eintrag[3]) dataset_dict["long_text_mode"].append(eintrag[4]) dataset_dict["dimensionalität"].append(eintrag[5]) return Dataset.from_dict(dataset_dict) def trainiere_und_speichere_modell(hf_dataset, output_model_dir): try: tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True) def tokenize_function(examples): return tokenizer(examples["text"], padding="max_length", truncation=True) tokenized_datasets = hf_dataset.map(tokenize_function, batched=True) # Beispielhaftes Hinzufügen von Dummy-Labels für das Training tokenized_datasets = tokenized_datasets.map(lambda examples: {"label": [0] * len(examples["text"])}, batched=True) # Dummy labels as int # Aufteilen des Datensatzes in Training und Test train_test_split = tokenized_datasets.train_test_split(test_size=0.2) train_dataset = train_test_split["train"] eval_dataset = train_test_split["test"] num_labels = len(set(train_dataset["label"])) model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=num_labels) training_args = TrainingArguments( output_dir=output_model_dir, evaluation_strategy="epoch", # Aktualisiert nach der Deprecation-Warnung per_device_train_batch_size=8, per_device_eval_batch_size=8, num_train_epochs=3, weight_decay=0.01, ) trainer = Trainer( model=model, args=training_args, train_dataset=train_dataset, eval_dataset=eval_dataset, ) trainer.train() model.save_pretrained(output_model_dir) tokenizer.save_pretrained(output_model_dir) print(f"Das Modell wurde erfolgreich in {output_model_dir} gespeichert.") print("You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.") except Exception as e: print(f"Fehler beim Trainieren und Speichern des Modells: {e}") if __name__ == "__main__": # Verzeichnispfad als Argument übergeben, falls vorhanden if len(sys.argv) > 1: directory_path = sys.argv[1] else: directory_path = '.' # Standardverzeichnis, falls kein Argument übergeben wurde db_name = os.path.basename(os.path.normpath(directory_path)) + '.db' durchsuchen_und_extrahieren(directory_path, db_name) daten = extrahiere_parameter_aus_db(db_name) if daten: hf_dataset = konvertiere_zu_hf_dataset(daten) output_model = os.path.basename(os.path.normpath(directory_path)) + '_model' # Verzeichnisname Modell output_model_dir = os.path.join(os.path.dirname(db_name), output_model) trainiere_und_speichere_modell(hf_dataset, output_model_dir) else: print("Keine Daten gefunden, um ein HF-Dataset zu erstellen.")