【强化学习】PPO代码注释版本
生活随笔
收集整理的這篇文章主要介紹了
【强化学习】PPO代码注释版本
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
# PPO主要通過限制新舊策略的比率,那些遠離舊策略的改變不會發(fā)生# import tensorflow as tf
import tensorflow.compat.v1 as tf
tf.compat.v1.disable_eager_execution()
import numpy as np
import matplotlib.pyplot as plt
import gym# 定義一些超級參量
EP_MAX = 1000 # 最大步數(shù)
EP_LEN = 200
GAMMA = 0.9 # 折扣因子
A_LR = 0.0001 # A網(wǎng)絡(luò)的學習速率
C_LR = 0.0002 # c網(wǎng)絡(luò)的學學習速率
BATCH = 32 # 緩沖池長度
A_UPDATE_STEPS = 10 #
C_UPDATE_STEPS = 10
S_DIM, A_DIM = 3, 1 # 狀態(tài)維度和動作維度
# 作者一共提出了兩種方法,一種是Adaptive KL Penalty Coefficient, 另一種是Clipped Surrogate Objective,結(jié)果證明,clip的這個方法更好
METHOD = [dict(name='kl_pen', kl_target=0.01, lam=0.5), # KL懲罰方法dict(name='clip', epsilon=0.2), # clip方法,發(fā)現(xiàn)這個更好
][1] # 選擇優(yōu)化的方法class PPO(object):def __init__(self):self.sess = tf.Session()# 狀態(tài)變量的結(jié)構(gòu),多少個不知道,但是有S_DIM這么多維self.tfs = tf.placeholder(tf.float32, [None, S_DIM], 'state')# critic# https://blog.csdn.net/yangfengling1023/article/details/81774580/# 全連接層,曾加了一個層,全連接層執(zhí)行操作 outputs = activation(inputs.kernel+bias) 如果執(zhí)行結(jié)果不想進行激活操作,則設(shè)置activation=None# self.s:輸入該網(wǎng)絡(luò)層的數(shù)據(jù) 100:輸出的維度大小,改變inputs的最后一維 tf.nn.relu6:激活函數(shù),即神經(jīng)網(wǎng)絡(luò)的非線性變化with tf.variable_scope('critic'): # tf.variable_scope:變量作用域l1 = tf.layers.dense(self.tfs, 100, tf.nn.relu)self.v = tf.layers.dense(l1, 1) # 輸出一個float32的數(shù)self.tfdc_r = tf.placeholder(tf.float32, [None, 1], 'discounted_r') # discounted rewardself.advantage = self.tfdc_r - self.v # 相當于td_error# c網(wǎng)絡(luò)的loss也就是最小化advantage,先平方再取平均self.closs = tf.reduce_mean(tf.square(self.advantage))# critic 網(wǎng)絡(luò)的優(yōu)化器self.ctrain_op = tf.train.AdadeltaOptimizer(C_LR).minimize(self.closs)# actor# 建立了兩個actor網(wǎng)絡(luò)# actor有兩個actor 和 actor_old, actor_old的主要功能是記錄行為策略的版本。# 輸入時state,輸出是描述動作分布的mu和sigmapi, pi_params = self._build_anet('pi', trainable=True)oldpi, oldpi_params = self._build_anet('oldpi', trainable=False)with tf.variable_scope('sample_action'):# tf.squeeze函數(shù)返回一個張量,這個張量是將原始input中所有維度為1的那些維都刪掉的結(jié)果# axis可以用來指定要刪掉的為1的維度,此處要注意指定的維度必須確保其是1,否則會報錯# 這里pi.sample是指從pi中取了樣,會根據(jù)這個樣本選擇動作,網(wǎng)絡(luò)優(yōu)化之后會選擇更好的樣本出來self.sample_op = tf.squeeze(pi.sample(1), axis=0) # 這里應(yīng)該是只刪掉了0維的1 TODO 不是很明白# print("pi.sample(1)", pi.sample(1))# print("self.sample_op", self.sample_op)with tf.variable_scope('update_oldpi'):# .assign的意思是增加新的一列self.update_oldpi_op = [oldp.assign(p) for p, oldp in zip(pi_params, oldpi_params)]# 動作占位, td_error占位self.tfa = tf.placeholder(tf.float32, [None, A_DIM], 'action')self.tfadv = tf.placeholder(tf.float32, [None, 1], 'advantage')with tf.variable_scope('loss'): # 下面這些應(yīng)該是在實現(xiàn)loss函數(shù)with tf.variable_scope('surrogate'):# surrogate目標函數(shù):ratio = pi.prob(self.tfa) / (oldpi.prob(self.tfa) + 1e-5)surr = ratio * self.tfadv# 如果選擇了KL懲罰方法,這種方法稍微復雜if METHOD['name'] == 'kl_pen':self.tflam = tf.placeholder(tf.float32, None, 'lambda')# tf.distributions.kl_divergence KL散度,也就是兩個分布的相對熵,體現(xiàn)的是兩個分布的相似程度,熵越小越相似kl = tf.distributions.kl_divergence(oldpi, pi)self.kl_mean = tf.reduce_mean(kl)self.aloss = -(tf.reduce_mean(surr - self.tflam * kl))# 如果選擇的是clip方法,這個比較簡單else: # clipping method, find this is betterself.aloss = -tf.reduce_mean(tf.minimum(surr,tf.clip_by_value(ratio, 1. - METHOD['epsilon'], 1. + METHOD['epsilon']) * self.tfadv))# a網(wǎng)絡(luò)優(yōu)化器with tf.variable_scope('atrain'):self.atrain_op = tf.train.AdamOptimizer(A_LR).minimize(self.aloss)# 指定文件來保存圖,tensorboard的步驟tf.summary.FileWriter("log/", self.sess.graph)# 運行會話self.sess.run(tf.global_variables_initializer())def update(self, s, a, r): # 更新函數(shù)# print("fuction: update")# 先運行倆會話self.sess.run(self.update_oldpi_op)adv = self.sess.run(self.advantage, {self.tfs: s, self.tfdc_r: r})# update actorif METHOD['name'] == 'kl_pen': # TODO 不懂for _ in range(A_UPDATE_STEPS):_, kl = self.sess.run([self.atrain_op, self.kl_mean],{self.tfs: s, self.tfa: a, self.tfadv: adv, self.tflam: METHOD['lam']})if kl > 4 * METHOD['kl_target']: # this in in google's paperbreak# 這一塊是在計算懲罰項的期望,并對系數(shù)進行自適應(yīng)調(diào)整# 更新后的系數(shù)用于下一次策略更新。在這個方案中,我們偶爾會看到KL差異與target顯著不同,但是這種情況很少,# 因為B會迅速調(diào)整。參數(shù)1.5和2是啟發(fā)式選擇的,但算法對它們不是很敏感。B的初值是另一個超參數(shù),但在實際應(yīng)用中# 并不重要,因為算法可以快速調(diào)整它。if kl < METHOD['kl_target'] / 1.5: # adaptive lambda, this is in OpenAI's paperMETHOD['lam'] /= 2elif kl > METHOD['kl_target'] * 1.5:METHOD['lam'] *= 2METHOD['lam'] = np.clip(METHOD['lam'], 1e-4, 10) # sometimes explode, this clipping is my solutionelse: # clipping method, find this is better (OpenAI's paper)[self.sess.run(self.atrain_op, {self.tfs: s, self.tfa: a, self.tfadv: adv}) for _ in range(A_UPDATE_STEPS)]# update critic[self.sess.run(self.ctrain_op, {self.tfs: s, self.tfdc_r: r}) for _ in range(C_UPDATE_STEPS)]def choose_action(self, s):# print("fuction: choose_action")# print('state: ', s, "s.shape", s.shape)s = s[np.newaxis, :]# print('s[np.newaxis, :]: ', s, s.shape)a = self.sess.run(self.sample_op, {self.tfs: s})[0]print("action",a)print("np.clip(a, -2, 2)", np.clip(a, -2, 2))return np.clip(a, -2, 2)# np.clip: 是一個截取函數(shù),用于截取數(shù)組中小于或者大于某值的部分,并使得被截取部分等于固定值。所有比a_min小的數(shù)都會強制變?yōu)閍_min;def get_v(self, s):# print("fuction: get_v")if s.ndim < 2: s = s[np.newaxis, :]return self.sess.run(self.v, {self.tfs: s})[0, 0]def _build_anet(self, name, trainable):# print("fuction: _build_anet")with tf.variable_scope(name):# tf.nn.relu 激活函數(shù)#l1 = tf.layers.dense(self.tfs, 100, tf.nn.relu, trainable=trainable)# tf.nn.tanh 計算l1的雙曲正切值 會把值壓縮到(-1,1)之間 # 平均值mu = 2 * tf.layers.dense(l1, A_DIM, tf.nn.tanh, trainable=trainable)# tf.nn.softplus 這個函數(shù)的作用是計算激活函數(shù)softplus,即log( exp(l1) + 1)。sigma = tf.layers.dense(l1, A_DIM, tf.nn.softplus, trainable=trainable) # 標準差# 該函數(shù)定義了一個正態(tài)分布。 mu是平均值 sigma是標準差norm_dist = tf.distributions.Normal(loc=mu, scale=sigma)print("norm_dist", norm_dist)params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope=name)print("I am params", params)return norm_dist, paramsdef main():env = gym.make('Pendulum-v0').unwrappedppo = PPO()all_ep_r = []for ep in range(EP_MAX):s = env.reset() # 狀態(tài)初始化buffer_s, buffer_a, buffer_r = [], [], [] # 緩存區(qū)ep_r = 0 # 初始化回合for t in range(EP_LEN): # 在規(guī)定的回合長度內(nèi)env.render() # 環(huán)境渲染a = ppo.choose_action(s)s_, r, done, _ = env.step(a) # 執(zhí)行動作獲取需要的參量buffer_s.append(s) # 把這些參量加到緩存區(qū)buffer_a.append(a)buffer_r.append((r + 8) / 8) # 規(guī)范獎勵,發(fā)現(xiàn)有用的東西s = s_ep_r += r# 更新PPO# 如果buffer收集一個batch或者episode完了if (t + 1) % BATCH == 0 or t == EP_LEN - 1: # TODO 這里具體再解釋一下v_s_ = ppo.get_v(s_) # 計算 discounted rewarddiscounted_r = []for r in buffer_r[::-1]: # print(a[::-1]) ### 取從后向前(相反)的元素[1 2 3 4 5]-->[ 5 4 3 2 1 ]v_s_ = r + GAMMA * v_s_ # 狀態(tài)價值計算discounted_r.append(v_s_)discounted_r.reverse() # 先反方向加入再逆轉(zhuǎn)# 清空 bufferbs, ba, br = np.vstack(buffer_s), np.vstack(buffer_a), np.array(discounted_r)[:, np.newaxis]buffer_s, buffer_a, buffer_r = [], [], []ppo.update(bs, ba, br) # 更新PPO TODO 這些定義具體是干什么用的呢?if ep == 0:all_ep_r.append(ep_r)else:all_ep_r.append(all_ep_r[-1] * 0.9 + ep_r * 0.1)print('Ep: %i' % ep,"|Ep_r: %i" % ep_r,("|Lam: %.4f" % METHOD['lam']) if METHOD['name'] == 'kl_pen' else '',)plt.plot(np.arange(len(all_ep_r)), all_ep_r)plt.xlabel('Episode') # 回合數(shù)plt.ylabel('Moving averaged episode reward') # 平均回報plt.show()if __name__ == '__main__':main()
總結(jié)
以上是生活随笔為你收集整理的【强化学习】PPO代码注释版本的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 新买的路由器怎么接如何购买新的路由器
- 下一篇: 【强化学习】可视化学习tensorboa