Demo / Gomoku_MCTS /policy_value_net_pytorch.py
HuskyDoge's picture
trial
172a1e4
raw
history blame
6.01 kB
# -*- coding: utf-8 -*-
"""
An implementation of the policyValueNet in PyTorch
Tested in PyTorch 0.2.0 and 0.3.0
@author: Junxiao Song
"""
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
import numpy as np
class Net(nn.Module):
"""policy-value network module"""
def __init__(self, board_width, board_height):
super(Net, self).__init__()
self.board_width = board_width
self.board_height = board_height
# common layers
self.conv1 = nn.Conv2d(4, 32, kernel_size=3, padding=1)
self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
# action policy layers
self.act_conv1 = nn.Conv2d(128, 4, kernel_size=1)
self.act_fc1 = nn.Linear(4*board_width*board_height,
board_width*board_height)
# state value layers
self.val_conv1 = nn.Conv2d(128, 2, kernel_size=1)
self.val_fc1 = nn.Linear(2*board_width*board_height, 64)
self.val_fc2 = nn.Linear(64, 1)
def forward(self, state_input):
# common layers
x = F.relu(self.conv1(state_input))
x = F.relu(self.conv2(x))
x = F.relu(self.conv3(x))
# action policy layers
x_act = F.relu(self.act_conv1(x))
x_act = x_act.view(-1, 4*self.board_width*self.board_height)
x_act = F.log_softmax(self.act_fc1(x_act))
# state value layers
x_val = F.relu(self.val_conv1(x))
x_val = x_val.view(-1, 2*self.board_width*self.board_height)
x_val = F.relu(self.val_fc1(x_val))
x_val = F.tanh(self.val_fc2(x_val))
return x_act, x_val
class PolicyValueNet():
"""policy-value network """
def __init__(self, board_width, board_height,
model_file=None, use_gpu=False):
self.use_gpu = use_gpu
self.board_width = board_width
self.board_height = board_height
# the policy value net module
if self.use_gpu:
self.policy_value_net = Net(board_width, board_height).cuda()
else:
self.policy_value_net = Net(board_width, board_height)
if model_file:
net_params = torch.load(model_file)
self.policy_value_net.load_state_dict(net_params)
def policy_value(self, state_batch):
"""
input: a batch of states
output: a batch of action probabilities and state values
"""
if self.use_gpu:
state_batch = Variable(torch.FloatTensor(state_batch).cuda())
log_act_probs, value = self.policy_value_net(state_batch)
act_probs = np.exp(log_act_probs.data.cpu().numpy())
return act_probs, value.data.cpu().numpy()
else:
state_batch = Variable(torch.FloatTensor(state_batch))
log_act_probs, value = self.policy_value_net(state_batch)
act_probs = np.exp(log_act_probs.data.numpy())
return act_probs, value.data.numpy()
def policy_value_fn(self, board):
"""
input: board
output: a list of (action, probability) tuples for each available
action and the score of the board state
"""
legal_positions = board.availables
current_state = np.ascontiguousarray(board.current_state().reshape(
-1, 4, self.board_width, self.board_height))
if self.use_gpu:
log_act_probs, value = self.policy_value_net(
Variable(torch.from_numpy(current_state)).cuda().float())
act_probs = np.exp(log_act_probs.data.cpu().numpy().flatten())
else:
log_act_probs, value = self.policy_value_net(
Variable(torch.from_numpy(current_state)).float())
act_probs = np.exp(log_act_probs.data.numpy().flatten())
act_probs = zip(legal_positions, act_probs[legal_positions])
value = value.data[0][0]
return act_probs, value
# 搬到main_worker
def train_step(self, state_batch, mcts_probs, winner_batch, lr):
"""perform a training step"""
# self.use_gpu = True
# wrap in Variable
if self.use_gpu:
state_batch = Variable(torch.FloatTensor(state_batch).cuda())
mcts_probs = Variable(torch.FloatTensor(mcts_probs).cuda())
winner_batch = Variable(torch.FloatTensor(winner_batch).cuda())
else:
state_batch = Variable(torch.FloatTensor(state_batch))
mcts_probs = Variable(torch.FloatTensor(mcts_probs))
winner_batch = Variable(torch.FloatTensor(winner_batch))
# zero the parameter gradients
self.optimizer.zero_grad()
# set learning rate
set_learning_rate(self.optimizer, lr)
# forward
log_act_probs, value = self.policy_value_net(state_batch)
# define the loss = (z - v)^2 - pi^T * log(p) + c||theta||^2
# Note: the L2 penalty is incorporated in optimizer
value_loss = F.mse_loss(value.view(-1), winner_batch)
policy_loss = -torch.mean(torch.sum(mcts_probs*log_act_probs, 1))
loss = value_loss + policy_loss
# backward and optimize
loss.backward()
self.optimizer.step()
# calc policy entropy, for monitoring only
entropy = -torch.mean(
torch.sum(torch.exp(log_act_probs) * log_act_probs, 1)
)
# return loss.data[0], entropy.data[0]
#for pytorch version >= 0.5 please use the following line instead.
return loss.item(), entropy.item()
# def get_policy_param(self):
# net_params = self.policy_value_net.state_dict()
# return net_params
# def save_model(self, model_file):
# """ save model params to file """
# net_params = self.get_policy_param() # get model params
# torch.save(net_params, model_file)