# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import parl
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
import paddle.optimizer as optim
from paddle.distribution import Normal, Categorical
from parl.utils.utils import check_model_method
__all__ = ['PPO']


class PPO(parl.Algorithm):
    def __init__(self,
                 model,
                 clip_param=0.1,
                 value_loss_coef=0.5,
                 entropy_coef=0.01,
                 initial_lr=2.5e-4,
                 eps=1e-5,
                 max_grad_norm=0.5,
                 use_clipped_value_loss=True,
                 norm_adv=True,
                 continuous_action=False):
""" PPO algorithm
Args:
model (parl.Model): forward network of actor and critic.
clip_param (float): epsilon in clipping loss.
value_loss_coef (float): value function loss coefficient in the optimization objective.
entropy_coef (float): policy entropy coefficient in the optimization objective.
initial_lr (float): learning rate.
eps (float): Adam optimizer epsilon.
max_grad_norm (float): max gradient norm for gradient clipping.
use_clipped_value_loss (bool): whether or not to use a clipped loss for the value function.
norm_adv (bool): whether or not to use advantages normalization.
continuous_action (bool): whether or not is continuous action environment.
"""
        # check model methods
        check_model_method(model, 'value', self.__class__.__name__)
        check_model_method(model, 'policy', self.__class__.__name__)
        assert isinstance(clip_param, float)
        assert isinstance(value_loss_coef, float)
        assert isinstance(entropy_coef, float)
        assert isinstance(initial_lr, float)
        assert isinstance(eps, float)
        assert isinstance(max_grad_norm, float)
        assert isinstance(use_clipped_value_loss, bool)
        assert isinstance(norm_adv, bool)
        assert isinstance(continuous_action, bool)

        self.clip_param = clip_param
        self.value_loss_coef = value_loss_coef
        self.entropy_coef = entropy_coef
        self.max_grad_norm = max_grad_norm
        self.use_clipped_value_loss = use_clipped_value_loss
        self.norm_adv = norm_adv
        self.continuous_action = continuous_action

        self.model = model
        clip = nn.ClipGradByNorm(self.max_grad_norm)
        self.optimizer = optim.Adam(
            parameters=self.model.parameters(),
            learning_rate=initial_lr,
            epsilon=eps,
            grad_clip=clip)
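
    # Illustrative construction sketch (added note, not part of the original
    # source). `MujocoModel` is a hypothetical parl.Model subclass that
    # implements the required `policy(obs)` and `value(obs)` methods:
    #
    #     model = MujocoModel(obs_dim, act_dim)
    #     algorithm = PPO(model, clip_param=0.2, continuous_action=True)
    #
    # The algorithm is then usually wrapped in a parl.Agent that drives
    # sampling and learning.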

    def learn(self,
              batch_obs,
              batch_action,
              batch_value,
              batch_return,
              batch_logprob,
              batch_adv,
              lr=None):
        """ Update the model with the PPO algorithm.

        Args:
            batch_obs (paddle.Tensor): shape([batch_size] + obs_shape)
            batch_action (paddle.Tensor): shape([batch_size] + action_shape)
            batch_value (paddle.Tensor): shape([batch_size])
            batch_return (paddle.Tensor): shape([batch_size])
            batch_logprob (paddle.Tensor): shape([batch_size])
            batch_adv (paddle.Tensor): shape([batch_size])
            lr (float): if provided, the optimizer's learning rate is set to this
                value before the update (used for learning rate scheduling).

        Returns:
            value_loss (float): value loss
            action_loss (float): policy loss
            entropy_loss (float): entropy loss
        """
        values = self.model.value(batch_obs)

        # evaluate log-probabilities and entropy of the sampled actions under
        # the current policy
        if self.continuous_action:
            mean, std = self.model.policy(batch_obs)
            dist = Normal(mean, std)
            action_log_probs = dist.log_prob(batch_action).sum(1)
            dist_entropy = dist.entropy().sum(1)
        else:
            logits = self.model.policy(batch_obs)
            dist = Categorical(logits=logits)
            act_dim = logits.shape[-1]
            batch_action = paddle.to_tensor(batch_action, dtype='int64')
            actions_onehot = F.one_hot(batch_action, act_dim)
            action_log_probs = paddle.sum(
                F.log_softmax(logits) * actions_onehot, axis=-1)
            dist_entropy = dist.entropy()
        entropy_loss = dist_entropy.mean()

        if self.norm_adv:
            # normalize advantages within the batch
            batch_adv = (batch_adv - batch_adv.mean()) / (
                batch_adv.std() + 1e-8)

        # clipped surrogate (policy) loss
        ratio = paddle.exp(action_log_probs - batch_logprob)
        surr1 = ratio * batch_adv
        surr2 = paddle.clip(ratio, 1.0 - self.clip_param,
                            1.0 + self.clip_param) * batch_adv
        action_loss = -paddle.minimum(surr1, surr2).mean()

        # value loss, optionally clipped around the old value predictions
        values = values.reshape([-1])
        if self.use_clipped_value_loss:
            value_pred_clipped = batch_value + paddle.clip(
                values - batch_value, -self.clip_param, self.clip_param)
            value_losses = (values - batch_return).pow(2)
            value_losses_clipped = (value_pred_clipped - batch_return).pow(2)
            value_loss = 0.5 * paddle.maximum(value_losses,
                                              value_losses_clipped).mean()
        else:
            value_loss = 0.5 * (values - batch_return).pow(2).mean()

        loss = value_loss * self.value_loss_coef + action_loss - entropy_loss * self.entropy_coef

        if lr:
            self.optimizer.set_lr(lr)
        loss.backward()
        self.optimizer.step()
        self.optimizer.clear_grad()

        return value_loss.item(), action_loss.item(), entropy_loss.item()
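
    # Typical update-loop sketch (illustrative, not part of the original
    # source): with a rollout already collected and advantages/returns computed
    # (e.g. via GAE), `learn` is usually called over several epochs of shuffled
    # minibatches; `update_epochs` and `minibatch_indices` are hypothetical
    # names:
    #
    #     for epoch in range(update_epochs):
    #         for idx in minibatch_indices:
    #             v_loss, a_loss, e_loss = algorithm.learn(
    #                 obs[idx], action[idx], old_value[idx], ret[idx],
    #                 old_logprob[idx], adv[idx])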

    def sample(self, obs):
        """ Define the sampling process. This function returns the action according to the action distribution.

        Args:
            obs (paddle.Tensor): observation, shape([batch_size] + obs_shape)

        Returns:
            value (paddle.Tensor): value, shape([batch_size, 1])
            action (paddle.Tensor): action, shape([batch_size] + action_shape)
            action_log_probs (paddle.Tensor): action log probs, shape([batch_size])
            action_entropy (paddle.Tensor): action entropy, shape([batch_size])
        """
        value = self.model.value(obs)

        if self.continuous_action:
            mean, std = self.model.policy(obs)
            dist = Normal(mean, std)
            action = dist.sample([1])
            action_log_probs = dist.log_prob(action).sum(-1)
            action_entropy = dist.entropy().sum(-1).mean()
        else:
            logits = self.model.policy(obs)
            dist = Categorical(logits=logits)
            action = dist.sample([1])
            act_dim = logits.shape[-1]
            actions_onehot = F.one_hot(action, act_dim)
            action_log_probs = paddle.sum(
                F.log_softmax(logits) * actions_onehot, axis=-1)
            action_entropy = dist.entropy()

        return value, action, action_log_probs, action_entropy
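
    # Rollout sketch (illustrative, not part of the original source): during
    # data collection `sample` is queried once per environment step, and its
    # outputs (value, action, log-prob) are stored in a rollout buffer so that
    # `learn` can later compute the probability ratio against the old policy:
    #
    #     value, action, logprob, _ = algorithm.sample(obs_tensor)
    #     rollout.append(obs_tensor, action, logprob, value, reward, done)
    #
    # `obs_tensor` and `rollout` are hypothetical names for the batched
    # observation tensor and the experience buffer.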

    def predict(self, obs):
        """ Use the model to predict the action.

        Args:
            obs (paddle.Tensor): observation, shape([batch_size] + obs_shape)

        Returns:
            action (paddle.Tensor): action, shape([batch_size] + action_shape);
                note that in the discrete case we take the argmax along the last axis as the action.
        """
        if self.continuous_action:
            action, _ = self.model.policy(obs)
        else:
            logits = self.model.policy(obs)
            probs = F.softmax(logits)
            action = paddle.argmax(probs, 1)
        return action

    def value(self, obs):
        """ Use the model to predict the value of obs.

        Args:
            obs (paddle.Tensor): observation, shape([batch_size] + obs_shape)

        Returns:
            value (paddle.Tensor): value of obs, shape([batch_size])
        """
        return self.model.value(obs)
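

# Bootstrapping note (illustrative, added for clarity, not part of the original
# source): `value` is typically used to estimate the value of the final
# observation of a rollout so that returns and GAE advantages can be
# bootstrapped before calling `learn`, e.g.
#
#     next_value = algorithm.value(last_obs_tensor)  # hypothetical tensor name
#
# Exploration comes from `sample`, while `predict` returns the greedy/mean
# action and is normally used only for evaluation.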