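"""Walk a directory tree, record simple per-file statistics (line count,
character count, long-text flag, dimensionality) in a SQLite database,
convert the rows into a Hugging Face Dataset and fine-tune a BERT
sequence-classification model on them.

Usage (the script file name is assumed here for illustration):
    python datei_parameter_training.py [verzeichnis]
If no directory is given, the current directory is scanned.
"""
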
import os
import sys
import sqlite3
import xml.etree.ElementTree as ET

from datasets import Dataset
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TFAutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
)
from bs4 import BeautifulSoup
import pyth.plugins.rtf15.reader as rtf15_reader
import pyth.plugins.plaintext.writer as plaintext_writer

SUPPORTED_FILE_TYPES = ['.sh', '.bat', '.ps1', '.cs', '.c', '.cpp', '.h', '.cmake', '.py', '.git', '.sql', '.csv', '.sqlite', '.lsl', '.html', '.xml', '.rtf']
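# Only files whose names end in one of the extensions above are processed:
# .html, .xml and .rtf are routed to format-specific extractors below;
# everything else is read as plain UTF-8 text (see durchsuchen_und_extrahieren).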


def extrahiere_parameter(file_path):
    """Count the lines and characters of a plain-text file.

    Note: only the file path (not the file content) is returned under "text".
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            lines = file.readlines()
        anzahl_zeilen = len(lines)
        anzahl_zeichen = sum(len(line) for line in lines)
        long_text_mode = anzahl_zeilen > 1000
        dimensionalität = 1
        return {
            "text": file_path,
            "anzahl_zeilen": anzahl_zeilen,
            "anzahl_zeichen": anzahl_zeichen,
            "long_text_mode": long_text_mode,
            "dimensionalität": dimensionalität
        }
    except UnicodeDecodeError as e:
        print(f"Error reading file {file_path}: {e}")
        return None
    except Exception as e:
        print(f"General error reading file {file_path}: {e}")
        return None


def extrahiere_parameter_html(file_path):
    """Extract the visible text of an HTML file and count its lines and characters."""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
        soup = BeautifulSoup(content, 'html.parser')
        text = soup.get_text()
        anzahl_zeilen = text.count('\n')
        anzahl_zeichen = len(text)
        long_text_mode = anzahl_zeilen > 1000
        dimensionalität = 1
        return {
            "text": text,
            "anzahl_zeilen": anzahl_zeilen,
            "anzahl_zeichen": anzahl_zeichen,
            "long_text_mode": long_text_mode,
            "dimensionalität": dimensionalität
        }
    except Exception as e:
        print(f"Error reading HTML file {file_path}: {e}")
        return None


def extrahiere_parameter_xml(file_path):
    """Extract the text content of an XML file and count its lines and characters."""
    try:
        tree = ET.parse(file_path)
        root = tree.getroot()
        text = ET.tostring(root, encoding='unicode', method='text')
        anzahl_zeilen = text.count('\n')
        anzahl_zeichen = len(text)
        long_text_mode = anzahl_zeilen > 1000
        dimensionalität = 1
        return {
            "text": text,
            "anzahl_zeilen": anzahl_zeilen,
            "anzahl_zeichen": anzahl_zeichen,
            "long_text_mode": long_text_mode,
            "dimensionalität": dimensionalität
        }
    except Exception as e:
        print(f"Error reading XML file {file_path}: {e}")
        return None


def extrahiere_parameter_rtf(file_path):
    """Extract the plain text of an RTF file (via the pyth package) and count
    its lines and characters."""
    try:
        with open(file_path, 'rb') as file:
            doc = rtf15_reader.read(file)
            text = plaintext_writer.write(doc).getvalue()
        anzahl_zeilen = text.count('\n')
        anzahl_zeichen = len(text)
        long_text_mode = anzahl_zeilen > 1000
        dimensionalität = 1
        return {
            "text": text,
            "anzahl_zeilen": anzahl_zeilen,
            "anzahl_zeichen": anzahl_zeichen,
            "long_text_mode": long_text_mode,
            "dimensionalität": dimensionalität
        }
    except Exception as e:
        print(f"Error reading RTF file {file_path}: {e}")
        return None


def durchsuchen_und_extrahieren(root_dir, db_pfad):
    """Walk root_dir, extract parameters for every supported file and store
    them in the SQLite database at db_pfad."""
    try:
        with sqlite3.connect(db_pfad) as conn:
            cursor = conn.cursor()
            cursor.execute('''CREATE TABLE IF NOT EXISTS dateiparameter
                              (id INTEGER PRIMARY KEY,
                               dateipfad TEXT,
                               anzahl_zeilen INTEGER,
                               anzahl_zeichen INTEGER,
                               long_text_mode BOOLEAN,
                               dimensionalität INTEGER)''')

            for subdir, _, files in os.walk(root_dir):
                for file in files:
                    file_path = os.path.join(subdir, file)
                    if file.endswith('.html'):
                        parameter = extrahiere_parameter_html(file_path)
                    elif file.endswith('.xml'):
                        parameter = extrahiere_parameter_xml(file_path)
                    elif file.endswith('.rtf'):
                        parameter = extrahiere_parameter_rtf(file_path)
                    elif any(file.endswith(ext) for ext in SUPPORTED_FILE_TYPES):
                        parameter = extrahiere_parameter(file_path)
                    else:
                        continue

                    if parameter:
                        # Rows are appended on every run; re-running the script
                        # on the same directory adds duplicate entries.
                        cursor.execute('''INSERT INTO dateiparameter (dateipfad, anzahl_zeilen, anzahl_zeichen, long_text_mode, dimensionalität)
                                          VALUES (?, ?, ?, ?, ?)''',
                                       (file_path, parameter["anzahl_zeilen"], parameter["anzahl_zeichen"], parameter["long_text_mode"], parameter["dimensionalität"]))
            conn.commit()
        print("Parameters successfully extracted and stored in the database.")
    except sqlite3.Error as e:
        print(f"SQLite error: {e}")
    except Exception as e:
        print(f"General error: {e}")


def extrahiere_parameter_aus_db(db_pfad):
    """Return all rows of the dateiparameter table, or None on error."""
    try:
        with sqlite3.connect(db_pfad) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM dateiparameter")
            daten = cursor.fetchall()
            return daten
    except sqlite3.Error as e:
        print(f"SQLite error: {e}")
        return None
    except Exception as e:
        print(f"General error: {e}")
        return None
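
# Row layout of the dateiparameter table (see CREATE TABLE above):
# (id, dateipfad, anzahl_zeilen, anzahl_zeichen, long_text_mode, dimensionalität).
# konvertiere_zu_hf_dataset indexes these positions, so the "text" column of
# the resulting dataset holds the stored file path.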


def konvertiere_zu_hf_dataset(daten):
    """Convert the database rows into a Hugging Face Dataset."""
    dataset_dict = {
        "text": [],
        "anzahl_zeilen": [],
        "anzahl_zeichen": [],
        "long_text_mode": [],
        "dimensionalität": []
    }

    for eintrag in daten:
        dataset_dict["text"].append(eintrag[1])
        dataset_dict["anzahl_zeilen"].append(eintrag[2])
        dataset_dict["anzahl_zeichen"].append(eintrag[3])
        dataset_dict["long_text_mode"].append(eintrag[4])
        dataset_dict["dimensionalität"].append(eintrag[5])

    return Dataset.from_dict(dataset_dict)


def trainiere_und_speichere_modell(hf_dataset, output_model_dir):
    """Tokenize the dataset, fine-tune a BERT classifier and save it
    (PyTorch and TensorFlow weights) to output_model_dir."""
    try:
        tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True)

        def tokenize_function(examples):
            return tokenizer(examples["text"], padding="max_length", truncation=True)

        tokenized_datasets = hf_dataset.map(tokenize_function, batched=True)

        # Placeholder labels: every example gets 0.0. With a single distinct
        # value, num_labels becomes 1 and the model head acts as a regressor;
        # replace this with real labels for a meaningful classifier.
        tokenized_datasets = tokenized_datasets.map(
            lambda examples: {"label": [0.0] * len(examples["text"])}, batched=True
        )

        train_test_split = tokenized_datasets.train_test_split(test_size=0.2)
        train_dataset = train_test_split["train"]
        eval_dataset = train_test_split["test"]

        num_labels = len(set(train_dataset["label"]))

        model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=num_labels)

        training_args = TrainingArguments(
            output_dir=output_model_dir,
            evaluation_strategy="epoch",
            per_device_train_batch_size=8,
            per_device_eval_batch_size=8,
            num_train_epochs=3,
            weight_decay=0.01,
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
        )

        trainer.train()
        model.save_pretrained(output_model_dir)
        tokenizer.save_pretrained(output_model_dir)

        # Also export TensorFlow weights, converted from the fine-tuned
        # PyTorch checkpoint saved above (not from the untrained base model).
        tf_model = TFAutoModelForSequenceClassification.from_pretrained(output_model_dir, from_pt=True)

        # One forward pass builds the TF graph before saving.
        dummy_input = tokenizer("This is a dummy input", return_tensors="tf")["input_ids"]
        tf_model(dummy_input)
        tf_model.save_pretrained(output_model_dir)

        print(f"The model was successfully saved to {output_model_dir}.")

    except Exception as e:
        print(f"Error while training and saving the model: {e}")
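

# Loading the saved model later for inference could look like this
# (illustrative sketch; the input text is a placeholder):
#
#     tokenizer = AutoTokenizer.from_pretrained(output_model_dir)
#     model = AutoModelForSequenceClassification.from_pretrained(output_model_dir)
#     inputs = tokenizer("some text", return_tensors="pt")
#     outputs = model(**inputs)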


if __name__ == "__main__":
    # The directory to scan can be passed as the first command-line argument;
    # the current directory is used by default.
    if len(sys.argv) > 1:
        directory_path = sys.argv[1]
    else:
        directory_path = '.'

    # Database and model directory are named after the scanned directory.
    db_name = os.path.basename(os.path.normpath(directory_path)) + '.db'

    durchsuchen_und_extrahieren(directory_path, db_name)

    daten = extrahiere_parameter_aus_db(db_name)
    if daten:
        hf_dataset = konvertiere_zu_hf_dataset(daten)

        output_model = os.path.basename(os.path.normpath(directory_path)) + '_model'
        output_model_dir = os.path.join(os.path.dirname(db_name), output_model)

        trainiere_und_speichere_modell(hf_dataset, output_model_dir)
    else:
        print("No data found to create an HF dataset.")