6.1深度强化学习算法发展图
图最左边一栏中没有采用深度学习的基础强化学习方法。在迷宫任务中,我们实现了策略迭代法(REINFORCE)、Sarsa和Q学习。第4章介绍了深度学习。第5章实现了DQN,它用深度神经网络表达动作价值函数Q(s,a)。DDQN是双重Q学习和DQN的组合。Q学习和DQN学习动作价值函数Q(s,a),有必要使用一个动作价值函数Q来更新另一个动作价值函数Q,这是导致学习不稳定的一个因素。因此,DDQN使用两个网络来更新作为价值函数的Q函数。
Dueling Network是一种在动作价值函数输出层之前增加一层,用于输出状态价值V(s)和优势函数A(s,a) = Q(s,a)-V(s)的方法,学习状态价值V(s)时,能独立于动作而学习状态价值V(s),具有提高学习性能的优点。这里的V(s)不是所采用的具有最高动作价值的动作的Q值,而是所有动作的平均Q值。
优先经验回放(Prioritized Experience Replay)是一种对“经验回放”进行优化的技术,优先经验回放根据优先级提取transition,优先级的排序标准是网络输出与监督信息的差,具体来说
R
t
+
1
+
γ
max
a
Q
(
s
t
+
1
,
a
)
−
Q
(
s
t
,
a
t
)
R_{t+1}+\gamma\max\limits_a Q(s_{t+1},a)-Q(s_t,a_t)
Rt+1+γamaxQ(st+1,a)−Q(st,at)的绝对值,如果绝对值很大,
Q
(
s
t
,
a
t
)
Q(s_t,a_t)
Q(st,at)学习不到位,在replay时优先提取它。相反,如果绝对值小,说明学习情况好,则降低提取概率
DQN之后,A3C作为深度强化学习划时代算法引起了人们的关注。A3C时Asynchronous Advantage Actor-Critic的缩写。第一个A是异步,异步分布式学习系统。第二个A是优势。Q学习进行更新时,使用1步后的状态进行更新,这里使用两步或多步后的状态进行更新,这种学习方法称为Advantage学习。第三个A表示Actor-Critic,是策略迭代法和价值迭代法的结合,Actor是一个输出策略的函数,Critic是一个输出价值的函数。Actor-Critic同时使用这两个函数。
像A3C和A2C这样使用多个智能体的算法主要有两个优点。首先,它很容易将强化学习应用于现实世界中,当对现实世界中的机器人等而不是在PC的模拟环境中应用强化学习时,使用多个机器人来减少学习时间是很重要的。第二个优点是不必使用经验回放。在诸如DQN等只有一个智能体的算法中,连续的transition在内容上相似导致学习难以稳定。因此,DQN等使用经验回放来随机地在经验池中对transition进行采样。而使用多智能体的算法中,多个智能体各自创建自己的transition,从而消除了对经验回放的需求。因此,也可以使用RNN(递归神经网络)和LSTM(长短期记忆)等深度神经网络。
一种名为TRPO(Trust Region Policy Optimization,置信区域策略优化)的方法能使策略梯度法更稳定,TRPO也是基于Actor-Critic框架的,它经过改进得到了PPO(Proximal Policy Optimization,近端策略优化)。同时还有一种称为ACTKR(Actor Critic using Kronecker-Factored Trust Region,使用克罗内克系数的置信区域的ActorCritic)的方法,该方法比A2C的Actor-Critic能更有效地学习。
6.2DDQN的实现
2015nature版DQN的更新公式如下:
Q
m
(
s
t
,
a
t
)
=
Q
m
(
s
t
,
a
t
)
+
η
∗
(
R
t
+
1
+
γ
max
a
∗
Q
t
(
s
t
+
1
,
a
)
−
Q
m
(
s
t
,
a
t
)
)
Q_m(s_t,a_t)=Q_m(s_t,a_t)+\eta*(R_{t+1}+\gamma\max\limits_a *Q_t(s_{t+1},a)-Q_m(s_t,a_t))
Qm(st,at)=Qm(st,at)+η∗(Rt+1+γamax∗Qt(st+1,a)−Qm(st,at)) 其中,
Q
m
(
s
t
,
a
t
)
Q_m(s_t,a_t)
Qm(st,at)表示主
Q
Q
Q网络,
Q
t
(
s
t
+
1
,
a
)
Q_t(s_{t+1},a)
Qt(st+1,a)是目标
Q
Q
Q网络,
max
a
∗
Q
t
(
s
t
+
1
,
a
)
\max\limits_a *Q_t(s_{t+1},a)
amax∗Qt(st+1,a)从目标网络获得下一状态
s
t
+
1
s_{t+1}
st+1中具有最高
Q
Q
Q的动作
a
a
a和当前时刻的Q值。DDQN是一种使更新公式更稳定的方法,更新公式为:
a
m
=
arg
max
a
Q
(
s
t
+
1
,
a
)
a_m=\arg\max\limits_a Q(s_{t+1},a)
am=argamaxQ(st+1,a)
Q
m
(
s
t
,
a
t
)
=
Q
m
(
s
t
,
a
t
)
+
η
∗
(
R
t
+
1
+
γ
∗
Q
t
(
s
t
+
1
,
a
m
)
−
Q
m
(
s
t
,
a
t
)
)
Q_m(s_t,a_t)=Q_m(s_t,a_t)+\eta*(R_{t+1}+\gamma*Q_t(s_{t+1},a_m)-Q_m(s_t,a_t))
Qm(st,at)=Qm(st,at)+η∗(Rt+1+γ∗Qt(st+1,am)−Qm(st,at)) 也就是说从主
Q
Q
Q网络获得在下一状态
s
t
+
1
s_{t+1}
st+1中具有最高
Q
Q
Q的动作
a
m
a_m
am,并且从目标
Q
Q
Q网络获得该动作
a
m
a_m
am的
Q
Q
Q值。这称为双重DQN,因为它使用两个网络来确定主Q网络的更新量。
第5章为了优先理解实现流程,编写了代码较长的函数replay而未进行拆分。首先,重构第5章中DQN的程序以缩短Brain类的replay函数,将三个部分分别进行函数化。使用函数make_minibatch创建小批量数据,使用函数get_expected_state_action_values获取监督信息
Q
(
s
t
,
a
t
)
Q(s_t,a_t)
Q(st,at),使用函数update_main_q_network更新连接参数。然后可以使用以下代码编写函数。
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML
import gym
import random
import numpy as np
# 动画显示函数
def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')
def animate(i):
patch.set_data(frames[i])
anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
anim.save('./image/movie_cartpole_DQN.mp4')
plt.close() # 防止显示两个输出
return HTML(anim.to_jshtml())
from collections import namedtuple
#namedtuple就是有名字的元组,使得元组有键名,以便在DQN访问状态和动作值
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))
ENV = 'CartPole-v0'
GAMMA = 0.99
MAX_STEPS = 200
NUM_EPISODES = 500
#为了实现小批量学习,实现了内存类ReplayMemory来存储经验数据。ReplayMemory
class ReplayMemory:
'''push函数保存步骤中的transition,随机选择的sample函数'''
def __init__(self,CAPACITY) -> None:
self.capacity = CAPACITY
self.memory = []
self.index = 0
def push(self,state,action,state_next,reward):
'''将transition(state,action,state_next,reward)保存在存储器中'''
if len(self.memory) < self.capacity:
self.memory.append(None)
self.memory[self.index] = Transition(state,action,state_next,reward)
self.index = (self.index + 1) % self.capacity #保存的index移动一位
def sample(self,batch_size):
return random.sample(self.memory,batch_size)
def __len__(self):
'''返回当前memory长度'''
return len(self.memory)
然后将重构的Brain类从DQN更新为DDQN。更改Brain类初始化函数init中的两处,这次我们将构建两个网络:变量main_q_network和变量target_q_network。让我们在另一个名为Net的类中准备构建神经网络。Net类如下所示。
from torch import nn
import torch.nn.functional as F
class Net(nn.Module):
def __init__(self, n_in, n_mid,n_out):
super(Net,self).__init__()
self.fc1 = nn.Linear(n_in,n_mid)
self.fc2 = nn.Linear(n_mid,n_mid)
self.fc3 = nn.Linear(n_mid,n_out)
def forward(self,x):
h1 = F.relu(self.fc1(x))
h2 = F.relu(self.fc2(h1))
output = self.fc3(h2)
return output
将Brain类修改为DDQN版本
# 这是一个类,充当代理的大脑,执行 DDQN
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
BATCH_SIZE = 32
CAPACITY = 10000
class Brain:
def __init__(self, num_states, num_actions):
self.num_actions = num_actions # 获取 CartPole 的两个动作(向右或向左推)
# 创建用于存储经验的内存对象
self.memory = ReplayMemory(CAPACITY)
# 构建神经网络
n_in, n_mid, n_out = num_states, 32, num_actions
self.main_q_network = Net(n_in, n_mid, n_out) # 使用 Net 类
self.target_q_network = Net(n_in, n_mid, n_out) # 使用 Net 类
print(self.main_q_network) # 输出网络结构
# 设置优化方法
self.optimizer = optim.Adam(
self.main_q_network.parameters(), lr=0.0001)
def replay(self):
'''使用 Experience Replay 学习网络的连接参数'''
# 1. 检查内存大小
if len(self.memory) < BATCH_SIZE:
return
# 2. 创建小批量
self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_minibatch()
# 3. 求出作为教师信号的 Q(s_t, a_t) 值
self.expected_state_action_values = self.get_expected_state_action_values()
# 4. 更新连接参数
self.update_main_q_network()
def decide_action(self, state, episode):
'''根据当前状态决定动作'''
# 逐渐只采取最优动作的 ε-greedy 法
epsilon = 0.5 * (1 / (episode + 1))
if epsilon <= np.random.uniform(0, 1):
self.main_q_network.eval() # 将网络切换到推理模式
with torch.no_grad():
action = self.main_q_network(state).max(1)[1].view(1, 1)
# 提取网络输出的最大值的索引 = max(1)[1]
# .view(1,1) 将 [torch.LongTensor of size 1] 转换为 size 1x1
else:
# 随机返回 0 或 1 动作
action = torch.LongTensor(
[[random.randrange(self.num_actions)]]) # 随机返回 0 或 1 动作
# action 将是 [torch.LongTensor of size 1x1] 的形状
return action
def make_minibatch(self):
'''2. 创建小批量'''
# 2.1 从内存中取出小批量数据
transitions = self.memory.sample(BATCH_SIZE)
# 2.2 将各变量转换为小批量相对应的形状
# transitions 包含 BATCH_SIZE 个 (state, action, state_next, reward)
# 也就是说,有 (state, action, state_next, reward)×BATCH_SIZE
# 我们想要将其转换为小批量的形式
# 也就是 (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)
batch = Transition(*zip(*transitions))
# 2.3 将每个变量的元素转换为小批量对应的形状,并转换为 Variable 以便网络处理
# 例如,对于 state,将 BATCH_SIZE 个 [torch.FloatTensor of size 1x4] 转换为 [torch.FloatTensor of size BATCH_SIZEx4]
# 创建状态、动作、奖励和非终止状态的小批量 Variable
# cat 是 Concatenates(连接)的意思
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
non_final_next_states = torch.cat([s for s in batch.next_state
if s is not None])
# 仅将存在的下一个状态进行连接,排除了 None 值
return batch, state_batch, action_batch, reward_batch, non_final_next_states
def get_expected_state_action_values(self):
'''3. 计算作为教师信号的 Q(s_t, a_t) 值'''
# 3.1 切换网络到推理模式
self.main_q_network.eval()
self.target_q_network.eval()
# 3.2 计算网络输出的 Q(s_t, a_t)
# self.model(state_batch) 输出左右两个动作的 Q 值
# 形状为 [torch.FloatTensor of size BATCH_SIZEx2]
# 为了得到执行动作 a_t 对应的 Q 值,我们根据 action_batch 中执行的动作是左还是右
# 通过 gather 提取相应的 Q 值
self.state_action_values = self.main_q_network(
self.state_batch).gather(1, self.action_batch)
# 3.3 计算 max{Q(s_{t+1}, a)} 值,注意检查下一个状态是否存在
# 创建一个检查 cartpole 是否未完成并且 next_state 存在的索引掩码
non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,
self.batch.next_state)))
# 先将所有值设为 0
next_state_values = torch.zeros(BATCH_SIZE)
a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)
# 从 Main Q-Network 获取下一状态的最大 Q 值动作 a_m
# 最后的 [1] 返回对应的动作索引
a_m[non_final_mask] = self.main_q_network(
self.non_final_next_states).detach().max(1)[1]
# 筛选出存在的下一状态并将其尺寸从 32 转换为 32×1
a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)
# 从 Target Q-Network 获取对应 a_m 动作的 Q 值
# 使用 detach() 提取
# 使用 squeeze() 将尺寸从 [minibatch×1] 转换为 [minibatch]
next_state_values[non_final_mask] = self.target_q_network(
self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()
# 3.4 根据 Q 学习公式计算作为教师的 Q(s_t, a_t) 值
expected_state_action_values = self.reward_batch + GAMMA * next_state_values
return expected_state_action_values
def update_main_q_network(self):
'''4. 更新连接参数'''
# 4.1 切换网络到训练模式
self.main_q_network.train()
# 4.2 计算损失函数(smooth_l1_loss 是 Huber 损失)
# expected_state_action_values 的尺寸为 [minibatch],所以使用 unsqueeze 将其转换为 [minibatch x 1]
loss = F.smooth_l1_loss(self.state_action_values,
self.expected_state_action_values.unsqueeze(1))
# 4.3 更新连接参数
self.optimizer.zero_grad() # 重置梯度
loss.backward() # 计算反向传播
self.optimizer.step() # 更新参数
def update_target_q_network(self): # DDQN 新增
'''使 Target Q-Network 与 Main Q-Network 相同'''
self.target_q_network.load_state_dict(self.main_q_network.state_dict())
在Brain类的函数init中设置优化方法时,参数为self.main_q_network.parameters(),设置主Q网络进行训练。其他Brain类的内容在第5章中,将变量model部分更改为变量main-q-network。然后将函数get_expected_state_action_values更改为DDQN版本。最后,重新定义函数update_target_q_network。此函数定期执行更新操作,使目标Q网络的连接参数与主Q网络相同。随着Brain类的改变,需要对Agent类做细微的修改。重新实现函数update_target_q_function,在其中执行Brain类的函数update_target_q_network,在Environment类的试验(episode)结束时,执行Agent类的函数update_target_q_function。在这里的实现中,每2轮试验执行一次,将主Q网络的值复制到目标Q网络。Agent类和Environment类如下所示。此外,注释中说明了动画的绘制和保存。
class Agent:
'''CartPole智能体,带有杆的小车'''
def __init__(self,num_states,num_actions) -> None:
self.brain = Brain(num_states,num_actions)
#为智能体创建大脑以做出决策
def update_q_function(self):
'''Q函数的更新'''
self.brain.replay()
def get_action(self,state,episode):
'''动作的确定'''
action = self.brain.decide_action(state,episode)
return action
def memorize(self,state,action,state_next,reward):
self.brain.memory.push(state,action,state_next,reward)
def update_target_q_function(self):
self.brain.update_target_q_network()
# 这是一个执行CartPole的环境类
class Environment:
def __init__(self):
self.env = gym.make(ENV) # 设定要执行的任务
num_states = self.env.observation_space.shape[0] # 设定任务状态和动作的数量
num_actions = self.env.action_space.n # CartPole的动作(向做或向右)数量为2
self.agent = Agent(num_states, num_actions) # 创建Agent在环境中执行的动作
def run(self):
'''执行'''
episode_10_list = np.zeros(10) # 存储10个试验的连续站立步骤数,并使用平均步骤数进行输出
complete_episodes = 0 # 持续站立195步或更多的试验次数
episode_final = False # 最终尝试目标
frames = [] # 用于存储图像的变量,以使最后一轮成为动画
for episode in range(NUM_EPISODES): # 重复试验次数
observation = self.env.reset() # 环境初始化
state = observation # 直接使用观测作为状态state使用
state = torch.from_numpy(state).type(
torch.FloatTensor) # 将NumPy变量转换为PyTorch Tensor
state = torch.unsqueeze(state, 0) # size 4转换为size 1x4
for step in range(MAX_STEPS): # 1 episode(轮)循环
# if episode_final is True: # 在最终试验中,将各时刻图像添加到帧中
# frames.append(self.env.render(mode='rgb_array'))
action = self.agent.get_action(state, episode) # 求取动作
# 通过执行动作a_t求s_{t+1}和done标志
# 从acttion中指定.item()并获取内容
observation_next, _, done, _ = self.env.step(
action.item()) # 使用'_'是因为在面的流程中不适用reward和info
# 给予奖励。另外,设置episode和state_next的结束评估
if done: # 如果step不超过200,或者如果倾斜超过某个角度,则done为true
state_next = None # 没有下一个状态,因此存储为None
# 添加到最近的10轮的站立步数列表中
episode_10_list = np.hstack(
(episode_10_list[1:], step + 1))
if step < 195:
reward = torch.FloatTensor(
[-1.0]) # 如果您在途中倒下,给予奖励-1作为惩罚
complete_episodes = 0 # 重置连续成功记录
else:
reward = torch.FloatTensor([1.0]) # 一直站立直到结束时奖励为1
complete_episodes = complete_episodes + 1 # 更新连续记录
else:
reward = torch.FloatTensor([0.0]) # 普通奖励为0
state_next = observation_next # 保持观察不变
state_next = torch.from_numpy(state_next).type(
torch.FloatTensor) # 将numpy变量转换为PyTorch Tensor
state_next = torch.unsqueeze(state_next, 0) # size 4转换为size 1x4
# 向经验池中添加经验
self.agent.memorize(state, action, state_next, reward)
# Experience Replay中更新Q函数
self.agent.update_q_function()
# 更新观测值
state = state_next
# 结束处理
if done:
print('%d Episode: Finished after %d steps:10次试验的平均step数 = %.1lf' % (
episode, step + 1, episode_10_list.mean()))
if(episode % 2 == 0):
self.agent.update_target_q_function()
break
if episode_final is True:
# 保存并绘制动画
# display_frames_as_gif(frames)
break
# 连续十轮成功
if complete_episodes >= 100:
print('10轮连续成功')
episode_final = True # 使下一次尝试成为最终绘制的动画
cartpole_env = Environment()
cartpole_env.run()
Net(
(fc1): Linear(in_features=4, out_features=32, bias=True)
(fc2): Linear(in_features=32, out_features=32, bias=True)
(fc3): Linear(in_features=32, out_features=2, bias=True)
)
0 Episode: Finished after 19 steps:10次试验的平均step数 = 1.9
1 Episode: Finished after 8 steps:10次试验的平均step数 = 2.7
2 Episode: Finished after 11 steps:10次试验的平均step数 = 3.8
3 Episode: Finished after 11 steps:10次试验的平均step数 = 4.9
4 Episode: Finished after 10 steps:10次试验的平均step数 = 5.9
5 Episode: Finished after 9 steps:10次试验的平均step数 = 6.8
6 Episode: Finished after 9 steps:10次试验的平均step数 = 7.7
7 Episode: Finished after 9 steps:10次试验的平均step数 = 8.6
8 Episode: Finished after 9 steps:10次试验的平均step数 = 9.5
9 Episode: Finished after 10 steps:10次试验的平均step数 = 10.5
10 Episode: Finished after 146 steps:10次试验的平均step数 = 23.2
11 Episode: Finished after 16 steps:10次试验的平均step数 = 24.0
12 Episode: Finished after 17 steps:10次试验的平均step数 = 24.6
13 Episode: Finished after 16 steps:10次试验的平均step数 = 25.1
14 Episode: Finished after 13 steps:10次试验的平均step数 = 25.4
15 Episode: Finished after 19 steps:10次试验的平均step数 = 26.4
16 Episode: Finished after 21 steps:10次试验的平均step数 = 27.6
17 Episode: Finished after 15 steps:10次试验的平均step数 = 28.2
18 Episode: Finished after 14 steps:10次试验的平均step数 = 28.7
19 Episode: Finished after 12 steps:10次试验的平均step数 = 28.9
20 Episode: Finished after 13 steps:10次试验的平均step数 = 15.6
……
328 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0
329 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0
330 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0
331 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0
332 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0
10轮连续成功
333 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0
推荐阅读
发表评论