【深度强化学习】(6) PPO 模型解析,附Pytorch完整代码

大家好,今天和各位分享一下深度强化学习中的近端策略优化算法(proximal policy optimization,PPO),并借助 OpenAI 的 gym 环境完成一个小案例,完整代码可以从我的 GitHub 中获得:

https://github.com/LiSir-HIT/Reinforcement-Learning/tree/main/Model


1. 算法原理

PPO 算法之所以被提出,根本原因在于 Policy Gradient 在处理连续动作空间时 Learning rate 取值抉择困难。Learning rate 取值过小,就会导致深度强化学习收敛性较差,陷入完不成训练的局面,取值过大则导致新旧策略迭代时数据不一致,造成学习波动较大或局部震荡。除此之外,Policy Gradient 因为在线学习的性质,进行迭代策略时原先的采样数据无法被重复利用,每次迭代都需要重新采样

同样地置信域策略梯度算法(Trust Region Policy Optimization,TRPO)虽然利用重要性采样(Important-sampling)、共轭梯度法求解提升了样本效率、训练速率等,但在处理函数的二阶近似时会面临计算量过大,以及实现过程复杂、兼容性差等缺陷。 

PPO 算法具备 Policy Gradient、TRPO 的部分优点采样数据和使用随机梯度上升方法优化代替目标函数之间交替进行,虽然标准的策略梯度方法对每个数据样本执行一次梯度更新,但 PPO 提出新目标函数,可以实现小批量更新。

鉴于上述问题,该算法在迭代更新时,观察当前策略在 t 时刻智能体处于状态 s 所采取的行为概率\pi (a_t |s_t),与之前策略所采取行为概率 \pi_{\theta old} (a_t | s_t)计算概率的比值来控制新策略更新幅度,比值 r_t 记作:

r_t(\theta) = \frac{\pi _{\theta}(a_t|s_t)}{\pi_{\theta old}(a_t|s_t)}

新旧策略差异明显且优势函数较大,则适当增加更新幅度;若 r_t 比值越接近 1,表明新旧策略差异越小。

优势函数代表,在状态 s 下,行为 a 相对于均值的偏差。在论文中,优势函数 \hat{A}_t 使用 GAE(generalized advantage estimation)来计算:

\hat{A}_t^{GAE(\gamma, \lambda)} = \sum_{l=0}^{\bowtie } (\gamma \lambda )^l \delta _{t+l} ^ V

\hat{A}_t^{GAE(\gamma, \lambda)} = \delta _t^V + (\gamma \lambda) \delta _{t+1}^V + (\gamma \lambda)^2 \delta _{t+2}^V + \cdots + (\gamma \lambda)^{T-t+1} \delta_{T+1}

\delta _t^V = r_t + \gamma V_\omega (s_{t+1}) - V_\omega (s_t)

PPO 算法可依据 Actor 网络的更新方式细化为含有自适应 KL-散度(KL Penalty)的 PPO-Penalty 含有 Clippped Surrogate Objective 函数的 PPO-Clip

(1)PPO-Penalty 基于KL 惩罚项优化目标函数,实验证明惩罚项系数 \beta 在迭代过程中并非固定值,需要动态调整惩罚权重,其目标函数 L 可以定义为:

L^{KLPEN} (\theta) = \hat{E} [\frac{\pi_{\theta}(a_t|s_t)}{\pi_{\theta old}(a_t|s_t)} \hat{A}_t - \beta KL [\pi_{\theta old}(\cdot |s_t), \pi_{\theta}(\cdot | s_t)]]

惩罚项 \beta 的初始值的选择对算法几乎无影响,原因是它能在每次迭代时依据新旧策略的 KL 散度做适宜调整,首先设置 KL 散度阈值 d_{targ},再通过下面的表达式计算 d

d = \hat{E} [KL[\pi_{\theta old}(\cdot |s_t), \pi_\theta (\cdot | s_t)]]

如果 d<d_{targ} / 1.5 时,证明散度较小,需要弱化惩罚力度,\beta 调整为 \beta /2

如果 d>d_{targ} \times 1.5 时,证明散度较大,需要增强惩罚力度,\beta 调整为 \beta \times 2

(2)PPO-Clip 直接对新旧策略比例进行一定程度的 Clip 操作,以约束变化幅度。其目标函数的计算方式如下:

L^{CLIP} (\theta ) = \hat{E}_t [min(r_t(\theta)\hat{A}_t, clip(r_t(\theta), 1-\varepsilon ,1+\varepsilon )\hat{A}_t)]

其中,\varepsilon 代表截断超参数,一般设定值为 0.2clip() 表示截断函数负责限制比例 r_t 在[1-\varepsilon ,1+\varepsilon ] 区间之内,以保证收敛性;最终 L^{CLIP} 借助 min 函数选取未截断与截断目标之间的更小值,形成目标下限。L^{CLIP} 可以分为优势函数 A 为正数和负数两种情况,其变化趋势如下图所示:

如果优势函数为正数,需要增大新旧策略比值 r_t,然而当 r_t>1+\varepsilon 时,将不提供额外的激励;如果优势函数是负数,需要减少新旧策略比值 r_t,但在 r_t < 1+\varepsilon 时,不提供额外的激励,这使得新旧策略的差异被限制在合理范围内。PPO 本质上基于 Actor-Critic 框架,算法流程如下:

PPO 算法主要由 Actor 和 Critic 两部分构成,Critic 部分更新方式与其他Actor-Critic 类型相似,通常采用计算 TD  error(时序差分误差)形式。对于 Actor 的更新方式,PPO 可在KLPENL 、CLIPL 之间选择对于当前实验环境稳定性适用性更强的目标函数,经过 OpenAI 研究团队实验论证,PPO- Clip 比 PPO- Penalty有更好的数据效率和可行性 


2. 代码实现

下面我就采用 Clip 形式的 PPO。模型构建代码如下。下面的模型适用于 action 是离散的情况,连续情况的代码可以从我的 GitHub 中获取。

# 代码用于离散环境的模型
import numpy as np
import torch
from torch import nn
from torch.nn import functional as F

# ----------------------------------- #
# 构建策略网络--actor
# ----------------------------------- #

class PolicyNet(nn.Module):
    def __init__(self, n_states, n_hiddens, n_actions):
        super(PolicyNet, self).__init__()
        self.fc1 = nn.Linear(n_states, n_hiddens)
        self.fc2 = nn.Linear(n_hiddens, n_actions)
    def forward(self, x):
        x = self.fc1(x)  # [b,n_states]-->[b,n_hiddens]
        x = F.relu(x)
        x = self.fc2(x)  # [b, n_actions]
        x = F.softmax(x, dim=1)  # [b, n_actions]  计算每个动作的概率
        return x

# ----------------------------------- #
# 构建价值网络--critic
# ----------------------------------- #

class ValueNet(nn.Module):
    def __init__(self, n_states, n_hiddens):
        super(ValueNet, self).__init__()
        self.fc1 = nn.Linear(n_states, n_hiddens)
        self.fc2 = nn.Linear(n_hiddens, 1)
    def forward(self, x):
        x = self.fc1(x)  # [b,n_states]-->[b,n_hiddens]
        x = F.relu(x)
        x = self.fc2(x)  # [b,n_hiddens]-->[b,1]  评价当前的状态价值state_value
        return x

# ----------------------------------- #
# 构建模型
# ----------------------------------- #

class PPO:
    def __init__(self, n_states, n_hiddens, n_actions,
                 actor_lr, critic_lr, lmbda, epochs, eps, gamma, device):
        # 实例化策略网络
        self.actor = PolicyNet(n_states, n_hiddens, n_actions).to(device)
        # 实例化价值网络
        self.critic = ValueNet(n_states, n_hiddens).to(device)
        # 策略网络的优化器
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        # 价值网络的优化器
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr = critic_lr)

        self.gamma = gamma  # 折扣因子
        self.lmbda = lmbda  # GAE优势函数的缩放系数
        self.epochs = epochs  # 一条序列的数据用来训练轮数
        self.eps = eps  # PPO中截断范围的参数
        self.device = device

    # 动作选择
    def take_action(self, state):
        # 维度变换 [n_state]-->tensor[1,n_states]
        state = torch.tensor(state[np.newaxis, :]).to(self.device)
        # 当前状态下,每个动作的概率分布 [1,n_states]
        probs = self.actor(state)
        # 创建以probs为标准的概率分布
        action_list = torch.distributions.Categorical(probs)
        # 依据其概率随机挑选一个动作
        action = action_list.sample().item()
        return action

    # 训练
    def learn(self, transition_dict):
        # 提取数据集
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).to(self.device).view(-1,1)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).to(self.device).view(-1,1)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).to(self.device).view(-1,1)

        # 目标,下一个状态的state_value  [b,1]
        next_q_target = self.critic(next_states)
        # 目标,当前状态的state_value  [b,1]
        td_target = rewards + self.gamma * next_q_target * (1-dones)
        # 预测,当前状态的state_value  [b,1]
        td_value = self.critic(states)
        # 目标值和预测值state_value之差  [b,1]
        td_delta = td_target - td_value

        # 时序差分值 tensor-->numpy  [b,1]
        td_delta = td_delta.cpu().detach().numpy()
        advantage = 0  # 优势函数初始化
        advantage_list = []

        # 计算优势函数
        for delta in td_delta[::-1]:  # 逆序时序差分值 axis=1轴上倒着取 [], [], []
            # 优势函数GAE的公式
            advantage = self.gamma * self.lmbda * advantage + delta
            advantage_list.append(advantage)
        # 正序
        advantage_list.reverse()
        # numpy --> tensor [b,1]
        advantage = torch.tensor(advantage_list, dtype=torch.float).to(self.device)

        # 策略网络给出每个动作的概率,根据action得到当前时刻下该动作的概率
        old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()

        # 一组数据训练 epochs 轮
        for _ in range(self.epochs):
            # 每一轮更新一次策略网络预测的状态
            log_probs = torch.log(self.actor(states).gather(1, actions))
            # 新旧策略之间的比例
            ratio = torch.exp(log_probs - old_log_probs)
            # 近端策略优化裁剪目标函数公式的左侧项
            surr1 = ratio * advantage
            # 公式的右侧项,ratio小于1-eps就输出1-eps,大于1+eps就输出1+eps
            surr2 = torch.clamp(ratio, 1-self.eps, 1+self.eps) * advantage

            # 策略网络的损失函数
            actor_loss = torch.mean(-torch.min(surr1, surr2))
            # 价值网络的损失函数,当前时刻的state_value - 下一时刻的state_value
            critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))

            # 梯度清0
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            # 反向传播
            actor_loss.backward()
            critic_loss.backward()
            # 梯度更新
            self.actor_optimizer.step()
            self.critic_optimizer.step()

3. 案例演示

基于 OpenAI 的 gym 环境完成一个推车游戏,一个离散的环境,目标是左右移动小车将黄色的杆子保持竖直。动作维度为2,属于离散值;状态维度为 4,分别是坐标、速度、角度、角速度。

import numpy as np
import matplotlib.pyplot as plt
import gym
import torch
from RL_brain import PPO

device = torch.device('cuda') if torch.cuda.is_available() \
                            else torch.device('cpu')

# ----------------------------------------- #
# 参数设置
# ----------------------------------------- #

num_episodes = 100  # 总迭代次数
gamma = 0.9  # 折扣因子
actor_lr = 1e-3  # 策略网络的学习率
critic_lr = 1e-2  # 价值网络的学习率
n_hiddens = 16  # 隐含层神经元个数
env_name = 'CartPole-v1'
return_list = []  # 保存每个回合的return

# ----------------------------------------- #
# 环境加载
# ----------------------------------------- #

env = gym.make(env_name, render_mode="human")
n_states = env.observation_space.shape[0]  # 状态数 4
n_actions = env.action_space.n  # 动作数 2

# ----------------------------------------- #
# 模型构建
# ----------------------------------------- #

agent = PPO(n_states=n_states,  # 状态数
            n_hiddens=n_hiddens,  # 隐含层数
            n_actions=n_actions,  # 动作数
            actor_lr=actor_lr,  # 策略网络学习率
            critic_lr=critic_lr,  # 价值网络学习率
            lmbda = 0.95,  # 优势函数的缩放因子
            epochs = 10,  # 一组序列训练的轮次
            eps = 0.2,  # PPO中截断范围的参数
            gamma=gamma,  # 折扣因子
            device = device
            )

# ----------------------------------------- #
# 训练--回合更新 on_policy
# ----------------------------------------- #

for i in range(num_episodes):
    
    state = env.reset()[0]  # 环境重置
    done = False  # 任务完成的标记
    episode_return = 0  # 累计每回合的reward

    # 构造数据集,保存每个回合的状态数据
    transition_dict = {
        'states': [],
        'actions': [],
        'next_states': [],
        'rewards': [],
        'dones': [],
    }

    while not done:
        action = agent.take_action(state)  # 动作选择
        next_state, reward, done, _, _  = env.step(action)  # 环境更新
        # 保存每个时刻的状态\动作\...
        transition_dict['states'].append(state)
        transition_dict['actions'].append(action)
        transition_dict['next_states'].append(next_state)
        transition_dict['rewards'].append(reward)
        transition_dict['dones'].append(done)
        # 更新状态
        state = next_state
        # 累计回合奖励
        episode_return += reward

    # 保存每个回合的return
    return_list.append(episode_return)
    # 模型训练
    agent.learn(transition_dict)

    # 打印回合信息
    print(f'iter:{i}, return:{np.mean(return_list[-10:])}')

# -------------------------------------- #
# 绘图
# -------------------------------------- #

plt.plot(return_list)
plt.title('return')
plt.show()

训练100回合,绘制每回合的 return

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/3816.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

泰克信号发生器特点

泰克信号发生器是一种用于产生各种类型的电子信号的仪器&#xff0c;可以广泛应用于电子、通信、自动化、医疗等领域。泰克信号发生器具有以下特点&#xff1a;多种信号类型&#xff1a;泰克信号发生器可以产生多种类型的电子信号&#xff0c;包括正弦波、方波、三角波、脉冲等…

TitanIDE:云原生开发到底强在哪里?

原文作者&#xff1a;行云创新技术总监 邓冰寒 引言 是一种新的软件开发方法&#xff0c;旨在构建更可靠、高效、弹性、安全和可扩展的应用程序。与传统的应用程序开发方式不同&#xff0c;云原生是将开发环境完全搬到云端&#xff0c;构建一站式的云原生开发环境。云原生的开…

PWM互补输出,以及死区时间计算

本文基于野火例程进行解说 实验内容 本次实验输出一对互补的pwm波&#xff0c;且进行死区时间的计算说明。 代码 互补输出对应的定时器初始化代码&#xff1a; bsp_advance_tim.c /********************************************************************************* fi…

【YOLO】YOLOv8训练自定义数据集(4种方式)

YOLOv8 出来一段时间了&#xff0c;继承了分类、检测、分割&#xff0c;本文主要实现自定义的数据集&#xff0c;使用 YOLOV8 进行检测模型的训练和使用 YOLOv8 此次将所有的配置参数全部解耦到配置文件 default.yaml&#xff0c;不再类似于 YOLOv5&#xff0c;一部分在配置文件…

Anaconda 的安装配置及依赖项的内外网配置

在分享anaconda 的安装配置及使用前&#xff0c;我们必须先明白anaconda是什么&#xff1b;Anaconda是一个开源的Python发行版本。两者区别在于前者是一门编程语言&#xff0c;后者相当于编程语言中的工具包。 由于python自身缺少numpy、matplotlib、scipy、scikit-learn等一系…

Java中的深拷贝和浅拷贝

目录 &#x1f34e;引出拷贝 &#x1f34e;浅拷贝 &#x1f34e;深拷贝 &#x1f34e;总结 引出拷贝 现在有一个学生类和书包类&#xff0c;在学生类中有引用类型的书包变量&#xff1a; class SchoolBag {private String brand; //书包的品牌private int size; //书…

7.网络爬虫—正则表达式详讲

7.网络爬虫—正则表达式详讲与实战Python 正则表达式re.match() 函数re.search方法re.match与re.search的区别re.compile 函数检索和替换检索&#xff1a;替换&#xff1a;findallre.finditerre.split正则表达式模式常见的字符类正则模式正则表达式模式量词正则表达式举例前言&…

2022财报逆转,有赞穿透迷雾实现突破

2022年&#xff0c;商家经营面临困难。但在一些第三方服务商的帮助下&#xff0c;也有商家取得了逆势增长。 2023年3月23日&#xff0c;有赞发布2022年业绩报告&#xff0c;它帮助许多商家稳住了一整年的经营。2022年&#xff0c;有赞门店SaaS业务的GMV达到425亿元&#xff0c…

24万字智慧城市顶层设计及智慧应用解决方案

本资料来源公开网络&#xff0c;仅供个人学习&#xff0c;请勿商用&#xff0c;如有侵权请联系删除。部分资料内容&#xff1a; 4.8 机房消防系统 4.8.1消防系统概况 根据本工程机房消防系统的特殊要求&#xff0c;整个消防系统由火灾报警系统、消防联动系统和气体灭火系统三部…

常见的嵌入式微处理器(Micro Processor Unit,MPU)

嵌入式微处理器是由通用计算机中的CPU演变而来的。它的特征是具有32位以上的处理器&#xff0c;具有较高的性能&#xff0c;当然其价格也相应较高。但与计算机处理器不同的是&#xff0c;在实际嵌入式应用中&#xff0c;只保留和嵌入式应用紧密相关的功能硬件&#xff0c;去除了…

医院陪诊系统源码,可以提供新的就医方式

随着人们生活水平的提高和医疗服务的进步&#xff0c;越来越多的人们开始注重家庭健康和医疗保健。在这个背景下&#xff0c;陪护系统和医院陪诊系统应运而生&#xff0c;成为了现代医疗服务领域中的重要组成部分。 陪护系统是一种为患者提供家庭养护服务的机构&#xff0c;它…

“蓝桥杯”递推和递归(一)——取数位

1. 算法简介 递推和递归虽然叫法不同&#xff0c;但它们的基本思想是一致的&#xff0c;在很多程序中&#xff0c;这两种算法可以通用&#xff0c;不同的是递推法效率更高&#xff0c;递归法更方便阅读。 &#xff08;1&#xff09;递推法 递推法是一种重要的数学方法&#…

【PC自动化测试-4】inspect.exe 详解

1&#xff0c;inspect.exe图解" 检查 "窗口有几个主要部分&#xff1a;● 标题栏。 显示" 检查 HWND (窗口句柄) 。● 菜单栏。 提供对 检查功能 的访问权限。● 工具 栏。 提供对 检查功能 的访问权限。● 树视图。 将 UI 元素的层次结构呈现为树视图控件&…

【超好懂的比赛题解】暨南大学2023东软教育杯ACM校赛个人题解

title : 暨南大学2023东软教育杯ACM校赛 题解 tags : ACM,练习记录 date : 2023-3-26 author : Linno 文章目录暨南大学2023东软教育杯ACM校赛 题解A-小王的魔法B-苏神的遗憾C-神父的碟D-基站建设E-小王的数字F-Uziの真身G-电子围棋H-二分大法I-丁真的小马朋友们J-单车运营K-超…

JavaScript实现列表分页(小白版)

组件用惯了&#xff0c;突然叫你用纯cssJavaScript写一个分页&#xff0c;顿时就慌了。久久没有接触js了&#xff0c;不知道咋写了。本文章也是借与参考做的一个demo案例&#xff0c;小白看了都会的那种。咱们就以ul列表为例进行分页&#xff1a; 首先模拟的数据列表是这样的&a…

变量的理论分布模型

二项分布 定义 对立事件的总体分布&#xff0c;称为二项分布。 例如&#xff0c;一个群体只有男和女&#xff0c;现在进行n次随机抽样调查&#xff0c;随机抽样男出现的次数可能是0&#xff0c;1&#xff0c;2&#xff0c;3&#xff0c;4&#xff0c;…&#xff0c;n, 这种类…

网络安全实战从 0 到 1 彻底掌握 XXE

0x01 什么是 XXE个人认为&#xff0c;XXE 可以归结为一句话&#xff1a;构造恶意 DTD介绍 XXE 之前&#xff0c;我先来说一下普通的 XML 注入&#xff0c;这个的利用面比较狭窄&#xff0c;如果有的话应该也是逻辑漏洞。既然能插入 XML 代码&#xff0c;那我们肯定不能善罢甘休…

ROS Cartographer--Algorithm

ROS Cartographer–Algorithm 原文&#xff1a;Algorithm walkthrough for tuning 论文地址(Google Search)&#xff1a;Real-Time Loop Closure in 2D LIDAR SLAM ROS Cartographer的完整参考文件&#xff1a;Cartographer ROS Integration 概述 本地SLAM通常由前端和后端…

Python满屏表白代码

目录 前言 爱心界面 无限弹窗 前言 人生苦短&#xff0c;我用Python&#xff01;又是新的一周啦&#xff0c;本期博主给大家带来了一个全新的作品&#xff1a;满屏表白代码&#xff0c;无限弹窗版&#xff01;快快收藏起来送给她吧~ 爱心界面 def Heart(): roottk.Tk…

【Linux】计算机网络1

计算机网络的背景背景&#xff1a;早在20世纪50年代初&#xff0c;美国建立的地面防空系统就是将地面的雷达和其他测量控制设备的信息通过通信线路汇集到一台中心计算机进行处理&#xff0c;开创了把计算机技术和通信技术相结合的尝试。20世纪60年代中期开始&#xff0c;出现、…
最新文章