欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 人文社科 > 生活经验 >内容正文

生活经验

强化学习(三) - Gym库介绍和使用,Markov决策程序实例,动态规划决策实例

发布时间:2023/11/27 生活经验 32 豆豆
生活随笔 收集整理的这篇文章主要介绍了 强化学习(三) - Gym库介绍和使用,Markov决策程序实例,动态规划决策实例 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

强化学习(三) - Gym库介绍和使用,Markov决策程序实例,动态规划决策实例

1. 引言

在这个部分补充之前马尔科夫决策和动态规划部分的代码。在以后的内容我会把相关代码都附到相关内容的后面。本部分代码和将来的代码会参考《深度强化学习原理与python实现》与Udacity的课程《Reinforcement Learning》。

2. Gym库

Gym库(http://gym.openai.com/)是OpenAI推出的强化学习实验环境库,它用python语言实现了离散时间智能体/环境接口中的环境部分。除了依赖少量的商业库外,整个项目时开源免费的。

Gym库内置上百种实验环境,包括以下几类。

  • 算法环境:包括一些字符串处理等传统计算机方法的实验环境。
  • 简单文本环境:包括几个用文本表示的简单游戏。
  • 经典控制环境:包括一些简单几何体运动,常用于经典强化学习算法的研究。
  • Atari游戏环境:包括数十个Atari 2600游戏,具有像素化的图形界面,希望玩家尽可能夺得高分。
  • MuJoCo环境:利用收费的MuJoCo运动引擎进行连续性的控制任务。
  • 机械控制环境:关于机械臂的抓取和控制等。

2.1 Gym库安装

Python版本要求为 3.5+(在以后的内容中我将使用python 3.6)。然后就可以是使用pip安装Gym库

首先升级pip,

pip install --upgrade pip

以下命令为最小安装Gym环境,

pip install gym
利用源代码构建

如果愿意,还可以直接克隆gym Git存储库。若想要修改Gym本身或添加环境时,这是特别有用的。下载和安装使用:

git clone https://github.com/openai/gym
cd gym
pip install -e .

之后可以运行pip install -e .[all]来执行包含所有环境的完整安装。这需要安装多个涉及的依赖项,包括cmake和一个最近的pip版本。

2.2 Gym库使用

2.2.1 环境(enviroment)

这里有一个运行程序的最简单的例子。这将为1000个时间步骤运行CartPole-v0环境的一个实例,并在每个步骤中呈现环境。你应该看到一个窗口弹出渲染经典的车杆问题:

import gym
env = gym.make('CartPole-v0')
env.reset()
for _ in range(1000):env.render()env.step(env.action_space.sample()) # 采取随机行动
env.close()

输出如下:

通常,我们将结束模拟之前,车杆被允许离开屏幕。现在,忽略关于调用step()的警告,即使这个环境已经返回done = True

如果希望运行其他一些环境,可以尝试用类似MountainCar-v0MsPacman-v0(需要Atari依赖项)或Hopper-v1(需要MuJoCo依赖项)来替换上面的cartpole-v0。环境都来自于Env基类。

若丢失了任何依赖项,应该会得到错误消息来提示丢失了什么。安装缺少的依赖项通常非常简单。对于Hopper-v1需要一个的MuJoCo许可证。

代码解释

导入Gym库:

import gym

在导入Gym库后,可以通过make()函数来得到环境对象。每一个环境都有一个ID,它是形如“Xxxxx-v0”的字符串,如“CartPloe-v0”等。环境名称的最后的部分表示版本号,不同版本的环境可能有不同的行为。使用取出环境“CartPloe-v0”的代码为:

env = gym.make('CartPole-v0')

想要查看当前的Gym库已经注册了哪些环境,可以使用以下代码:

from gym import envs
env_specs = envs.registry.all()
env_ids = [env_spec.id for env_spec in env_specs]
env_ids

每个环境都定义了自己的观测空间和动作空间。环境env的观测空间用env.observation_space表示,动作空间用env.action_space表示。观测空间和动作空间既可以是离散空间(即取值是有限个离散的值),也可以使连续的空间(即取值是连续的)。在Gym库中,离散空间一般用gym.spaces.Discrete类表示,连续空间用gym.spaces.Box类表示。例如,环境‘MountainCar-v0’的动作空间是Box(2,)表示观测可以用2个float值表示;环境‘MountainCar-v0’的动作空间是Dicrete(3),表示动作取自{0,1,2}。对于离散空间gym.spaces.Discrete类实例成员n表示有几个可能的取值;对于连续空间,Box类实例的成员low和high表示每个浮点数的取值范围。

接下来使用环境对象env。首先初始化环境对象env,

env.reset()

该调用能返回智能体的初始观测,是np.array对象。

环境初始化之后就可以使用了。使用环境的核心是使用环境对象的step()方法,具体内容会在下面介绍。

env.step()的参数需要取自动作空间。可以使用以下语句从动作空间中随机选取一个动作。

action = env.action_space.sample()

每次调用env.step()只会让环境前进一步。所以,env.step()往往放在循环结构里,通过循环调用来完成整个回合。

在env.reset()或env.step()后,可以使用以下语句以图形化的方法显示当前的环境。

env.render()

使用完环境后,可以使用下列语句关闭环境。

env.close()

如果绘制和实验的图形界面窗口,那么关闭该窗口的最佳方式是调用env.close()。试图直接关闭图形界面可能会导致内存不能释放,甚至会导致死机。

测试智能体在Gym库中某个任务的性能时,学术界一般最关心100个回合的平均回合奖励。对于有些环境,还会制定一个参考的回合奖励值,当连续100个回合的奖励大于指定的值时,就认为这个任务被解决了。但是,并不是所有的任务都指定了这样的值。对于没有指定值的任务,就无所谓任务被解决了或者没有被解决。

2.2.2 观测(observation)

如果我们想得到一个更好的结果,那我们就不能在每一步都采取随机的行动。我们需要知道我们的行动对环境做了什么可能会更好。

环境的step函数返回的正是我们需要的东西。事实上,step函数会返回四个值。这四个值是

  • observation (object): 一个特定环境的对象,代表智能体对环境的观察。例如,来自摄像头的像素数据,机器人的关节角度和关节速度,或者棋盘游戏中的棋盘状态,和env.reset()返回值的意义相同。。
  • reward(float):前一个动作所获得的奖励量。不同环境下的比例不同,但目标始终是增加你的总奖励。
  • done (boolean):是否要再次重置环境。大多数(但不是全部)任务都被划分为定义明确的事件,done为True表示该事件已经终止。(例如,可能是杆子翻得太厉害,或者失去了最后的生命。)
  • info(dict):对调试有用的诊断信息。它有时会对学习有用(例如,它可能包含环境最后一次状态变化背后的原始概率)。然而,你的智能体的评估是不允许使用它来学习的。

这只是经典的 "智能体-环境循环 "的一个实现。每一个时间步,智能体选择一个动作(action),环境返回一个观察(observation)和一个奖励(reward)

import gym
env = gym.make('CartPole-v0')
for i_episode in range(20):observation = env.reset()for t in range(100):env.render()print(observation)action = env.action_space.sample()observation, reward, done, info = env.step(action)if done:print("Episode finished after {} timesteps".format(t+1))break
env.close()

输出如下:

[-0.061586   -0.75893141  0.05793238  1.15547541]
[-0.07676463 -0.95475889  0.08104189  1.46574644]
[-0.0958598  -1.15077434  0.11035682  1.78260485]
[-0.11887529 -0.95705275  0.14600892  1.5261692 ]
[-0.13801635 -0.7639636   0.1765323   1.28239155]
[-0.15329562 -0.57147373  0.20218013  1.04977545]
Episode finished after 14 timesteps
[-0.02786724  0.00361763 -0.03938967 -0.01611184]
[-0.02779488 -0.19091794 -0.03971191  0.26388759]
[-0.03161324  0.00474768 -0.03443415 -0.04105167]

2.2.3 空间(spaces)

在上面的示例中,我们从环境的操作空间中随机取样操作。但这些行为到底是什么呢?每个环境都带有一个action_space和一个observation_space。这些属性类型为Space,它们描述了有效动作和观察的格式:

import gym
env = gym.make('CartPole-v0')
print(env.action_space)
#> Discrete(2)
print(env.observation_space)
#> Box(4,)

离散(Discrete)空间允许一个固定的非负数范围,因此在本例中有效的动作(action)要么是0要么是1。Box空间表示一个n维的盒子,因此有效的观察结果将是一个4个数字的数组。我们也可以检查Box的边界:

print(env.observation_space.high)
#> array([ 2.4       ,         inf,  0.20943951,         inf])
print(env.observation_space.low)
#> array([-2.4       ,        -inf, -0.20943951,        -inf])

这种内省有助于编写适用于许多不同环境的泛型代码。Box离散空间是最常见的空间(space)。我们可以从一个空间取样或检查某物是否属于它:

from gym import spaces
space = spaces.Discrete(8) # Set with 8 elements {0, 1, 2, ..., 7}
x = space.sample()
assert space.contains(x)
assert space.n == 8

2.2.4 程序实例

这里我们通过一个完整的例子来学习如何与Gym库中的环境交互,选用经典控制任务,小车上山(MountainCar-v0)。这里主要介绍交互的代码,而不详细说明这个控制任务及其求解。

首先我们看一下这个任务的观测空间的动作空间

import gym
env = gym.make('MountainCar-v0')
print('观测空间 = {}'.format(env.observation_space))
print('动作空间 = {}'.format(env.action_space))
print('观测范围 = {} ~ {}'.format(env.observation_space.low, env.observation_space.high))
print('动作数 = {}'.format(env.action_space.n))

输出如下,

观测空间 = Box(2,)
动作空间 = Discrete(3)
观测范围 = [-1.2  -0.07] ~ [0.6  0.07]
动作数 = 3

运行结果告诉我们,观测空间是形状为(2,)的浮点型np.array,而动作空间是取{0, 1, 2}的int型数值。

接下来考虑智能体。智能体往往是我们自己实现的。下面代码给出了一个智能体类——BespokeAgent类。智能体的decide()方法实现了决策功能,而learn()方法实现了学习功能。BespokeAgent类是一个比较简单的类,它只能根据给定的数学表达式进行决策,并且不能有效学习。所以它并不是一个真正意义上的强化学习智能体类。但是,用于演示智能体和环境的交互已经足够了。

class BespokeAgent:def __init__(self, env):passdef decide(self, observation):  # 决策position, velocity = observationlb = min(-0.09 * (position + 0.25) ** 2 + 0.03, 0.3 * (position + 0.9) ** 4 - 0.008)ub = -0.07 * (position + 0.38) ** 2 + 0.06if lb < velocity < ub:action = 2else:action = 0return action  # 返回动作def learn(self, *args):  # 学习passagent = BespokeAgent(env)

接下来我们试图让智能体与环境交互。play_once()函数可以让智能体和环境交互一个回合。这个函数有4个参数。

  • 参数env是环境类。
  • 参数agent是智能体类。
  • 参数render是bool类型变量,指示在运行过程中是否要图形化显示。如果函数参数render为True,那么在交互过程中会调用env.render()以显示图形化界面,而这个界面可以通过调用env.close()关闭。
  • 参数train是bool类型的变量,指示在运行过程中是否训练智能体。在训练过程中应当设置为True,以调用agent.learn()函数;在测试过程中应当设置为False,使得智能体不变。

这个函数有一个返回值episode_reward,是float类型的数值,表示智能体与环境交互一个回合的回合总奖励。

def play_montecarlo(env, agent, render=False, train=False):episode_reward = 0.  # 记录回合总奖励,初始化为0observation = env.reset()  # 重置游戏环境,开始新回合while True:  # 不断循环,直到回合结束if render:  # 判断是否显示env.render()  # 显示图形界面,图形界面可以用env.close()语句关闭action = agent.decide(observation)next_observation, reward, done, _ = env.step(action)  # 执行动作episode_reward += reward  # 收集回合奖励if train:  # 判断是否训练智能体agent.learn(observation, action, reward, done)  # 学习if done:  # 回合结束,跳出循环breakobservation = next_observationreturn episode_reward  # 返回回合总奖励

我们可以用下列代码让智能体和环境交互一个回合,并在交互过程中图形化显示。交互完毕后,可用env.close()语句关闭图形化界面。

env.seed(0)  # 设置随机数种子,只是为了让结果可以精确复现,一般情况下可删去
episode_reward = play_montecarlo(env, agent, render=True)
print('回合奖励 ={}'.format(episode_reward))
env.close()  # 此语句可关闭图形界面

为了系统评估智能体的性能,下列代码求出了连续交互100回合的平均回合奖励。小车上山环境有一个参考的回合奖励值-110,如果当连续100个回合的平均回合奖励大于-110,则认为这个任务被解决了。BespokeAgent类对应的策略的平均回合奖励大概就在-110左右。

episode_rewards = [play_montecarlo(env, agent) for _ in range(100)]
print('平均回合奖励 ={}'.format(np.mean(episode_rewards)))

3.Markov 决策过程

3.1 实例:悬崖寻路

本节考虑Gym库中悬崖寻路问题(CliffWalking-v0)。悬崖寻路问题是这样一种回合制问题:在一个4×124 \times124×12的网格中,智能体最开始在左下角的网格,希望移动到右下角的网格。智能体每次可以在上下左右这四个方向中移动一步,每移动一步会惩罚一个单位的奖励。但是,移动有以下的限制。

  • 智能体不能移出网格。如果智能体想执行某个动作移出网格,那么就让本步不移动。但是这个操作仍然会惩罚一个单位的奖励。
  • 如果智能体将要到达最下一排网格(几开始网格和目标网格之间的10个网格),智能体会立刻回到开始网格,并惩罚100个单位的奖励。这10个网格可被视为“悬崖”。

当智能体移动到终点时,回合结束,回合奖励为各步奖励相加。

3.1.1 实验环境使用

以下代码演示了如何倒入这个环境并查看这个环境的基本信息。

import gym
env = gym.make('CliffWalking-v0')
print('观察空间 = {}'.format(env.observation_space))
print('动作空间 = {}'.format(env.action_space))
print('状态数量 = {}, 动作数量= {}'.format(env.nS, env.nA))
print('地图大小 = {}'.format(env.shape))

输出为,

观察空间 = Discrete(48)
动作空间 = Discrete(4)
状态数量 = 48, = 4
地图大小 = (4, 12)

这个环境是一个离散的Markov决策过程。在这个Markov决策过程中,每个状态是取自S={0,1,...,46}\mathcal{S}= \{0,1,...,46\}S={0,1,...,46}的int型数值(加上终止状态则为S+=0,1,...,46,47\mathcal{S}^+={0,1,...,46,47}S+=0,1,...,46,47):0表示向上,1表示向右,2表示向下,3表示向左。奖励取自{−1,−100}\{-1,-100\}{1,100},遇到悬崖为-100,否则为-1。

下面代码给出了用给出的策略运行一个回合的代码。函数play_once()有两个参数, 一个是环境对象,另外一个是策略policy,它是np.array类型的实例。

import numpy as np
def play_once(env, policy):total_reward = 0state = env.reset()loc = np.unravel_index(state, env.shape)print('状态 = {} , 位置={} '.format(state, loc))while True:action = np.random.choice(env.nA, p=policy[state])# step函数,详情见2.2.2 观测next_state, reward, done, _ = env.step(action)print('状态 = {}, 位置 ={}, 奖励 ={}'.format(state, loc, reward))total_reward += rewardif done:breakstate = next_statereturn total_reward

以下代码给出了一个最优策略。optimal_policy最优策略是在开始处向上,接着一 路向右,然后到最右边时向下。

actions = np.ones(env.shape, dtype=int)
actions[-1, :] = 0
actions[:, -1] = 2
optimal_policy = np.eye(4)[actions.reshape(-1)]

下面的代码用最优策略运行一个回合。采用最优策略,从开始网格到目标网格一共要 移动13步,回合总奖励为-13。

total_reward = play_once(env, optimal_policy) 
print('总奖励 ={}'.format(total_reward))

输出为,

状态 = 36, 位置 =(3, 0), 奖励 =-1
状态 = 24, 位置 =(2, 0), 奖励 =-1
状态 = 25, 位置 =(2, 1), 奖励 =-1
状态 = 26, 位置 =(2, 2), 奖励 =-1
状态 = 27, 位置 =(2, 3), 奖励 =-1
状态 = 28, 位置 =(2, 4), 奖励 =-1
状态 = 29, 位置 =(2, 5), 奖励 =-1
状态 = 30, 位置 =(2, 6), 奖励 =-1
状态 = 31, 位置 =(2, 7), 奖励 =-1
状态 = 32, 位置 =(2, 8), 奖励 =-1
状态 = 33, 位置 =(2, 9), 奖励 =-1
状态 = 34, 位置 =(2, 10), 奖励 =-1
状态 = 35, 位置 =(2, 11), 奖励 =-1
总奖励 =-13 

3.1.2 求解Bellman期望方程

接下来考虑策略评估。我们用Bellman期望方程求解给定策略的状态价值和动作价值。 首先来看状态价值。用状态价值表示状态价值的Bellmn期望方程为:
vπ(s)=∑aπ(a∣s)[∑s′,rp(s′,r∣s,a)[r+γvπ(s′)]],s∈Sv_\pi(s) = \sum_a \pi(a|s)[\sum_{s',r}p(s',r|s,a)[r+\gamma v_\pi(s')]],s \in \mathcal{S}vπ(s)=aπ(as)[s,rp(s,rs,a)[r+γvπ(s)]],sS
这是一个线性方程组,其标准形式为
vπ(s)−γ∑a∑s′π(a∣s)p(s′∣s,a)vπ(s′)=∑aπ(a∣s)∑s′,rrp(s′,r∣s,a),s∈Sv_\pi(s)-\gamma\sum_{a}\sum_{s'}\pi(a|s)p(s'|s,a)v_\pi(s') =\sum_{a}\pi(a|s)\sum_{s',r}rp(s',r|s,a),s \in \mathcal{S}vπ(s)γasπ(as)p(ss,a)vπ(s)=aπ(as)s,rrp(s,rs,a),sS
得到标准形式后就可以调用相关函数直接求解。得到状态价值函数后,可以用状态价值表示动作价值的Bellan方程,
qπ(s,a)=∑s′,rp(s′,r∣s,a)[r+γvπ(s′)],s∈S,a∈Aq_\pi(s,a) =\sum_{s',r}p(s',r|s,a)[r+\gamma v_\pi(s')],s \in \mathcal{S},a \in \mathcal{A}qπ(s,a)=s,rp(s,rs,a)[r+γvπ(s)],sS,aA

来求动作价值函数。

函数evaluate_bellman ()实现了上述功能。状态价值求解部分用np.linalg.solve()函数求解标准形式的线性方程组。得到状态价值后,直接计算得到动作价值.

环境的动力系统储存在env.P里,它是一个元组列表,每个元组包括概率、下一状态、奖励\回合结束指示这4个部分。

# 用Bellman方程求解状态价值和动作价值
def evaluate_bellman(env, policy, gamma=1.):a, b = np.eye(env.nS), np.zeros((env.nS))for state in range(env.nS - 1):for action in range(env.nA):pi = policy[state][action]# print('{}{}:pi:{}'.format(state, action, pi))# env.P中存储环境的动力for p, next_state, reward, done in env.P[state][action]:a[state, next_state] -= (pi * gamma)b[state] += (pi * reward * p)# np.linalg.solve()函数求解标准形式的线性方程组v = np.linalg.solve(a, b)q = np.zeros((env.nS, env.nA))# 利用状态价值函数求解动作价值函数for state in range(env.nS - 1):for action in range(env.nA):for p, next_state, reward, done in env.P[state][action]:q[state][action] += ((reward + gamma * v[next_state]) * p)return v, q

接下来使用evaluate_bellman()函数评估给出的策略。以下代码分别评估了一个随机策略和最优确定性策略,并输出了状态价值函数和动作价值函数。

评估随机策略,

policy = np.random.uniform(size=(env.nS, env.nA)) 
policy = policy / np.sum(policy, axis=1)[:, np.newaxis] 
state_values, action_values = evaluate_bellman(env, policy) 
print('状态价值 ={}'.format(state_values)) 
print('动作价值 ={}'.format(action_values))
状态价值 =[-134140.09187241 -134085.47213402 -133971.92139818 -133616.23666259-133194.20403738 -132781.56536103 -132435.88940662 -128380.91096168...-133680.76568151 -130121.16858665 -115062.2396112        0.        ]
动作价值 =[[-1.34141092e+05 -1.34086472e+05 -1.34152621e+05 -1.34141092e+05][-1.34086472e+05 -1.33972921e+05 -1.34152699e+05 -1.34141092e+05]...[ 0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00]]

评估最优策略,

optimal_state_values, optimal_action_values = evaluate_bellman(env, optimal_policy)
print('最优状态价值 = {}'.format(optimal_state_values))
print('最优动作价值 = {}'.format(optimal_action_values))
最优状态价值 = [[-14. -13. -12. -11. -10.  -9.  -8.  -7.  -6.  -5.  -4.  -3.][-13. -12. -11. -10.  -9.  -8.  -7.  -6.  -5.  -4.  -3.  -2.][-12. -11. -10.  -9.  -8.  -7.  -6.  -5.  -4.  -3.  -2.  -1.][-13. -12. -11. -10.  -9.  -8.  -7.  -6.  -5.  -4.  -3.   0.]]
最优动作价值 = [[ -15.  -14.  -14.  -15.][ -14.  -13.  -13.  -15.][ -13.  -12.  -12.  -14.]...[   0.    0.    0.    0.]]

3.1.3 求解Bellman最优方程

对于悬崖寻路这样的数值环境,可以使用线性规划求解。线性规划问题的标准形式为:
minimise        ∑s∈Sv(s)over        v(s),s∈Ss.t.       v(s)−γ∑s′,rp(s′,r∣s,a)v∗(s′)≥∑s′,rrp(s′,r∣s,a),s∈S\begin{aligned}\text{minimise} \ \ \ \ \ \ \ \ & \sum_{s \in \mathcal{S}}v(s) \\ \text{over} \ \ \ \ \ \ \ \ & v(s),s \in \mathcal{S}\\ \text{s.t.} \ \ \ \ \ \ \ &v(s)-\gamma\sum_{s',r}p(s',r|s,a)v_*(s')\geq\sum_{s',r}rp(s',r|s,a), s \in \mathcal{S} \end{aligned}minimise        over        s.t.       sSv(s)v(s),sSv(s)γs,rp(s,rs,a)v(s)s,rrp(s,rs,a),sS

其中目标函数中状态价值的系数已经被固定为1。也可以选择其他正实数作为系数。

以下代码使用scipy.optimize.linprog()函数来计算这个动态规划问题。

scipy.optimize.linprog(c,A_ub = None,b_ub = None,A_eq = None,b_eq = None,bounds = None,method = 'interior-point',callback = None,options = None,x0 = None)

min⁡xcTxsuch thatAubx≤bubAeqx=beql≤x≤u\begin{aligned}\min_{x} \ & c^T x \\\text{such that} & \ A_{ub}x \leq b_{ub} \\ & \ A_{eq}x = b_{eq} \\& \ l \leq x \leq u \end{aligned}xmin such thatcTx Aubxbub Aeqx=beq lxu

其中,c是价值向量;A_ub和b_ub对应线性不等式约束;A_eq和b_eq对应线性等式约束;bounds对应公式中的lb和ub,决策向量的下界和上界;method是求解器的类型,如单纯形法.

官方帮助文档可查看此处.

这个函数的 第0个参数是目标函数中各决策变量在目标函数中的系数,本例中都取1;第1个和第2个 参数是形如“Ax≤bAx≤bAxb”这样的不等式约束的A和b的值。函数optimal_bellman ()刚开始就 计算得到这些值。scipy.optimize.linprog()还有关键字参数bounds,指定决策变量是否为有界的。本例中,决策变量都是无界的。无界也要显式指定,不可以忽略。还有关键字参数 method确定优化方法。默认的优化方法不能处理不等式约束,这里选择了能够处理不等式约束的内点法(interior-point method)。

import scipy
def optimal_bellman(env, gamma=1.):p = np.zeros((env.nS, env.nA, env.nS))r = np.zeros((env.nS, env.nA))for state in range(env.nS - 1):for action in range(env.nA):for prob, next_state, reward, done in env.P[state][action]:p[state, action, next_state] += probr[state, action] += (reward * prob)c = np.ones(env.nS)a_ub = gamma * p.reshape(-1, env.nS) - np.repeat(np.eye(env.nS), env.nA, axis=0)b_ub = -r.reshape(-1)a_eq = np.zeros((0, env.nS))b_eq = np.zeros(0)bounds = [(None, None), ] * env.nSres = scipy.optimize.linprog(c, a_ub, b_ub, bounds=bounds, method='interior-point')v = res.xq = r + gamma * np.dot(p, v)return v, qoptimal_state_values, optimal_action_values = optimal_bellman(env)
print('最优状态价值 ={}'.format(optimal_state_values))
print('最优动作价值 ={}'.format(optimal_action_values))

求得最优动作价值后,可以用argmax计算出最优确定策略。以下代码给出了最优策略,

optimal_actions = optimal_action_values.argmax(axis=1)
print('最优策略 ={}'.format(optimal_actions))

在pycharm IDE下的总程序为

import gym
import numpy as np
import scipy# 单回合运行函数
def play_once(env, policy):total_reward = 0state = env.reset()while True:action = np.random.choice(env.nA, p=policy[state])next_state, reward, done, _ = env.step(action)# np.unravel_index函数的作用是获取一个/组int类型的索引值在一个多维数组中的位置。loc = np.unravel_index(state, env.shape)print('状态 = {}, 位置 ={}, 奖励 ={}'.format(state, loc, reward))total_reward += rewardif done:breakstate = next_statereturn total_reward# 用Bellman方程求解状态价值和动作价值
def evaluate_bellman(env, policy, gamma=1.):a, b = np.eye(env.nS), np.zeros((env.nS))for state in range(env.nS - 1):for action in range(env.nA):pi = policy[state][action]# print('{}{}:pi:{}'.format(state, action, pi))# env.P中存储环境的动力for p, next_state, reward, done in env.P[state][action]:a[state, next_state] -= (pi * gamma)b[state] += (pi * reward * p)# np.linalg.solve()函数求解标准形式的线性方程组v = np.linalg.solve(a, b)q = np.zeros((env.nS, env.nA))# 利用状态价值函数求解动作价值函数for state in range(env.nS - 1):for action in range(env.nA):for p, next_state, reward, done in env.P[state][action]:q[state][action] += ((reward + gamma * v[next_state]) * p)return v, q# 使用动态规划求解最优策略
def optimal_bellman(env, gamma=1.):p = np.zeros((env.nS, env.nA, env.nS))r = np.zeros((env.nS, env.nA))for state in range(env.nS - 1):for action in range(env.nA):for prob, next_state, reward, done in env.P[state][action]:p[state, action, next_state] += probr[state, action] += (reward * prob)c = np.ones(env.nS)a_ub = gamma * p.reshape(-1, env.nS) - np.repeat(np.eye(env.nS), env.nA, axis=0)b_ub = -r.reshape(-1)a_eq = np.zeros((0, env.nS))b_eq = np.zeros(0)bounds = [(None, None), ] * env.nSres = scipy.optimize.linprog(c, a_ub, b_ub, bounds=bounds, method='interior-point')v = res.xq = r + gamma * np.dot(p, v)return v, q# 主函数
if __name__ == '__main__':env = gym.make('CliffWalking-v0')print('观察空间 = {}'.format(env.observation_space))print('动作空间 = {}'.format(env.action_space))print('状态数量 = {}, 动作数量= {}'.format(env.nS, env.nA))print('地图大小 = {}'.format(env.shape))''' # 定义最优策略actions = np.ones(env.shape, dtype=int)actions[-1, :] = 0actions[:, -1] = 2optimal_policy = np.eye(4)[actions.reshape(-1)]total_reward = play_once(env, optimal_policy)print('总奖励 ={}'.format(total_reward))optimal_state_values, optimal_action_values = evaluate_bellman(env, optimal_policy)print('最优状态价值 = {}'.format(optimal_state_values.reshape(4, -1)))print('最优动作价值 = {}'.format(optimal_action_values))''''''# 定义随机策略policy = np.random.uniform(size=(env.nS, env.nA))# 归一化policy = policy / np.sum(policy, axis=1)[:, np.newaxis]state_values, action_values = evaluate_bellman(env, policy)print('状态价值 ={}'.format(state_values))print('动作价值 ={}'.format(action_values))'''# 求解最优策略optimal_state_values, optimal_action_values = optimal_bellman(env)print('最优状态价值 ={}'.format(optimal_state_values))print('最优动作价值 ={}'.format(optimal_action_values))optimal_actions = optimal_action_values.argmax(axis=1)print('最优策略 ={}'.format(optimal_actions))

输出为,

观察空间 = Discrete(48)
动作空间 = Discrete(4)
状态数量 = 48, 动作数量= 4
地图大小 = (4, 12)
最优状态价值 =[-1.40000000e+01 -1.30000000e+01 -1.20000000e+01 -1.10000000e+01-1.00000000e+01 -9.00000000e+00 -8.00000000e+00 -7.00000000e+00-6.00000000e+00 -5.00000000e+00 -4.00000000e+00 -3.00000000e+00-1.30000000e+01 -1.20000000e+01 -1.10000000e+01 -1.00000000e+01-9.00000000e+00 -8.00000000e+00 -7.00000000e+00 -6.00000000e+00-5.00000000e+00 -4.00000000e+00 -3.00000000e+00 -2.00000000e+00-1.20000000e+01 -1.10000000e+01 -1.00000000e+01 -9.00000000e+00-8.00000000e+00 -7.00000000e+00 -6.00000000e+00 -5.00000000e+00-4.00000000e+00 -3.00000000e+00 -2.00000000e+00 -1.00000000e+00-1.30000000e+01 -1.20000000e+01 -1.10000000e+01 -1.00000000e+01-9.00000000e+00 -8.00000000e+00 -7.00000000e+00 -6.00000000e+00-5.00000000e+00 -4.00000000e+00 -9.99999999e-01  1.82291993e-11]
最优动作价值 =[[ -14.99999999  -13.99999999  -13.99999999  -14.99999999][ -13.99999999  -13.          -13.          -14.99999999]...[   0.            0.            0.            0.        ]]
最优策略 =[2 1 1 1 1 1 1 1 1 1 1 2
1 1 1 1 1 1 1 1 1 1 1 2 
1 1 1 1 1 1 1 1 1 1 1 2 
0 0 0 0 0 0 0 0 0 0 1 0]

4. 动态规划

4.1 案例:冰面滑行

冰面滑行问题(FrozenLake-v0)是扩展库Gym里内置的一个文本环境任务。该问题的背景如下:在一个大小为4x4的湖面上,有些地方结冰了,有些地方没有结冰。我们可以用一个4x4的字符矩阵来表示湖面的情况,

SFFF
FHFH
FFFH
HFFG

其中字母“F”(Frozen)表示结冰的区域,字母“H”(Hole)表示未结冰的冰窟窿,字母“S”(Start)和字母“G”(Goal)分别表示移动任务的起点和目标。在这个湖面上要执行以下的移动任务:要从“S”处移动到“G”处。每一次移动,可以选择“左”、“下”、“右”、“上”4个方向之一进行移动,每次移动一格。如果移动到G处,则回合结束,获得1个单位的奖励;如果移动到“H”处,则回合结束,没有获得奖励;如果移动到其它字母,暂不获得奖励,可以继续。由于冰面滑,所以实际移动的方向和想要移动的方向并不一定完全一致。例如,如果在某个地方想要左移,但是由于冰面滑,实际可能下移、右移或者上移。任务的目标是尽可能达到“G”处,以获得奖励。

4.1.1 实验环境使用

首先我们来看如何使用扩展库Gym中的环境。

用下列语句引入环境对象:

import gym 
env = gym.make('FrozenLake-v0')
env = env.unwrapped

这个环境的状态空间有16个不同的状态{s1,s2,...,s15}\{s_1, s_2,...,s_{15}\}{s1,s2,...,s15},表示当前处在哪一个位置;动作空间有4个不同的动作{a0,a1,a2,a3}\{a_0, a_1, a_2, a_3\}{a0,a1,a2,a3},分别表示上、下、左、右四个方向。在扩展库Gym中,直接用int型数值来表示这些状态和动作。下列代码可以查看环境的状态空间和动作空间,

print(env.observation_space)
print(env.action_space)

这个环境的动力系统储存在env.P里。可以用下列方法查看某个状态(如状态14)某个动作(如右移)情况下的动力,

env.unwrapped.P[14][2]

它是一个元组列表,每个元组包括概率、下一状态、奖励和回合结束指示这4个部分。例如,env.P[14][2]返回元组列表[(0.333333333333333,14, 0.0, False),(0.333333333333333, 15, 1.0 True), (0.333333333333333333, 10, 0.0, False)]这表明:
p(s14,0∣s14,a2)=13p(s_{14}, 0|s_{14},a_2) = \frac{1}{3}p(s14,0s14,a2)=31
p(s15,1∣s14,a2)=13p(s_{15}, 1|s_{14},a_2) = \frac{1}{3}p(s15,1s14,a2)=31
p(s10,0∣s14,a2)=13p(s_{10}, 0|s_{14},a_2) = \frac{1}{3}p(s10,0s14,a2)=31
接下来我们来看怎么使用环境。像之前介绍的,要使用env.reset()和env.step来执行。执行一个回合的代码如下所示,其中的play_policy()函数接受参数policy,这是一个16x4的np.array对象,表示策略π\piπ

play_policy()函数返回一个浮点数,表示本回合的奖励。

# 使用策略执行一个回合
def play_policy(env, policy, render=False):total_reward = 0.observation = env.reset()while True:if render:env.render()  # 此行可显示action = np.random.choice(env.action_space.n, p=policy[observation])observation, reward, done, _ = env.step(action)total_reward += reward   # 统计回合奖励if done:  # 游戏结束breakreturn total_reward

接下来用刚刚定义的play_policy()函数来看看随机策略的性能。下面代码够早了随机策略random_policy,它对于任意的s∈S,a∈As \in \mathcal{S}, a \in \mathcal{A}sS,aA均有π(s,a)=1∣A∣\pi(s, a) = \frac{1}{|\mathcal{A}|}π(s,a)=A1.运行下列代码,可以求得随机策略获得奖励的期望值。一般情况下的结果基本为0,这意味着随机策略几乎不可能成功到达目的地。

# 随机策略random_policy = np.ones((env.unwrapped.nS, env.unwrapped.nA)) / env.unwrapped.nAepisode_rewards = [play_policy(env, random_policy) for _ in range(100)]print("随机策略 平均奖励 :{}".format(np.mean(episode_rewards)))

4.1.2 有模型策略迭代求解

这个部分实现策略评估、策略提升和策略迭代。

首先来看策略评估。以下代码给出了策略评估的代码。它首先定义了函数v2q(),这个函数可以根据状态价值函数计算含有某个状态的动作价值函数。利用者函数evaluate_policy()函数迭代结算了给定策略policy的状态价值。这个函数使用theta作为精度控制的参数。之后的代码测试了evaluate_policy()函数。它首先求得了随机策略的状态价值函数,然后利用v2q求得动作价值函数。

# 策略评估的实现
def v2q(env, v, s=None, gamma=1.):if s is not None:q = np.zeros(env.unwrapped.nA)for a in range(env.unwrapped.nA):for prob, next_state, reward, done in env.unwrapped.P[s][a]:q[a] += prob * (reward + gamma * v[next_state] * (1. - done))else:q = np.zeros((env.unwrapped.nS, env.unwrapped.nA))for s in range(env.unwrapped.nS):q[s] = v2q(env, v, s, gamma)return q# 定义策略评估函数
def evaluate_policy(env, policy, gamma=1., tolerant=1e-6):v = np.zeros(env.unwrapped.nS)while True:delta = 0for s in range(env.unwrapped.nS):vs = sum(policy[s] * v2q(env, v, s, gamma))delta = max(delta, abs(v[s]-vs))v[s] = vsif delta < tolerant:breakreturn v
# 对随机策略进行策略评估
print('状态价值函数:')
v_random = evaluate_policy(env, random_policy)
print(v_random.reshape(4, 4))print('动作价值函数:')
q_random = v2q(env, v_random)
print(q_random)

接下来看看策略改进。improve_policy()函数实现了策略改进算法。输入的策略是policy,改进后的策略直接覆盖原有的policy。该函数返回一个bool类型的值,表示输入的策略是不是最优策略。之后的代码测试了improve_policy()函数,它对随机策略进行改进,得到了一个确定性策略。

# 策略改进的实现
def improve_policy(env, v, policy, gamma=1.):optimal = Truefor s in range(env.unwrappped.nS):q = v2q(env, v, s, gamma)a = np.argmax(q)if policy[s][a] != 1:optimal = Falsepolicy[s] = 0.policy[s][a] = 1.return optimal
# 对随机策略进行策略改进
policy = random_policy.copy()
optimal = improve_policy(env, v_random, policy)
if optimal:print('无更新, 最优策略为:')
else:print('有更新,最优策略为:')
print(policy)

实现了策略评估和策略改进后,我们就可以实现策略迭代。iterate_policy()函数实现了策略迭代算法。之后的代码对iterate_policy()进行测试。针对冰面滑性问题,该代码求得了最优策略,并进行了测试。

# 策略迭代的实现
def iterate_policy(env, gamma=1., tolerant=1e-6:policy = np.ones((env.unwrapped.nS, env.unwrapped.nA)) / env.wrapped.nAwhile True:v = evaluate_policy(env, policy, gamma, tolerant)if improve_policy(env, v, policy):breakreturn policy, v
# 利用策略迭代求解最优策略
policy_pi, v_pi = iterate_policy(env)
print('状态价值函数 = ')
print(v_pi.reshape(4, 4)
print('最优策略 = ')
print(np.argmax(policy_pi, axis=1).reshape(4, 4))

4.1.3 有模型价值迭代求解

现在我们用价值迭代算法求解冰面滑行问题的最优策略。以下代码中iterate_value()函数实现了价值迭代算法。这个函数使用参数tolerant来控制价值迭代的精度。之后的代码在冰面滑行问题上测试了iterate_value()函数。

# 价值迭代的实现
def iterate_value(env, gamma=1, tolerant=1e-6):v = np.zeros(env.unwrapped.nS)while True:delta = 0for s in range(env.unwrapped.nS):vmax = max(v2q(env, v, s, gamma))delta = max(delta, abs(v[s]-vmax))v[s] = vmaxif delta < tolerant:breakpolicy = np.zeros((env.unwrapped.nS, env.unwrapped.nA))for s in range(env.unwrapped.nS):a = np.argmax(v2q(env, v, s, gamma))policy[s][a] = 1.return policy, v
    # 利用策略迭代求解最优策略policy_pi, v_pi = iterate_policy(env)print('状态价值函数 = ')print(v_pi.reshape(4, 4))print('最优策略 = ')print(np.argmax(policy_pi, axis=1).reshape(4, 4))print('价值迭代 平均奖励:{}'.format(play_policy(env, policy_pi)))

策略迭代和价值迭代得到的最优价值函数和最优策略应该是一致的。最优状态价值函数为:
[0.82350.82350.82350.82350.823500.529400.82350.82350.7647000.88240.94120]\begin{bmatrix}0.8235 &0.8235 &0.8235& 0.8235\\ 0.8235&0&0.5294&0\\ 0.8235&0.8235&0.7647& 0\\ 0&0.8824& 0.9412&0\end{bmatrix}0.82350.82350.823500.8235 00.82350.88240.82350.52940.76470.94120.8235000

最优策略为:
[0333000031000210]\begin{bmatrix}0&3&3&3\\ 0&0&0&0\\ 3&1&0&0\\ 0&2&1&0\end{bmatrix}0030301230013000

在pycharm中可以运行的总程序如下,

import gym
import numpy as np# 使用策略执行一个回合
def play_policy(env, policy, render=False):total_reward = 0.observation = env.reset()while True:if render:env.render()  # 此行可显示action = np.random.choice(env.action_space.n, p=policy[observation])observation, reward, done, _ = env.step(action)total_reward += reward   # 统计回合奖励if done:  # 游戏结束breakreturn total_reward# 策略评估的实现
def v2q(env, v, s=None, gamma=1.):if s is not None:q = np.zeros(env.unwrapped.nA)for a in range(env.unwrapped.nA):for prob, next_state, reward, done in env.unwrapped.P[s][a]:q[a] += prob * (reward + gamma * v[next_state] * (1. - done))else:q = np.zeros((env.unwrapped.nS, env.unwrapped.nA))for s in range(env.unwrapped.nS):q[s] = v2q(env, v, s, gamma)return q# 定义策略评估函数
def evaluate_policy(env, policy, gamma=1., tolerant=1e-6):v = np.zeros(env.unwrapped.nS)while True:delta = 0for s in range(env.unwrapped.nS):vs = sum(policy[s] * v2q(env, v, s, gamma))delta = max(delta, abs(v[s]-vs))v[s] = vsif delta < tolerant:breakreturn v# 策略改进
def improve_policy(env, v, policy, gamma=1.):optimal = Truefor s in range(env.unwrapped.nS):q = v2q(env, v, s, gamma)a = np.argmax(q)if policy[s][a] != 1:optimal = Falsepolicy[s] = 0.policy[s][a] = 1.return optimal# 策略迭代的实现
def iterate_policy(env, gamma=1., tolerant=1e-6):policy = np.ones((env.unwrapped.nS, env.unwrapped.nA)) / env.unwrapped.nAwhile True:v = evaluate_policy(env, policy, gamma, tolerant)if improve_policy(env, v, policy):breakreturn policy, v# 价值迭代的实现
def iterate_value(env, gamma=1, tolerant=1e-6):v = np.zeros(env.unwrapped.nS)while True:delta = 0for s in range(env.unwrapped.nS):vmax = max(v2q(env, v, s, gamma))delta = max(delta, abs(v[s]-vmax))v[s] = vmaxif delta < tolerant:breakpolicy = np.zeros((env.unwrapped.nS, env.unwrapped.nA))for s in range(env.unwrapped.nS):a = np.argmax(v2q(env, v, s, gamma))policy[s][a] = 1.return policy, vif __name__ == '__main__':env = gym.make('FrozenLake-v0')env = env.unwrappedrandom_policy = np.ones((env.unwrapped.nS, env.unwrapped.nA)) / env.unwrapped.nAepisode_rewards = [play_policy(env, random_policy) for _ in range(100)]print("随机策略 平均奖励 :{}".format(np.mean(episode_rewards)))print('状态价值函数:')v_random = evaluate_policy(env, random_policy)print(v_random.reshape(4, 4))print('动作价值函数:')q_random = v2q(env, v_random)print(q_random)policy = random_policy.copy()optimal = improve_policy(env, v_random, policy)if optimal:print('无更新, 最优策略为:')else:print('有更新,最优策略为:')print(policy)# 利用策略迭代求解最优策略policy_pi, v_pi = iterate_policy(env)print('状态价值函数 = ')print(v_pi.reshape(4, 4))print('最优策略 = ')print(np.argmax(policy_pi, axis=1).reshape(4, 4))print('价值迭代 平均奖励:{}'.format(play_policy(env, policy_pi)))
随机策略 平均奖励 :0.04
状态价值函数:
[[0.0139372  0.01162942 0.02095187 0.01047569][0.01624741 0.         0.04075119 0.        ][0.03480561 0.08816967 0.14205297 0.        ][0.         0.17582021 0.43929104 0.        ]]
动作价值函数:
[[0.01470727 0.01393801 0.01393801 0.01316794][0.00852221 0.01162969 0.01086043 0.01550616][0.02444416 0.0209521  0.02405958 0.01435233][0.01047585 0.01047585 0.00698379 0.01396775][0.02166341 0.01701767 0.0162476  0.01006154][0.         0.         0.         0.        ][0.05433495 0.04735099 0.05433495 0.00698396][0.         0.         0.         0.        ][0.01701767 0.04099176 0.03480569 0.04640756][0.0702086  0.11755959 0.10595772 0.05895286][0.18940397 0.17582024 0.16001408 0.04297362][0.         0.         0.         0.        ][0.         0.         0.         0.        ][0.08799662 0.20503708 0.23442697 0.17582024][0.25238807 0.53837042 0.52711467 0.43929106][0.         0.         0.         0.        ]]
有更新,最优策略为:
[[1. 0. 0. 0.][0. 0. 0. 1.][1. 0. 0. 0.][0. 0. 0. 1.][1. 0. 0. 0.][1. 0. 0. 0.][1. 0. 0. 0.][1. 0. 0. 0.][0. 0. 0. 1.][0. 1. 0. 0.][1. 0. 0. 0.][1. 0. 0. 0.][1. 0. 0. 0.][0. 0. 1. 0.][0. 1. 0. 0.][1. 0. 0. 0.]]
状态价值函数 = 
[[0.82351246 0.82350689 0.82350303 0.82350106][0.82351416 0.         0.5294002  0.        ][0.82351683 0.82352026 0.76469786 0.        ][0.         0.88234658 0.94117323 0.        ]]
最优策略 = 
[[0 3 3 3][0 0 0 0][3 1 0 0][0 2 1 0]]
价值迭代 平均奖励:1.0

总结

以上是生活随笔为你收集整理的强化学习(三) - Gym库介绍和使用,Markov决策程序实例,动态规划决策实例的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。