最新要闻

广告

手机

持续火爆 国缆检测连续20cm涨停!“牛市旗手”再度活跃 财达证券2连板

持续火爆 国缆检测连续20cm涨停!“牛市旗手”再度活跃 财达证券2连板

dnf什么时候维护结束2020.4.4_dnf什么时候维护

dnf什么时候维护结束2020.4.4_dnf什么时候维护

家电

强化学习——策略梯度之Reinforce

来源:博客园

1、策略梯度介绍

相比与DQN,策略梯度方法的区别主要在于,我们对于在某个状态下所采取的动作,并不由一个神经网络来决定,而是由一个策略函数来给出,而这个策略函数的目的,就是使得最终的奖励的累加和最大,这也是训练目标,所以训练会围绕策略函数的梯度来进行。

2、策略函数


(资料图)

以Reinforce算法为例,

假设我们的目标是最大化累积奖励的期望,即最大化以下形式的目标函数J(θ):

J(θ) = E[∑[t=0 to T] (R_t)]

其中,E表示对所有可能的轨迹(trajectories)进行期望,R_t是在时间步t获得的即时奖励。我们的策略函数可以表示为π(a|s;θ),其中θ表示策略函数的参数。我们希望通过调整θ来最大化J(θ),因此我们需要计算目标函数J(θ)关于参数θ的梯度。

具体地,我们将目标函数J(θ)的梯度表示为:

∇θ J(θ) = E[∑[t=0 to T] (∇θ log π(a_t|s_t;θ) * R_t)]

在这里,∇θ表示关于参数θ的梯度,log π(a_t|s_t;θ)表示策略函数在状态s_t下选择动作a_t的对数概率,R_t表示在时间步t获得的即时奖励。

接下来的关键是将整个目标函数的期望转换为对每个轨迹的期望,并使用蒙特卡洛采样来估计这个期望。我们通过采样多个轨迹,计算每个轨迹的梯度,然后取所有轨迹的梯度的平均值作为对目标函数梯度的估计。

为了做到这一点,我们引入累积回报G_t,表示从时间步t开始的累积奖励。G_t的定义如下:

G_t = R_t + γ * R_{t+1} + γ^2 * R_{t+2} + ... + γ^(T-t) * R_T

其中,γ是折扣因子,用于调整未来奖励的重要性。

补充:蒙特卡洛采样法,当我们要计算这个目标期望时是非常困难的,此时我们通过大量采样的方法估算出期望值,这就是蒙特卡洛采样法。其次,这里的回报G不仅仅与当前得到的reward有关,也和只会可以得到的reward有关。

现在,我们可以将目标函数的梯度重写为:

∇θ J(θ) = E[∑[t=0 to T] (∇θ log π(a_t|s_t;θ) * G_t)]

这样,我们就将目标函数的期望转换为了对每个轨迹的期望,然后通过蒙特卡洛采样来估计这个期望。在实际应用中,我们会使用多个样本轨迹来计算梯度的样本均值,并使用梯度上升法来更新策略函数的参数θ,以优化目标函数J(θ)。

以下是实现该数学公式的代码块:

#每往前一步,都衰减0.02,如何加上当前步的反馈 reward_sum *= 0.98 reward_sum += rewards[i] #重新计算动作概率 state = torch .FloatTensor(states[i]).reshape(1,4) prob = model(state) prob = prob[0,actions[i]] loss = -prob.log()*reward_sum loss.backward(retain_graph=True)注意这里的神经网络返回的是动作的概率分布

3、仍以平衡车为例具体实现代码

import gymfrom matplotlib import pyplot as pltimport torchimport randomfrom IPython import displayimport numpy as np #创建环境env = gym.make("CartPole-v1")env.reset() #打印游戏def show(): plt.imshow(env.render(mode="rgb_array")) plt.axis("off") plt.show()#show() #计算动作模型,也就是真正需要使用的模型model = torch.nn.Sequential( torch.nn.Linear(4,128), torch.nn.ReLU(), torch.nn.Linear(128,2), torch.nn.Softmax(dim=1),) def get_action(state): state = torch.FloatTensor(state).reshape(1, 4) prob = model(state) prob_normalized = prob[0].tolist() prob_sum = sum(prob_normalized) prob_normalized = [p / prob_sum for p in prob_normalized] action = np.random.choice(range(2), p=prob_normalized, size=1)[0] return action def get_Date(): states = [] rewards = [] actions = [] state = env.reset() over = False while not over: action = get_action(state) next_state,reward,over,_ = env.step(action) states.append(state) rewards.append(reward) actions.append(action) state = next_state return states,rewards,actions def test(play): state = env.reset() reward_sum = 0 over = False while not over: action = get_action(state) state,reward,over,_ = env.step((action)) reward_sum +=reward if play and random.random()<0.2: display.clear_output(wait=True) show() plt.close() return reward_sum def train(): optimizer = torch.optim.Adam(model.parameters(),lr=1e-3) #玩N局每局游戏训练一次 for epoch in range(1000): states,rewards,actions = get_Date() optimizer.zero_grad() #反馈和 reward_sum = 0 #从最后一步算起 for i in reversed(range(len(states))): #每往前一步,都衰减0.02,如何加上当前步的反馈 reward_sum *= 0.98 reward_sum += rewards[i] #重新计算动作概率 state = torch .FloatTensor(states[i]).reshape(1,4) prob = model(state) prob = prob[0,actions[i]] loss = -prob.log()*reward_sum loss.backward(retain_graph=True) optimizer.step() if epoch%100==0: test_result = sum([test(play=False) for _ in range(10)])/10 print(epoch,test_result) train()test(play=True)

关键词: