Policy Gradient综述:
Policy Gradient,通过学习当前环境,直接给出要输出的动作的概率值。
Policy Gradient 不是单步更新,只能等玩完一个epoch,再更新参数,采取动作与动作评价是同一个函数,所以是一个on-policy
Policy Gradient 需要计算每一个state的期望reward,这个期望reward通过整个epoch的reward_list计算。所以只能等玩完1个epoch才能更新。
数学推导
最大化R,,用梯度下降,需要求R的梯度。
vt的计算
Policy Gradient 不是单步更新,只能等玩完一个epoch,得到每个epoch的observation_list \ action\_list reward_list
学习的时候,根据这三个list更新参数,其中下图公式中的vt 根据reward_list算出来。
vt的计算
Policy Gradient 不是单步更新,只能等玩完一个epoch,得到每个epoch的observation_list \ action\_list reward_list
学习的时候,根据这三个list更新参数,其中下图公式中的vt 根据reward_list算出来。
实现方式
神经网络分类模型,但是在算loss 的时候,logloss需要乘一个系数vt,这个系数与奖励Reward相关,如果采用当前动作,
在接下来的游戏中获得的Reward越大,那么在更新梯度的时候加大当前梯度下降的速度。
算法步骤
vt的计算
Policy Gradient 不是单步更新,只能等玩完一个epoch,得到每个epoch的observation_list \ action\_list reward_list
学习的时候,根据这三个list更新参数,其中下图公式中的vt 根据reward_list算出来。
代码
"""
This part of code is the reinforcement learning brain, which is a brain of the agent.
All decisions are made in here. Policy Gradient, Reinforcement Learning. View more on my tutorial page: https://morvanzhou.github.io/tutorials/ Using:
Tensorflow: 1.0
gym: 0.8.0
""" import numpy as np
import tensorflow as tf # reproducible
np.random.seed(1)
tf.set_random_seed(1) class PolicyGradient:
def __init__(
self,
n_actions,
n_features,
learning_rate=0.01,
reward_decay=0.95,
output_graph=False,
):
self.n_actions = n_actions
self.n_features = n_features
self.lr = learning_rate
self.gamma = reward_decay #每个epoch的observation \ action\ reward
self.ep_obs, self.ep_as, self.ep_rs = [], [], [] self._build_net() self.sess = tf.Session() if output_graph:
# $ tensorboard --logdir=logs
# http://0.0.0.0:6006/
# tf.train.SummaryWriter soon be deprecated, use following
tf.summary.FileWriter("logs/", self.sess.graph) self.sess.run(tf.global_variables_initializer()) def _build_net(self):
with tf.name_scope('inputs'):
self.tf_obs = tf.placeholder(tf.float32, [None, self.n_features], name="observations")
self.tf_acts = tf.placeholder(tf.int32, [None, ], name="actions_num")
self.tf_vt = tf.placeholder(tf.float32, [None, ], name="actions_value")
# fc1
layer = tf.layers.dense(
inputs=self.tf_obs,
units=10,
activation=tf.nn.tanh, # tanh activation
kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3),
bias_initializer=tf.constant_initializer(0.1),
name='fc1'
)
# fc2
all_act = tf.layers.dense(
inputs=layer,
units=self.n_actions,
activation=None,
kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3),
bias_initializer=tf.constant_initializer(0.1),
name='fc2'
) self.all_act_prob = tf.nn.softmax(all_act, name='act_prob') # use softmax to convert to probability with tf.name_scope('loss'):
# to maximize total reward (log_p * R) is to minimize -(log_p * R), and the tf only have minimize(loss)
neg_log_prob = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=all_act, labels=self.tf_acts) # this is negative log of chosen action
# or in this way:
# neg_log_prob = tf.reduce_sum(-tf.log(self.all_act_prob)*tf.one_hot(self.tf_acts, self.n_actions), axis=1)
loss = tf.reduce_mean(neg_log_prob * self.tf_vt) # reward guided loss with tf.name_scope('train'):
self.train_op = tf.train.AdamOptimizer(self.lr).minimize(loss) def choose_action(self, observation):
prob_weights = self.sess.run(self.all_act_prob, feed_dict={self.tf_obs: observation[np.newaxis, :]})
action = np.random.choice(range(prob_weights.shape[1]), p=prob_weights.ravel()) # select action w.r.t the actions prob
return action def store_transition(self, s, a, r):
self.ep_obs.append(s)
self.ep_as.append(a)
self.ep_rs.append(r) def learn(self):
# discount and normalize episode reward
discounted_ep_rs_norm = self._discount_and_norm_rewards() # train on episode
self.sess.run(self.train_op, feed_dict={
self.tf_obs: np.vstack(self.ep_obs), # shape=[None, n_obs]
self.tf_acts: np.array(self.ep_as), # shape=[None, ]
self.tf_vt: discounted_ep_rs_norm, # shape=[None, ]
}) self.ep_obs, self.ep_as, self.ep_rs = [], [], [] # empty episode data
return discounted_ep_rs_norm def _discount_and_norm_rewards(self):
# discount episode rewards
discounted_ep_rs = np.zeros_like(self.ep_rs)
running_add = 0
for t in reversed(range(0, len(self.ep_rs))):
running_add = running_add * self.gamma + self.ep_rs[t]
discounted_ep_rs[t] = running_add # normalize episode rewards
discounted_ep_rs -= np.mean(discounted_ep_rs)
discounted_ep_rs /= np.std(discounted_ep_rs)
return discounted_ep_rs