#!/usr/bin/env python3
"""
PPO RLHF training script - human preference alignment on top of the Teacher model
Input: SFT Teacher model + human preference data
Output: RLHF-aligned Teacher model
"""
import os
import torch
from datasets import load_dataset, Dataset
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    AutoModelForSequenceClassification,
    logging,
)
from peft import LoraConfig, get_peft_model, TaskType
from trl import PPOTrainer, PPOConfig, AutoModelForCausalLMWithValueHead
import wandb
import numpy as np
from typing import List
import warnings

warnings.filterwarnings("ignore")
logging.set_verbosity(logging.CRITICAL)
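
# NOTE: this script assumes the older TRL PPO API (PPOConfig with ppo_epochs,
# trainer.generate() and trainer.step()). Newer TRL releases replaced this
# interface, so pin a compatible trl version when running it.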

class RLHFConfig:
    """RLHF training configuration."""
    # Model paths
    teacher_model_path = "./merged_model"  # Teacher model from the earlier SFT stage
    reward_model_name = "OpenAssistant/reward-model-deberta-v3-large-v2"  # reward model

    # PPO training parameters
    learning_rate = 1e-5
    mini_batch_size = 1
    batch_size = 8
    gradient_accumulation_steps = 8
    ppo_epochs = 4
    max_grad_norm = 1.0
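    # NOTE: TRL processes `batch_size` rollouts per PPO step in chunks of
    # `mini_batch_size`, accumulating gradients; keep batch_size divisible by
    # mini_batch_size * gradient_accumulation_steps (1 * 8 divides 8 here).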

    # PPO-specific parameters
    init_kl_coef = 0.02
    target_kl = 0.01
    adap_kl_ctrl = True
    clip_reward_value = 5.0
    cliprange = 0.2
    cliprange_value = 0.2
    gamma = 1.0
    lam = 0.95

    # Generation parameters
    max_new_tokens = 150
    temperature = 0.7
    top_p = 0.9
    do_sample = True

    # Training control
    total_episodes = 1000
    save_freq = 100
    eval_freq = 50
    output_dir = "./rlhf_teacher_model"

    # LoRA parameters (if using LoRA for RLHF)
    use_lora = True
    lora_r = 16
    lora_alpha = 32
    lora_dropout = 0.1

class RewardModelWrapper:
    """Wrapper around the reward model."""

    def __init__(self, model_name: str, device: str = "cuda"):
        self.device = device
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            torch_dtype=torch.float16,
            device_map="auto",
        )
        self.model.eval()
        # Set pad token if missing
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def get_reward(self, prompts: List[str], responses: List[str]) -> List[float]:
        """Compute reward scores for prompt/response pairs."""
        inputs = []
        for prompt, response in zip(prompts, responses):
            # Format as a dialogue
            text = f"Human: {prompt}\n\nAssistant: {response}"
            inputs.append(text)
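        # NOTE: per the OpenAssistant/reward-model-deberta-v3-large-v2 model card,
        # the prompt and response can also be passed as a sentence pair, e.g.
        # self.tokenizer(prompt, response, return_tensors="pt"); the single-string
        # chat format above is a simpler approximation.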
        # Batched inference
        with torch.no_grad():
            encoded = self.tokenizer(
                inputs,
                padding=True,
                truncation=True,
                max_length=512,
                return_tensors="pt",
            ).to(self.device)
            outputs = self.model(**encoded)
            rewards = outputs.logits.squeeze(-1).cpu().tolist()
        return rewards

def load_preference_dataset():
    """Load the preference/prompt dataset."""
    print("📥 Loading preference dataset...")
    # Multiple data sources can be mixed
    datasets_config = [
        {
            "name": "Anthropic/hh-rlhf",
            "split": "train",
            "weight": 0.7,
        },
        {
            "name": "OpenAssistant/oasst1",
            "split": "train",
            "weight": 0.3,
        },
    ]
    all_prompts = []
    for config in datasets_config:
        try:
            dataset = load_dataset(config["name"], split=config["split"])
            # Handle the different dataset formats
            if config["name"] == "Anthropic/hh-rlhf":
                prompts = extract_prompts_from_hh(dataset)
            else:
                prompts = extract_prompts_from_oasst(dataset)
            # Subsample according to the configured weight
            sample_size = int(len(prompts) * config["weight"])
            prompts = prompts[:sample_size]
            all_prompts.extend(prompts)
            print(f"✅ Loaded {len(prompts)} prompts from {config['name']}")
        except Exception as e:
            print(f"⚠️ Failed to load {config['name']}: {e}")
    # Build a Dataset object
    return Dataset.from_dict({"prompt": all_prompts})

def extract_prompts_from_hh(dataset):
    """Extract prompts from the HH-RLHF dataset."""
    prompts = []
    for item in dataset:
        # Parse the HH-RLHF format: "chosen" holds a multi-turn dialogue, so this
        # takes the last human turn as the prompt.
        text = item.get("chosen", "")
        if "Human:" in text:
            prompt = text.split("Human:")[-1].split("Assistant:")[0].strip()
            if len(prompt) > 10:  # filter out prompts that are too short
                prompts.append(prompt)
    return prompts

def extract_prompts_from_oasst(dataset):
    """Extract prompts from the OpenAssistant dataset."""
    prompts = []
    for item in dataset:
        if item.get("role") == "prompter":
            prompt = item.get("text", "").strip()
            if len(prompt) > 10:
                prompts.append(prompt)
    return prompts

def prepare_teacher_model(config: RLHFConfig):
    """Prepare the Teacher model for RLHF."""
    print("🤖 Preparing teacher model for RLHF...")
    # Load tokenizer
    tokenizer = AutoTokenizer.from_pretrained(config.teacher_model_path)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    # Load the base model
    model = AutoModelForCausalLM.from_pretrained(
        config.teacher_model_path,
        torch_dtype=torch.float16,
        device_map="auto",
        trust_remote_code=True,
    )
    # Optionally add LoRA adapters for RLHF
    if config.use_lora:
        print("🔧 Adding LoRA for RLHF training...")
        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            inference_mode=False,
            r=config.lora_r,
            lora_alpha=config.lora_alpha,
            lora_dropout=config.lora_dropout,
            target_modules=[
                "q_proj", "k_proj", "v_proj", "o_proj",
                "gate_proj", "up_proj", "down_proj",
            ],
        )
        model = get_peft_model(model, lora_config)
        model.print_trainable_parameters()
    # Wrap with a value head for PPO
    model = AutoModelForCausalLMWithValueHead.from_pretrained(
        model,
        torch_dtype=torch.float16,
    )
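    # NOTE: the value head added by AutoModelForCausalLMWithValueHead is a small,
    # randomly initialized scalar head on top of the LM; it is trained from
    # scratch during PPO to estimate per-token values.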
    # Frozen reference model. It is wrapped with the same value-head class because
    # TRL's PPOTrainer expects the reference model to share the policy's wrapper
    # type (or to be None).
    ref_model = AutoModelForCausalLMWithValueHead.from_pretrained(
        config.teacher_model_path,
        torch_dtype=torch.float16,
        device_map="auto",
    )
    ref_model.eval()
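    # The reference model is never updated; PPOTrainer compares the policy's token
    # log-probs against it to compute the KL penalty that keeps the RLHF model
    # close to the SFT teacher.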
    return model, ref_model, tokenizer

def create_ppo_trainer(model, ref_model, tokenizer, config: RLHFConfig):
    """Create the PPO trainer."""
    print("🏋️ Creating PPO trainer...")
    ppo_config = PPOConfig(
        model_name=config.teacher_model_path,
        learning_rate=config.learning_rate,
        mini_batch_size=config.mini_batch_size,
        batch_size=config.batch_size,
        gradient_accumulation_steps=config.gradient_accumulation_steps,
        ppo_epochs=config.ppo_epochs,
        max_grad_norm=config.max_grad_norm,
        init_kl_coef=config.init_kl_coef,
        target_kl=config.target_kl,
        adap_kl_ctrl=config.adap_kl_ctrl,
        cliprange=config.cliprange,
        cliprange_value=config.cliprange_value,
        gamma=config.gamma,
        lam=config.lam,
        remove_unused_columns=False,
        log_with="wandb" if wandb.run else None,
    )
    # Note: reward clipping (config.clip_reward_value) is not a PPOConfig argument
    # in the TRL API assumed here, so it is applied manually in the training loop.
    trainer = PPOTrainer(
        config=ppo_config,
        model=model,
        ref_model=ref_model,
        tokenizer=tokenizer,
    )
    return trainer

def format_prompt_for_generation(prompt: str) -> str:
    """Format a prompt for generation."""
    return f"### Human: {prompt}\n### Assistant:"

def run_ppo_training():
    """Main PPO training loop."""
    print("🚀 Starting PPO RLHF Training...")
    # Initialize wandb
    wandb.init(
        project="rlhf-teacher-training",
        config={k: v for k, v in vars(RLHFConfig).items() if not k.startswith("__")},
        name="ppo-teacher-rlhf",
    )
    config = RLHFConfig()
    # Prepare models
    model, ref_model, tokenizer = prepare_teacher_model(config)
    # Create the PPO trainer
    ppo_trainer = create_ppo_trainer(model, ref_model, tokenizer, config)
    # Load the reward model
    reward_model = RewardModelWrapper(config.reward_model_name)
    # Load the prompt dataset
    dataset = load_preference_dataset()
    print(f"📊 Training on {len(dataset)} prompts")
    print(f"🎯 Target episodes: {config.total_episodes}")
    # Training loop
    for episode in range(config.total_episodes):
        # Randomly sample prompts
        batch_prompts = np.random.choice(
            dataset["prompt"],
            size=config.batch_size,
            replace=False,
        ).tolist()
        # Format the inputs
        formatted_prompts = [format_prompt_for_generation(p) for p in batch_prompts]
        # Tokenize prompts (PPOTrainer expects 1-D query tensors)
        prompt_tensors = []
        for prompt in formatted_prompts:
            prompt_tensor = tokenizer.encode(
                prompt,
                return_tensors="pt",
                padding=False,
                truncation=True,
                max_length=256,
            ).squeeze(0)
            prompt_tensors.append(prompt_tensor)
        # Generate responses one query at a time
        response_tensors = []
        with torch.no_grad():
            for prompt_tensor in prompt_tensors:
                query = prompt_tensor.to(ppo_trainer.accelerator.device)
                response = ppo_trainer.generate(
                    query,
                    max_new_tokens=config.max_new_tokens,
                    temperature=config.temperature,
                    top_p=config.top_p,
                    do_sample=config.do_sample,
                    pad_token_id=tokenizer.eos_token_id,
                )
                # Keep only the newly generated tokens (strip the prompt)
                response = response.squeeze(0)[query.shape[0]:]
                response_tensors.append(response)
        # Decode responses
        responses = [
            tokenizer.decode(r, skip_special_tokens=True).strip()
            for r in response_tensors
        ]
        # Compute rewards and clip them to [-clip_reward_value, clip_reward_value]
        rewards = reward_model.get_reward(batch_prompts, responses)
        rewards = [
            torch.tensor(
                float(np.clip(r, -config.clip_reward_value, config.clip_reward_value)),
                dtype=torch.float,
            )
            for r in rewards
        ]
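        # NOTE: ppo_trainer.step() adds a per-token KL penalty against the frozen
        # reference model internally, so the scalar scores above need no manual
        # KL adjustment.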
        # PPO optimization step
        stats = ppo_trainer.step(prompt_tensors, response_tensors, rewards)
        # Log statistics (log_stats expects a batch dict with "query"/"response")
        ppo_trainer.log_stats(
            stats,
            {"query": batch_prompts, "response": responses},
            rewards,
        )
        # Print progress
        if episode % 10 == 0:
            mean_reward = np.mean([r.item() for r in rewards])
            print(f"📈 Episode {episode}: Mean Reward = {mean_reward:.4f}")
            # Log to wandb
            wandb.log({
                "episode": episode,
                "mean_reward": mean_reward,
                "kl_divergence": stats.get("objective/kl", 0),
                "policy_loss": stats.get("ppo/loss/policy", 0),
                "value_loss": stats.get("ppo/loss/value", 0),
            })
        # Evaluate the model
        if episode % config.eval_freq == 0 and episode > 0:
            evaluate_model(ppo_trainer.model, tokenizer, episode)
        # Save a checkpoint
        if episode % config.save_freq == 0 and episode > 0:
            save_checkpoint(ppo_trainer.model, tokenizer, config.output_dir, episode)
    # Save the final model
    print("💾 Saving final RLHF model...")
    ppo_trainer.model.save_pretrained(config.output_dir)
    tokenizer.save_pretrained(config.output_dir)
    wandb.finish()
    print("✅ RLHF training completed!")

def evaluate_model(model, tokenizer, episode):
    """Evaluate model quality on a few fixed prompts."""
    print(f"🧪 Evaluating model at episode {episode}...")
    test_prompts = [
        "Create an advertisement for a revolutionary smartphone with AI capabilities",
        "Write marketing copy for an eco-friendly clothing brand",
        "Generate a slogan for a fitness app targeting busy professionals",
    ]
    model.eval()
    results = []
    # The value-head wrapper may not expose .device, so take it from the parameters
    device = next(model.parameters()).device
    for prompt in test_prompts:
        formatted_prompt = format_prompt_for_generation(prompt)
        inputs = tokenizer(formatted_prompt, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=150,
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id,
            )
        response = tokenizer.decode(outputs[0], skip_special_tokens=True)
        generated_text = response[len(formatted_prompt):].strip()
        results.append({
            "prompt": prompt,
            "response": generated_text,
        })
        print(f"🔍 Prompt: {prompt}")
        print(f"📝 Response: {generated_text}")
        print("-" * 80)
    model.train()
    return results

def save_checkpoint(model, tokenizer, output_dir, episode):
    """Save a training checkpoint."""
    checkpoint_dir = f"{output_dir}/checkpoint-{episode}"
    os.makedirs(checkpoint_dir, exist_ok=True)
    model.save_pretrained(checkpoint_dir)
    tokenizer.save_pretrained(checkpoint_dir)
    print(f"💾 Checkpoint saved to {checkpoint_dir}")

def load_checkpoint_and_continue(checkpoint_path):
    """Reload policy weights and tokenizer from a checkpoint (minimal sketch;
    PPO optimizer and KL-controller state are not restored)."""
    print(f"📥 Loading checkpoint from {checkpoint_path}")
    tokenizer = AutoTokenizer.from_pretrained(checkpoint_path)
    model = AutoModelForCausalLMWithValueHead.from_pretrained(checkpoint_path)
    return model, tokenizer

if __name__ == "__main__":
    # Environment variables
    os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"  # multi-GPU setup
    os.environ["TOKENIZERS_PARALLELISM"] = "false"
    # Check GPU resources
    if torch.cuda.is_available():
        print(f"🔥 Using {torch.cuda.device_count()} GPUs")
        for i in range(torch.cuda.device_count()):
            print(f"  GPU {i}: {torch.cuda.get_device_name(i)}")
    else:
        raise RuntimeError("❌ CUDA not available! RLHF requires GPU.")
    # Start training
    run_ppo_training()