6.1深度强化学习算法发展图

图最左边一栏中没有采用深度学习的基础强化学习方法。在迷宫任务中,我们实现了策略迭代法(REINFORCE)、Sarsa和Q学习。第4章介绍了深度学习。第5章实现了DQN,它用深度神经网络表达动作价值函数Q(s,a)。DDQN是双重Q学习和DQN的组合。Q学习和DQN学习动作价值函数Q(s,a),有必要使用一个动作价值函数Q来更新另一个动作价值函数Q,这是导致学习不稳定的一个因素。因此,DDQN使用两个网络来更新作为价值函数的Q函数。

Dueling Network是一种在动作价值函数输出层之前增加一层,用于输出状态价值V(s)和优势函数A(s,a) = Q(s,a)-V(s)的方法,学习状态价值V(s)时,能独立于动作而学习状态价值V(s),具有提高学习性能的优点。这里的V(s)不是所采用的具有最高动作价值的动作的Q值,而是所有动作的平均Q值。

优先经验回放(Prioritized Experience Replay)是一种对“经验回放”进行优化的技术,优先经验回放根据优先级提取transition,优先级的排序标准是网络输出与监督信息的差,具体来说

R

t

+

1

+

γ

max

a

Q

(

s

t

+

1

,

a

)

Q

(

s

t

,

a

t

)

R_{t+1}+\gamma\max\limits_a Q(s_{t+1},a)-Q(s_t,a_t)

Rt+1​+γamax​Q(st+1​,a)−Q(st​,at​)的绝对值,如果绝对值很大,

Q

(

s

t

,

a

t

)

Q(s_t,a_t)

Q(st​,at​)学习不到位,在replay时优先提取它。相反,如果绝对值小,说明学习情况好,则降低提取概率

DQN之后,A3C作为深度强化学习划时代算法引起了人们的关注。A3C时Asynchronous Advantage Actor-Critic的缩写。第一个A是异步,异步分布式学习系统。第二个A是优势。Q学习进行更新时,使用1步后的状态进行更新,这里使用两步或多步后的状态进行更新,这种学习方法称为Advantage学习。第三个A表示Actor-Critic,是策略迭代法和价值迭代法的结合,Actor是一个输出策略的函数,Critic是一个输出价值的函数。Actor-Critic同时使用这两个函数。

像A3C和A2C这样使用多个智能体的算法主要有两个优点。首先,它很容易将强化学习应用于现实世界中,当对现实世界中的机器人等而不是在PC的模拟环境中应用强化学习时,使用多个机器人来减少学习时间是很重要的。第二个优点是不必使用经验回放。在诸如DQN等只有一个智能体的算法中,连续的transition在内容上相似导致学习难以稳定。因此,DQN等使用经验回放来随机地在经验池中对transition进行采样。而使用多智能体的算法中,多个智能体各自创建自己的transition,从而消除了对经验回放的需求。因此,也可以使用RNN(递归神经网络)和LSTM(长短期记忆)等深度神经网络。

一种名为TRPO(Trust Region Policy Optimization,置信区域策略优化)的方法能使策略梯度法更稳定,TRPO也是基于Actor-Critic框架的,它经过改进得到了PPO(Proximal Policy Optimization,近端策略优化)。同时还有一种称为ACTKR(Actor Critic using Kronecker-Factored Trust Region,使用克罗内克系数的置信区域的ActorCritic)的方法,该方法比A2C的Actor-Critic能更有效地学习。

6.2DDQN的实现

2015nature版DQN的更新公式如下:

Q

m

(

s

t

,

a

t

)

=

Q

m

(

s

t

,

a

t

)

+

η

(

R

t

+

1

+

γ

max

a

Q

t

(

s

t

+

1

,

a

)

Q

m

(

s

t

,

a

t

)

)

Q_m(s_t,a_t)=Q_m(s_t,a_t)+\eta*(R_{t+1}+\gamma\max\limits_a *Q_t(s_{t+1},a)-Q_m(s_t,a_t))

Qm​(st​,at​)=Qm​(st​,at​)+η∗(Rt+1​+γamax​∗Qt​(st+1​,a)−Qm​(st​,at​)) 其中,

Q

m

(

s

t

,

a

t

)

Q_m(s_t,a_t)

Qm​(st​,at​)表示主

Q

Q

Q网络,

Q

t

(

s

t

+

1

,

a

)

Q_t(s_{t+1},a)

Qt​(st+1​,a)是目标

Q

Q

Q网络,

max

a

Q

t

(

s

t

+

1

,

a

)

\max\limits_a *Q_t(s_{t+1},a)

amax​∗Qt​(st+1​,a)从目标网络获得下一状态

s

t

+

1

s_{t+1}

st+1​中具有最高

Q

Q

Q的动作

a

a

a和当前时刻的Q值。DDQN是一种使更新公式更稳定的方法,更新公式为:

a

m

=

arg

max

a

Q

(

s

t

+

1

,

a

)

a_m=\arg\max\limits_a Q(s_{t+1},a)

am​=argamax​Q(st+1​,a)

Q

m

(

s

t

,

a

t

)

=

Q

m

(

s

t

,

a

t

)

+

η

(

R

t

+

1

+

γ

Q

t

(

s

t

+

1

,

a

m

)

Q

m

(

s

t

,

a

t

)

)

Q_m(s_t,a_t)=Q_m(s_t,a_t)+\eta*(R_{t+1}+\gamma*Q_t(s_{t+1},a_m)-Q_m(s_t,a_t))

Qm​(st​,at​)=Qm​(st​,at​)+η∗(Rt+1​+γ∗Qt​(st+1​,am​)−Qm​(st​,at​)) 也就是说从主

Q

Q

Q网络获得在下一状态

s

t

+

1

s_{t+1}

st+1​中具有最高

Q

Q

Q的动作

a

m

a_m

am​,并且从目标

Q

Q

Q网络获得该动作

a

m

a_m

am​的

Q

Q

Q值。这称为双重DQN,因为它使用两个网络来确定主Q网络的更新量。

第5章为了优先理解实现流程,编写了代码较长的函数replay而未进行拆分。首先,重构第5章中DQN的程序以缩短Brain类的replay函数,将三个部分分别进行函数化。使用函数make_minibatch创建小批量数据,使用函数get_expected_state_action_values获取监督信息

Q

(

s

t

,

a

t

)

Q(s_t,a_t)

Q(st​,at​),使用函数update_main_q_network更新连接参数。然后可以使用以下代码编写函数。

import numpy as np

import matplotlib.pyplot as plt

from matplotlib import animation

from IPython.display import HTML

import gym

import random

import numpy as np

# 动画显示函数

def display_frames_as_gif(frames):

plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi=72)

patch = plt.imshow(frames[0])

plt.axis('off')

def animate(i):

patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)

anim.save('./image/movie_cartpole_DQN.mp4')

plt.close() # 防止显示两个输出

return HTML(anim.to_jshtml())

from collections import namedtuple

#namedtuple就是有名字的元组,使得元组有键名,以便在DQN访问状态和动作值

Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

ENV = 'CartPole-v0'

GAMMA = 0.99

MAX_STEPS = 200

NUM_EPISODES = 500

#为了实现小批量学习,实现了内存类ReplayMemory来存储经验数据。ReplayMemory

class ReplayMemory:

'''push函数保存步骤中的transition,随机选择的sample函数'''

def __init__(self,CAPACITY) -> None:

self.capacity = CAPACITY

self.memory = []

self.index = 0

def push(self,state,action,state_next,reward):

'''将transition(state,action,state_next,reward)保存在存储器中'''

if len(self.memory) < self.capacity:

self.memory.append(None)

self.memory[self.index] = Transition(state,action,state_next,reward)

self.index = (self.index + 1) % self.capacity #保存的index移动一位

def sample(self,batch_size):

return random.sample(self.memory,batch_size)

def __len__(self):

'''返回当前memory长度'''

return len(self.memory)

然后将重构的Brain类从DQN更新为DDQN。更改Brain类初始化函数init中的两处,这次我们将构建两个网络:变量main_q_network和变量target_q_network。让我们在另一个名为Net的类中准备构建神经网络。Net类如下所示。

from torch import nn

import torch.nn.functional as F

class Net(nn.Module):

def __init__(self, n_in, n_mid,n_out):

super(Net,self).__init__()

self.fc1 = nn.Linear(n_in,n_mid)

self.fc2 = nn.Linear(n_mid,n_mid)

self.fc3 = nn.Linear(n_mid,n_out)

def forward(self,x):

h1 = F.relu(self.fc1(x))

h2 = F.relu(self.fc2(h1))

output = self.fc3(h2)

return output

将Brain类修改为DDQN版本

# 这是一个类,充当代理的大脑,执行 DDQN

import random

import torch

from torch import nn

from torch import optim

import torch.nn.functional as F

BATCH_SIZE = 32

CAPACITY = 10000

class Brain:

def __init__(self, num_states, num_actions):

self.num_actions = num_actions # 获取 CartPole 的两个动作(向右或向左推)

# 创建用于存储经验的内存对象

self.memory = ReplayMemory(CAPACITY)

# 构建神经网络

n_in, n_mid, n_out = num_states, 32, num_actions

self.main_q_network = Net(n_in, n_mid, n_out) # 使用 Net 类

self.target_q_network = Net(n_in, n_mid, n_out) # 使用 Net 类

print(self.main_q_network) # 输出网络结构

# 设置优化方法

self.optimizer = optim.Adam(

self.main_q_network.parameters(), lr=0.0001)

def replay(self):

'''使用 Experience Replay 学习网络的连接参数'''

# 1. 检查内存大小

if len(self.memory) < BATCH_SIZE:

return

# 2. 创建小批量

self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_minibatch()

# 3. 求出作为教师信号的 Q(s_t, a_t) 值

self.expected_state_action_values = self.get_expected_state_action_values()

# 4. 更新连接参数

self.update_main_q_network()

def decide_action(self, state, episode):

'''根据当前状态决定动作'''

# 逐渐只采取最优动作的 ε-greedy 法

epsilon = 0.5 * (1 / (episode + 1))

if epsilon <= np.random.uniform(0, 1):

self.main_q_network.eval() # 将网络切换到推理模式

with torch.no_grad():

action = self.main_q_network(state).max(1)[1].view(1, 1)

# 提取网络输出的最大值的索引 = max(1)[1]

# .view(1,1) 将 [torch.LongTensor of size 1] 转换为 size 1x1

else:

# 随机返回 0 或 1 动作

action = torch.LongTensor(

[[random.randrange(self.num_actions)]]) # 随机返回 0 或 1 动作

# action 将是 [torch.LongTensor of size 1x1] 的形状

return action

def make_minibatch(self):

'''2. 创建小批量'''

# 2.1 从内存中取出小批量数据

transitions = self.memory.sample(BATCH_SIZE)

# 2.2 将各变量转换为小批量相对应的形状

# transitions 包含 BATCH_SIZE 个 (state, action, state_next, reward)

# 也就是说,有 (state, action, state_next, reward)×BATCH_SIZE

# 我们想要将其转换为小批量的形式

# 也就是 (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)

batch = Transition(*zip(*transitions))

# 2.3 将每个变量的元素转换为小批量对应的形状,并转换为 Variable 以便网络处理

# 例如,对于 state,将 BATCH_SIZE 个 [torch.FloatTensor of size 1x4] 转换为 [torch.FloatTensor of size BATCH_SIZEx4]

# 创建状态、动作、奖励和非终止状态的小批量 Variable

# cat 是 Concatenates(连接)的意思

state_batch = torch.cat(batch.state)

action_batch = torch.cat(batch.action)

reward_batch = torch.cat(batch.reward)

non_final_next_states = torch.cat([s for s in batch.next_state

if s is not None])

# 仅将存在的下一个状态进行连接,排除了 None 值

return batch, state_batch, action_batch, reward_batch, non_final_next_states

def get_expected_state_action_values(self):

'''3. 计算作为教师信号的 Q(s_t, a_t) 值'''

# 3.1 切换网络到推理模式

self.main_q_network.eval()

self.target_q_network.eval()

# 3.2 计算网络输出的 Q(s_t, a_t)

# self.model(state_batch) 输出左右两个动作的 Q 值

# 形状为 [torch.FloatTensor of size BATCH_SIZEx2]

# 为了得到执行动作 a_t 对应的 Q 值,我们根据 action_batch 中执行的动作是左还是右

# 通过 gather 提取相应的 Q 值

self.state_action_values = self.main_q_network(

self.state_batch).gather(1, self.action_batch)

# 3.3 计算 max{Q(s_{t+1}, a)} 值,注意检查下一个状态是否存在

# 创建一个检查 cartpole 是否未完成并且 next_state 存在的索引掩码

non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,

self.batch.next_state)))

# 先将所有值设为 0

next_state_values = torch.zeros(BATCH_SIZE)

a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)

# 从 Main Q-Network 获取下一状态的最大 Q 值动作 a_m

# 最后的 [1] 返回对应的动作索引

a_m[non_final_mask] = self.main_q_network(

self.non_final_next_states).detach().max(1)[1]

# 筛选出存在的下一状态并将其尺寸从 32 转换为 32×1

a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)

# 从 Target Q-Network 获取对应 a_m 动作的 Q 值

# 使用 detach() 提取

# 使用 squeeze() 将尺寸从 [minibatch×1] 转换为 [minibatch]

next_state_values[non_final_mask] = self.target_q_network(

self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()

# 3.4 根据 Q 学习公式计算作为教师的 Q(s_t, a_t) 值

expected_state_action_values = self.reward_batch + GAMMA * next_state_values

return expected_state_action_values

def update_main_q_network(self):

'''4. 更新连接参数'''

# 4.1 切换网络到训练模式

self.main_q_network.train()

# 4.2 计算损失函数(smooth_l1_loss 是 Huber 损失)

# expected_state_action_values 的尺寸为 [minibatch],所以使用 unsqueeze 将其转换为 [minibatch x 1]

loss = F.smooth_l1_loss(self.state_action_values,

self.expected_state_action_values.unsqueeze(1))

# 4.3 更新连接参数

self.optimizer.zero_grad() # 重置梯度

loss.backward() # 计算反向传播

self.optimizer.step() # 更新参数

def update_target_q_network(self): # DDQN 新增

'''使 Target Q-Network 与 Main Q-Network 相同'''

self.target_q_network.load_state_dict(self.main_q_network.state_dict())

在Brain类的函数init中设置优化方法时,参数为self.main_q_network.parameters(),设置主Q网络进行训练。其他Brain类的内容在第5章中,将变量model部分更改为变量main-q-network。然后将函数get_expected_state_action_values更改为DDQN版本。最后,重新定义函数update_target_q_network。此函数定期执行更新操作,使目标Q网络的连接参数与主Q网络相同。随着Brain类的改变,需要对Agent类做细微的修改。重新实现函数update_target_q_function,在其中执行Brain类的函数update_target_q_network,在Environment类的试验(episode)结束时,执行Agent类的函数update_target_q_function。在这里的实现中,每2轮试验执行一次,将主Q网络的值复制到目标Q网络。Agent类和Environment类如下所示。此外,注释中说明了动画的绘制和保存。

class Agent:

'''CartPole智能体,带有杆的小车'''

def __init__(self,num_states,num_actions) -> None:

self.brain = Brain(num_states,num_actions)

#为智能体创建大脑以做出决策

def update_q_function(self):

'''Q函数的更新'''

self.brain.replay()

def get_action(self,state,episode):

'''动作的确定'''

action = self.brain.decide_action(state,episode)

return action

def memorize(self,state,action,state_next,reward):

self.brain.memory.push(state,action,state_next,reward)

def update_target_q_function(self):

self.brain.update_target_q_network()

# 这是一个执行CartPole的环境类

class Environment:

def __init__(self):

self.env = gym.make(ENV) # 设定要执行的任务

num_states = self.env.observation_space.shape[0] # 设定任务状态和动作的数量

num_actions = self.env.action_space.n # CartPole的动作(向做或向右)数量为2

self.agent = Agent(num_states, num_actions) # 创建Agent在环境中执行的动作

def run(self):

'''执行'''

episode_10_list = np.zeros(10) # 存储10个试验的连续站立步骤数,并使用平均步骤数进行输出

complete_episodes = 0 # 持续站立195步或更多的试验次数

episode_final = False # 最终尝试目标

frames = [] # 用于存储图像的变量,以使最后一轮成为动画

for episode in range(NUM_EPISODES): # 重复试验次数

observation = self.env.reset() # 环境初始化

state = observation # 直接使用观测作为状态state使用

state = torch.from_numpy(state).type(

torch.FloatTensor) # 将NumPy变量转换为PyTorch Tensor

state = torch.unsqueeze(state, 0) # size 4转换为size 1x4

for step in range(MAX_STEPS): # 1 episode(轮)循环

# if episode_final is True: # 在最终试验中,将各时刻图像添加到帧中

# frames.append(self.env.render(mode='rgb_array'))

action = self.agent.get_action(state, episode) # 求取动作

# 通过执行动作a_t求s_{t+1}和done标志

# 从acttion中指定.item()并获取内容

observation_next, _, done, _ = self.env.step(

action.item()) # 使用'_'是因为在面的流程中不适用reward和info

# 给予奖励。另外,设置episode和state_next的结束评估

if done: # 如果step不超过200,或者如果倾斜超过某个角度,则done为true

state_next = None # 没有下一个状态,因此存储为None

# 添加到最近的10轮的站立步数列表中

episode_10_list = np.hstack(

(episode_10_list[1:], step + 1))

if step < 195:

reward = torch.FloatTensor(

[-1.0]) # 如果您在途中倒下,给予奖励-1作为惩罚

complete_episodes = 0 # 重置连续成功记录

else:

reward = torch.FloatTensor([1.0]) # 一直站立直到结束时奖励为1

complete_episodes = complete_episodes + 1 # 更新连续记录

else:

reward = torch.FloatTensor([0.0]) # 普通奖励为0

state_next = observation_next # 保持观察不变

state_next = torch.from_numpy(state_next).type(

torch.FloatTensor) # 将numpy变量转换为PyTorch Tensor

state_next = torch.unsqueeze(state_next, 0) # size 4转换为size 1x4

# 向经验池中添加经验

self.agent.memorize(state, action, state_next, reward)

# Experience Replay中更新Q函数

self.agent.update_q_function()

# 更新观测值

state = state_next

# 结束处理

if done:

print('%d Episode: Finished after %d steps:10次试验的平均step数 = %.1lf' % (

episode, step + 1, episode_10_list.mean()))

if(episode % 2 == 0):

self.agent.update_target_q_function()

break

if episode_final is True:

# 保存并绘制动画

# display_frames_as_gif(frames)

break

# 连续十轮成功

if complete_episodes >= 100:

print('10轮连续成功')

episode_final = True # 使下一次尝试成为最终绘制的动画

cartpole_env = Environment()

cartpole_env.run()

Net(

(fc1): Linear(in_features=4, out_features=32, bias=True)

(fc2): Linear(in_features=32, out_features=32, bias=True)

(fc3): Linear(in_features=32, out_features=2, bias=True)

)

0 Episode: Finished after 19 steps:10次试验的平均step数 = 1.9

1 Episode: Finished after 8 steps:10次试验的平均step数 = 2.7

2 Episode: Finished after 11 steps:10次试验的平均step数 = 3.8

3 Episode: Finished after 11 steps:10次试验的平均step数 = 4.9

4 Episode: Finished after 10 steps:10次试验的平均step数 = 5.9

5 Episode: Finished after 9 steps:10次试验的平均step数 = 6.8

6 Episode: Finished after 9 steps:10次试验的平均step数 = 7.7

7 Episode: Finished after 9 steps:10次试验的平均step数 = 8.6

8 Episode: Finished after 9 steps:10次试验的平均step数 = 9.5

9 Episode: Finished after 10 steps:10次试验的平均step数 = 10.5

10 Episode: Finished after 146 steps:10次试验的平均step数 = 23.2

11 Episode: Finished after 16 steps:10次试验的平均step数 = 24.0

12 Episode: Finished after 17 steps:10次试验的平均step数 = 24.6

13 Episode: Finished after 16 steps:10次试验的平均step数 = 25.1

14 Episode: Finished after 13 steps:10次试验的平均step数 = 25.4

15 Episode: Finished after 19 steps:10次试验的平均step数 = 26.4

16 Episode: Finished after 21 steps:10次试验的平均step数 = 27.6

17 Episode: Finished after 15 steps:10次试验的平均step数 = 28.2

18 Episode: Finished after 14 steps:10次试验的平均step数 = 28.7

19 Episode: Finished after 12 steps:10次试验的平均step数 = 28.9

20 Episode: Finished after 13 steps:10次试验的平均step数 = 15.6

……

328 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0

329 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0

330 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0

331 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0

332 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0

10轮连续成功

333 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0

推荐阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。