""" - Copyright (c) 2025 DuYu (No.202103180009, qluduyu09@163.com), Faculty of Computer Science and Technology, Qilu University of Technology (Shandong Academy of Sciences). - 基于Transformer的汉语拼音序列转汉字序列模型 训练与测试代码 - 文件名:run.py """ import re import warnings import numpy as np import pandas as pd import matplotlib.pyplot as plt import torch import torch.nn as nn import torch.optim as optim import torch.nn.functional as F from torch.utils.data import Dataset, DataLoader from tqdm import tqdm from collections import Counter warnings.filterwarnings("ignore") # 全局禁用警告信息,开发时可去除 # 设置随机种子保证可重复性 torch.manual_seed(525200) np.random.seed(40004004) # 检查是否有可用的GPU device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"Using device: {device}") # 1. 数据读取与预处理 class PinyinHanziDataset(Dataset): def __init__(self, csv_file, max_length=15): self.data = pd.read_csv(csv_file, header=None, names=['hanzi', 'pinyin']) self.max_length = max_length # 构建词汇表 self._build_vocab() def _tokenize_hanzi(self, s): """将文本分割为汉字、英文单词和标点符号的混合token""" pattern = re.compile( r'([\u4e00-\u9fff\u3000-\u303f\uff00-\uffef]|[a-zA-Z.,!?;:\'"]+|\d+|\s)' ) tokens = [] for token in pattern.finditer(s): if token.group().strip(): # 忽略纯空格 tokens.append(token.group()) return tokens def _build_vocab(self): # 处理汉字词汇表 hanzi_counter = Counter() pinyin_counter = Counter() for _, row in self.data.iterrows(): # 使用新的tokenize方法处理汉字 hanzi_tokens = self._tokenize_hanzi(row['hanzi']) hanzi_counter.update(hanzi_tokens) # 拼音处理:按空格分割 pinyin_tokens = row['pinyin'].split() pinyin_counter.update(pinyin_tokens) # 添加特殊token self.hanzi_vocab = ['', '', '', ''] + [char for char, _ in hanzi_counter.most_common()] self.pinyin_vocab = ['', '', '', ''] + [pinyin for pinyin, _ in pinyin_counter.most_common()] # 创建token到id的映射 self.hanzi2idx = {char: idx for idx, char in enumerate(self.hanzi_vocab)} self.idx2hanzi = {idx: char for idx, char in enumerate(self.hanzi_vocab)} self.pinyin2idx = {pinyin: idx for idx, pinyin in enumerate(self.pinyin_vocab)} self.idx2pinyin = {idx: pinyin for idx, pinyin in enumerate(self.pinyin_vocab)} def __len__(self): return len(self.data) def __getitem__(self, idx): hanzi_seq = self.data.iloc[idx]['hanzi'] pinyin_seq = self.data.iloc[idx]['pinyin'] # 将汉字序列转换为token id序列 hanzi_tokens = [''] + self._tokenize_hanzi(hanzi_seq) + [''] hanzi_ids = [self.hanzi2idx.get(token, self.hanzi2idx['']) for token in hanzi_tokens] # 将拼音序列转换为token id序列 pinyin_tokens = [''] + pinyin_seq.split() + [''] pinyin_ids = [self.pinyin2idx.get(token, self.pinyin2idx['']) for token in pinyin_tokens] # 截断或填充序列 hanzi_ids = hanzi_ids[:self.max_length] pinyin_ids = pinyin_ids[:self.max_length] hanzi_padding = [self.hanzi2idx['']] * (self.max_length - len(hanzi_ids)) pinyin_padding = [self.pinyin2idx['']] * (self.max_length - len(pinyin_ids)) hanzi_ids += hanzi_padding pinyin_ids += pinyin_padding return { 'pinyin': torch.tensor(pinyin_ids, dtype=torch.long), 'hanzi': torch.tensor(hanzi_ids, dtype=torch.long), 'hanzi_input': torch.tensor(hanzi_ids[:-1], dtype=torch.long), 'hanzi_target': torch.tensor(hanzi_ids[1:], dtype=torch.long) } # 2. 
# 2. Transformer model definition
class TransformerModel(nn.Module):
    def __init__(self, pinyin_vocab_size, hanzi_vocab_size, d_model=256, nhead=8,
                 num_encoder_layers=6, num_decoder_layers=6, dim_feedforward=1024,
                 dropout=0.075):
        super(TransformerModel, self).__init__()
        self.d_model = d_model
        # Pinyin (source) embedding
        self.pinyin_embedding = nn.Embedding(pinyin_vocab_size, d_model)
        # Hanzi (target) embedding
        self.hanzi_embedding = nn.Embedding(hanzi_vocab_size, d_model)
        # Positional encoding
        self.positional_encoding = PositionalEncoding(d_model, dropout)
        # Transformer backbone
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout
        )
        # Output projection
        self.fc_out = nn.Linear(d_model, hanzi_vocab_size)

    @property
    def device(self):
        return next(self.parameters()).device

    def forward(self, pinyin, hanzi_input):
        # Embeddings, scaled by sqrt(d_model) as in "Attention Is All You Need"
        pinyin_embedded = self.pinyin_embedding(pinyin) * np.sqrt(self.d_model)
        hanzi_embedded = self.hanzi_embedding(hanzi_input) * np.sqrt(self.d_model)
        # Reorder to (seq_len, batch_size, d_model) first: both nn.Transformer
        # and PositionalEncoding expect sequence-first tensors
        pinyin_embedded = pinyin_embedded.permute(1, 0, 2)
        hanzi_embedded = hanzi_embedded.permute(1, 0, 2)
        # Positional encoding
        pinyin_embedded = self.positional_encoding(pinyin_embedded)
        hanzi_embedded = self.positional_encoding(hanzi_embedded)
        # Causal mask for the decoder only; the encoder may attend to the
        # entire pinyin sequence, so no src_mask is used
        tgt_mask = self._generate_square_subsequent_mask(
            hanzi_embedded.size(0)).to(hanzi_embedded.device)
        # Transformer forward pass
        output = self.transformer(
            src=pinyin_embedded,
            tgt=hanzi_embedded,
            src_key_padding_mask=self._create_padding_mask(pinyin),
            tgt_key_padding_mask=self._create_padding_mask(hanzi_input),
            memory_key_padding_mask=self._create_padding_mask(pinyin),
            tgt_mask=tgt_mask
        )
        # Back to (batch_size, seq_len, d_model) before the output projection
        output = output.permute(1, 0, 2)
        output = self.fc_out(output)
        return output

    def _generate_square_subsequent_mask(self, sz):
        return torch.triu(torch.full((sz, sz), float('-inf')), diagonal=1)

    def _create_padding_mask(self, seq):
        return seq == 0  # Assumes the <pad> id is 0


# 3. Positional encoding definition
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=512):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-np.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (seq_len, batch_size, d_model)
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)
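
# Shape sanity check (illustrative sketch; the toy sizes are arbitrary and
# unrelated to the training configuration). Verifies that the model maps
# (batch, src_len) pinyin ids and (batch, tgt_len) decoder-input ids to
# per-position logits of shape (batch, tgt_len, hanzi_vocab_size).
def _smoke_test_transformer():
    model = TransformerModel(pinyin_vocab_size=100, hanzi_vocab_size=200,
                             d_model=32, nhead=4, num_encoder_layers=2,
                             num_decoder_layers=2, dim_feedforward=64).to(device)
    pinyin = torch.randint(1, 100, (2, 15), device=device)       # avoid pad id 0
    hanzi_input = torch.randint(1, 200, (2, 14), device=device)  # decoder input
    out = model(pinyin, hanzi_input)
    assert out.shape == (2, 14, 200)  # (batch, tgt_seq_len, hanzi_vocab_size)
    return out
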
# 4. Model wrapper definition
class PinyinHanziTransformer:
    def __init__(self, model=None, dataset=None, config=None):
        self.model = model
        self.dataset = dataset
        self.config = config or {}

    def save(self, filepath):
        """Save the model weights, vocabularies and config in a single file."""
        save_data = {
            'model_state_dict': self.model.state_dict(),
            'hanzi_vocab': self.dataset.hanzi_vocab,
            'pinyin_vocab': self.dataset.pinyin_vocab,
            'hanzi2idx': self.dataset.hanzi2idx,
            'idx2hanzi': self.dataset.idx2hanzi,
            'pinyin2idx': self.dataset.pinyin2idx,
            'idx2pinyin': self.dataset.idx2pinyin,
            'max_length': self.dataset.max_length,
            'config': self.config
        }
        torch.save(save_data, filepath)

    @classmethod
    def load(cls, filepath, device='cpu'):
        """Load a complete model from file."""
        save_data = torch.load(filepath, map_location=device)

        # Dummy dataset object that only carries the vocabulary information
        class DummyDataset:
            pass

        dataset = DummyDataset()
        dataset.hanzi_vocab = save_data['hanzi_vocab']
        dataset.pinyin_vocab = save_data['pinyin_vocab']
        dataset.hanzi2idx = save_data['hanzi2idx']
        dataset.idx2hanzi = save_data['idx2hanzi']
        dataset.pinyin2idx = save_data['pinyin2idx']
        dataset.idx2pinyin = save_data['idx2pinyin']
        dataset.max_length = save_data['max_length']

        # Rebuild the model from the saved config
        config = save_data['config']
        model = TransformerModel(
            pinyin_vocab_size=len(dataset.pinyin_vocab),
            hanzi_vocab_size=len(dataset.hanzi_vocab),
            **config
        ).to(device)
        model.load_state_dict(save_data['model_state_dict'])
        return cls(model=model, dataset=dataset, config=config)

    @staticmethod
    def top_k_sampling(logits, k=5, temperature=1.0):
        logits = logits / temperature
        probs = F.softmax(logits, dim=-1)  # shape: (1, vocab_size)
        topk_probs, topk_indices = torch.topk(probs, k, dim=-1)  # shape: (1, k)
        # Sample a position within the top k (multinomial renormalizes the weights)
        sampled_index = torch.multinomial(topk_probs, num_samples=1)  # shape: (1, 1)
        # Map back to the real vocabulary index
        next_token = torch.gather(topk_indices, dim=1, index=sampled_index)  # shape: (1, 1)
        # squeeze() drops the extra dimensions so .item() yields a plain int
        return next_token.squeeze().item()

    def predict(self, pinyin_seq, max_length=None, k=3, temperature=1.0):
        """Predict a hanzi sequence from pinyin using top-k sampling."""
        self.model.eval()
        max_length = max_length or self.dataset.max_length
        # Pinyin tokens to ids
        pinyin_tokens = ['<sos>'] + pinyin_seq.split() + ['<eos>']
        pinyin_ids = [self.dataset.pinyin2idx.get(token, self.dataset.pinyin2idx['<unk>'])
                      for token in pinyin_tokens]
        pinyin_ids = pinyin_ids[:max_length]
        pinyin_ids += [self.dataset.pinyin2idx['<pad>']] * (max_length - len(pinyin_ids))
        pinyin_tensor = torch.tensor(pinyin_ids, dtype=torch.long).unsqueeze(0).to(self.model.device)
        # Autoregressive decoding, starting from <sos>
        hanzi_ids = [self.dataset.hanzi2idx['<sos>']]
        for _ in range(max_length - 1):
            hanzi_tensor = torch.tensor(hanzi_ids, dtype=torch.long).unsqueeze(0).to(self.model.device)
            with torch.no_grad():
                output = self.model(pinyin_tensor, hanzi_tensor)  # (1, seq_len, vocab_size)
            logits = output[:, -1, :]  # logits at the last position, (1, vocab_size)
            # Top-k sampling
            next_token = PinyinHanziTransformer.top_k_sampling(logits, k=k, temperature=temperature)
            hanzi_ids.append(next_token)
            if next_token == self.dataset.hanzi2idx['<eos>']:
                break
        # Strip <sos>, and strip <eos> only if decoding actually produced one
        hanzi_ids = hanzi_ids[1:]
        if hanzi_ids and hanzi_ids[-1] == self.dataset.hanzi2idx['<eos>']:
            hanzi_ids = hanzi_ids[:-1]
        hanzi_seq = [self.dataset.idx2hanzi[idx] for idx in hanzi_ids]
        return ''.join(hanzi_seq)
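
# Illustrative check of the top-k sampler above on toy logits (the values are
# arbitrary). With k=2 only the two highest-scoring indices can ever be drawn;
# index 0 wins with relative probability e^2 / (e^2 + e^1) ≈ 0.73.
def _demo_top_k_sampling(n_draws=1000):
    logits = torch.tensor([[2.0, 1.0, 0.5, -1.0, -2.0]])  # (1, vocab_size=5)
    counts = Counter(
        PinyinHanziTransformer.top_k_sampling(logits, k=2, temperature=1.0)
        for _ in range(n_draws)
    )
    # counts has keys 0 and 1 only; raising the temperature flattens the
    # split, lowering it sharpens the draw toward index 0
    return counts
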
# 5. Training function
def train_model(model, dataloader, optimizer, criterion, epoch):
    model.train()
    total_loss = 0
    progress_bar = tqdm(dataloader, desc=f"Epoch {epoch}")
    for batch in progress_bar:
        pinyin = batch['pinyin'].to(device)
        hanzi_input = batch['hanzi_input'].to(device)
        hanzi_target = batch['hanzi_target'].to(device)
        # Forward pass
        output = model(pinyin, hanzi_input)
        # Loss over the flattened (batch * seq_len) positions
        loss = criterion(output.reshape(-1, output.size(-1)), hanzi_target.reshape(-1))
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        progress_bar.set_postfix(loss=f"{loss.item():.3f}")
    return total_loss / len(dataloader)


# 6. Evaluation function
def evaluate_model(model, dataloader, criterion):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            pinyin = batch['pinyin'].to(device)
            hanzi_input = batch['hanzi_input'].to(device)
            hanzi_target = batch['hanzi_target'].to(device)
            output = model(pinyin, hanzi_input)
            loss = criterion(output.reshape(-1, output.size(-1)), hanzi_target.reshape(-1))
            total_loss += loss.item()
    return total_loss / len(dataloader)


# 7. Training entry point
def train_main():
    # Hyperparameters -- adjust these before training
    batch_size = 256                       # Batch size
    num_epochs = 33                        # Number of epochs
    learning_rate = 0.0001                 # Learning rate
    max_length = 14                        # Truncation length
    train_test_ratio = 0.95                # Train/test split ratio
    dataset_filepath = 'pinyin2hanzi.csv'  # Path to the dataset CSV
    model_config = {                       # Model configuration
        'd_model': 512,                    # Embedding dimension
        'nhead': 16,                       # Number of attention heads
        'num_encoder_layers': 8,           # Number of encoder blocks
        'num_decoder_layers': 6,           # Number of decoder blocks
        'dim_feedforward': 1024,           # Feed-forward layer dimension
        'dropout': 0.07                    # Dropout rate
    }

    # Load the dataset
    dataset = PinyinHanziDataset(dataset_filepath, max_length=max_length)

    # Split into train and test sets
    train_size = int(train_test_ratio * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

    # DataLoaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Build the model wrapper
    transformer = PinyinHanziTransformer(
        model=TransformerModel(
            pinyin_vocab_size=len(dataset.pinyin_vocab),
            hanzi_vocab_size=len(dataset.hanzi_vocab),
            **model_config
        ).to(device),
        dataset=dataset,
        config=model_config
    )

    # Loss and optimizer; padded positions are excluded from the loss
    criterion = nn.CrossEntropyLoss(ignore_index=dataset.hanzi2idx['<pad>'])
    optimizer = optim.Adam(transformer.model.parameters(), lr=learning_rate)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=45, gamma=0.41)

    # Training loop
    train_losses = []
    test_losses = []
    for epoch in range(1, num_epochs + 1):
        train_loss = train_model(transformer.model, train_loader, optimizer, criterion, epoch)
        test_loss = evaluate_model(transformer.model, test_loader, criterion)
        train_losses.append(train_loss)
        test_losses.append(test_loss)
        scheduler.step()
        print(f"Epoch {epoch}: Train Loss = {train_loss:.4f}, Test Loss = {test_loss:.4f}")
        # Save the full model (weights plus vocabularies) to the current directory
        # if epoch % 7 == 0 or epoch == num_epochs:
        transformer.save(f"pinyin2hanzi_transformer_epoch{epoch}.pth")

    # Plot the loss curves
    plt.plot(train_losses, label='Train Loss')
    plt.plot(test_losses, label='Test Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.savefig('loss_curve.png')


# 8. Inference entry point
def use_main():
    # Note: training saves per-epoch checkpoints named
    # pinyin2hanzi_transformer_epoch{N}.pth; rename or repoint as needed
    transformer = PinyinHanziTransformer.load("pinyin2hanzi_transformer.pth", device=str(device))
    result = transformer.predict("hong2 yan2 bo2 ming4")  # Expected output: 红颜薄命
    print("Prediction:", result)


if __name__ == "__main__":
    # train_main()  # Uncomment, adjust the hyperparameters, then run to start training
    use_main()      # Run inference with a trained model
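
# Appendix sketch (illustrative only, never called): shows that
# CrossEntropyLoss(ignore_index=0) skips <pad> positions in the shifted
# targets produced by PinyinHanziDataset.__getitem__. Toy tensors; the logits
# are random and the target ids are arbitrary non-pad ids.
def _demo_masked_loss():
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # 0 is the <pad> id
    logits = torch.randn(2, 5, 10)                   # (batch, seq_len, vocab)
    target = torch.tensor([[4, 7, 2, 0, 0],          # trailing 0s are padding
                           [5, 3, 0, 0, 0]])
    loss = criterion(logits.reshape(-1, 10), target.reshape(-1))
    # Only the five non-pad positions contribute to the averaged loss
    return loss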