【深入浅出强化学习-编程实战】3 基于蒙特卡洛的方法
- 3.1 鸳鸯系统——基于蒙特卡洛的方法
- 3.2 部分代码思考
3.1 鸳鸯系统——基于蒙特卡洛的方法
左上为雄鸟,右上为雌鸟,中间有两道障碍物。目标:雄鸟找到雌鸟。
- 在模型已知的时候,我们对状态空间进行遍历,从而进行策略的评估。模型未知的时候,我们不进行状态空间的遍历,而是采用探索的方法,这就涉及了探索和利用的平衡问题。
- 经常讨论的探索方法有两种:探索初始化和探索策略。
yuanyang_env_mc.py
python">import pygame
from resource.load import *
import math
import time
import random
import numpy as np
class YuanYangEnv:
def __init__(self):
# 状态空间
self.states = [] # 0-99
for i in range(0, 100):
self.states.append(i)
self.actions = ['e', 'w', 'n', 's']
# 无模型的强化学习算法需要评估行为-值函数
# 每个方格表示一个状态,每个状态的4个值分别对应着4个动作的行为-值函数
self.gamma = 0.95 # 蒙特卡洛利用整条轨迹的数据进行评估策略,如果gamma太小,后面回报的贡献会很快衰减
# 行为-值函数
self.action_value = np.zeros((100, 4))
# 设置渲染属性
self.viewer = None # 一个渲染窗口
# 帧速率是指程序每秒在屏幕山绘制图像的数目,我们可以用FPS来表示它。一般的计算机都能达到每秒60帧的速度。如果我们把帧速率讲得比较低,那么游戏也会看上去较为卡顿。
self.FPSCLOCK = pygame.time.Clock()
# 屏幕大小
self.screen_size = (1200, 900)
# 雄鸟当前位置
self.bird_position = (0, 0)
# 雄鸟在x方向每走一次像素为120
self.limit_distance_x = 120
# 雄鸟在y方向每走一次像素为90
self.limit_distance_y = 90
# 每个障碍物大小为120像素*90像素
self.obstacle_size = [120, 90]
# 一共有两个障碍物墙,每个障碍物墙由8个小障碍物组成
self.obstacle1_x = []
self.obstacle1_y = []
self.obstacle2_x = []
self.obstacle2_y = []
self.path = []
for i in range(8):
# 第一个障碍物
self.obstacle1_x.append(360)
if i <= 3:
self.obstacle1_y.append(90 * i)
else:
self.obstacle1_y.append(90 * (i + 2))
# 第二个障碍物
self.obstacle2_x.append(720)
if i <= 4:
self.obstacle2_y.append(90 * i)
else:
self.obstacle2_y.append(90 * (i + 2))
# 雄鸟初始位置
self.bird_male_init_position = [0, 0]
# 雄鸟当前位置
self.bird_male_position = [0, 0]
# 雌鸟初始位置
self.bird_female_init_position = [1080, 0]
# 雄鸟碰撞检测子函数
def collide(self, state_position):
# 用标志flag,flag1,flag2分别表示是否与障碍物、障碍物墙1、障碍物墙2发生碰撞
flag = 1
flag1 = 1
flag2 = 1
# 检测雄鸟是否与第一个障碍物墙发生碰撞
# 找到雄鸟与第一个障碍物所有障碍物x方向和y方向最近的障碍物的坐标差
# 并判断最近的坐标差是否大于一个最小运动距离
# 如果大于等于 就不会发生碰撞
dx = []
dy = []
for i in range(8):
dx1 = abs(self.obstacle1_x[i] - state_position[0])
dx.append(dx1)
dy1 = abs(self.obstacle1_y[i] - state_position[1])
dy.append(dy1)
mindx = min(dx)
mindy = min(dy)
if mindx >= self.limit_distance_x or mindy >= self.limit_distance_y:
flag1 = 0 # 没碰
# 是否与第二个障碍物墙碰撞
second_dx = []
second_dy = []
for i in range(8):
dx2 = abs(self.obstacle2_x[i] - state_position[0])
second_dx.append(dx2)
dy2 = abs(self.obstacle2_y[i] - state_position[1])
second_dy.append(dy2)
mindx = min(second_dx)
mindy = min(second_dy)
if mindx >= self.limit_distance_x or mindy >= self.limit_distance_y:
flag2 = 0 # 没碰
if flag1 == 0 and flag2 == 0:
flag = 0 # 没碰
# 是否超出边界,如果是,也认为发生碰撞
if state_position[0] > 1080 or state_position[0] < 0 or state_position[1] > 810 or state_position[1] < 0:
flag = 1 # 碰了
# 返回碰撞标志位
return flag
# 雄鸟是否找到雌鸟子函数
def find(self, state_position):
# 设置标志位flag
# 判断雄鸟当前位置和雌鸟位置坐标差,雄安与最小运动距离则为找到
flag = 0
if abs(state_position[0] - self.bird_female_init_position[0]) < self.limit_distance_x and abs(
state_position[1] - self.bird_female_init_position[1]) < self.limit_distance_y:
flag = 1
return flag
# 状态转化为像素坐标子函数
def state_to_position(self, state):
i = int(state / 10)
j = state % 10
position = [0, 0]
position[0] = 120 * j
position[1] = 90 * i
return position
# 像素转化为状态坐标子函数
def position_to_state(self, position):
i = position[0] / 120
j = position[1] / 90
return int(i + 10 * j)
def reset(self):
#随机产生初始状态
flag1=1
flag2=1
while flag1 or flag2 ==1:
#随机产生初始状态,0~99,randoom.random() 产生一个0~1的随机数
state=self.states[int(random.random()*len(self.states))]
state_position = self.state_to_position(state)
flag1 = self.collide(state_position)
flag2 = self.find(state_position)
return state
# 原来的回报只有在找到目标点和碰到障碍物的时候才有回报,是稀疏回报
# 蒙特卡洛方法对于稀疏回报问题估计方差无穷大
# 为此,我们每一步都给出了回报,将稀疏回报变成稠密汇报
def transform(self,state, action):
# 将当前状态转化为坐标
current_position=self.state_to_position(state)
next_position = [0,0]
flag_collide=0
flag_fnd=0
# 判断当i前坐标是否与障碍物碰撞
flag_collide=self.collide(current_position)
# 判断状态是否是终点
flag_find=self.find(current_position)
if flag_collide == 1:
return state, -10, True
if flag_find == 1:
return state, 10, True
# 状态转移
if action=='e':
next_position[0]=current_position[0]+120
next_position[1]=current_position[1]
if action=='s':
next_position[0]=current_position[0]
next_position[1]=current_position[1]+90
if action=='w':
next_position[0] = current_position[0] - 120
next_position[1] = current_position[1]
if action=='n':
next_position[0] = current_position[0]
next_position[1] = current_position[1] - 90
# 判断next_state是否与障碍物碰撞
flag_collide = self.collide(next_position)
# 如果碰撞,那么回报为-10,并结束
if flag_collide==1:
return self.position_to_state(current_position),-10,True
# 判断是否终点
flag_find = self.find(next_position)
if flag_find==1:
return self.position_to_state(next_position),15,True
# 每走一步回报-2
return self.position_to_state(next_position),-2.5, False
def gameover(self):
for event in pygame.event.get():
if event.type == QUIT:
exit()
def render(self):
if self.viewer is None:
pygame.init()
#画一个窗口
self.viewer=pygame.display.set_mode(self.screen_size,0,32)
pygame.display.set_caption("yuanyang")
#下载图片
self.bird_male = load_bird_male()
self.bird_female = load_bird_female()
self.background = load_background()
self.obstacle = load_obstacle()
#self.viewer.blit(self.bird_male, self.bird_male_init_position)
#在幕布上画图片
self.viewer.blit(self.bird_female, self.bird_female_init_position)
self.viewer.blit(self.background, (0, 0))
self.font = pygame.font.SysFont('times', 15)
self.viewer.blit(self.background,(0,0))
#画直线
for i in range(11):
pygame.draw.lines(self.viewer, (255, 255, 255), True, ((120*i, 0), (120*i, 900)), 1)
pygame.draw.lines(self.viewer, (255, 255, 255), True, ((0, 90* i), (1200, 90 * i)), 1)
self.viewer.blit(self.bird_female, self.bird_female_init_position)
#画障碍物
for i in range(8):
self.viewer.blit(self.obstacle, (self.obstacle1_x[i], self.obstacle1_y[i]))
self.viewer.blit(self.obstacle, (self.obstacle2_x[i], self.obstacle2_y[i]))
#画小鸟
self.viewer.blit(self.bird_male, self.bird_male_position)
# 画动作-值函数
for i in range(100):
y = int(i/10)
x = i % 10
# 往东的值函数
surface = self.font.render(str(round(float(self.action_value[i,0]),2)),True,(0,0,0))
self.viewer.blit(surface,(120*x+80,90*y+45))
# 往南的值函数
surface = self.font.render(str(round(float(self.action_value[i,1]),2)),True,(0,0,0))
self.viewer.blit(surface, (120 * x + 50, 90 * y + 70))
# 往西的值函数
surface = self.font.render(str(round(float(self.action_value[i, 2]), 2)), True, (0, 0, 0))
self.viewer.blit(surface, (120 * x + 10, 90 * y + 45))
# 往北的值函数
surface = self.font.render(str(round(float(self.action_value[i, 3]), 2)), True, (0, 0, 0))
self.viewer.blit(surface, (120 * x + 50, 90 * y + 10))
# 画路径点
for i in range(len(self.path)):
rec_position = self.state_to_position(self.path[i])
pygame.draw.rect(self.viewer, [255, 0, 0], [rec_position[0], rec_position[1], 120, 90], 3)
surface = self.font.render(str(i), True, (255, 0, 0))
self.viewer.blit(surface, (rec_position[0] + 5, rec_position[1] + 5))
pygame.display.update()
self.gameover()
# time.sleep(0.1)
self.FPSCLOCK.tick(30)
if __name__=="__main__":
yy=YuanYangEnv()
yy.render()
while True:
for event in pygame.event.get():
if event.type == QUIT:
exit()
MC_RL.py
python">import pygame
import time
import random
import numpy as np
import matplotlib.pyplot as plt
from yuanyang_env_mc import YuanYangEnv
class MC_RL:
def __init__(self, yuanyang):
# 声明行为值函数 100*4的表格
self.qvalue = np.zeros((len(yuanyang.states), len(yuanyang.actions))) * 0.1
# n用来表示状态行为对被访问的次数 次数初始化
# n[s,a]=1,2,3??求经验平均时 q(s,a) = G(s,a)/n(s,a)
self.n = 0.001 * np.ones((len(yuanyang.states), len(yuanyang.actions)))
self.actions = yuanyang.actions
self.yuanyang = yuanyang
self.gamma = yuanyang.gamma
# 定义贪婪策略greedy-policy
def greedy_policy(self, qfun, state):
amax = qfun[state, :].argmax()
return self.actions[amax]
# 定义epsilon-greedy policy
def epsilon_greedy_policy(self, qfun, state, epsilon):
amax = qfun[state, :].argmax()
# 概率部分
if np.random.uniform() < 1 - epsilon:
# 最优动作
return self.actions[amax]
else:
return self.actions[int(random.random() * len(self.actions))]
# 找到对应动作对应的序号
def find_anum(self, a):
for i in range(len(self.actions)):
if a == self.actions[i]:
return i
# 同策略蒙特卡洛算法 on-policy
def mc_learning_on_policy(self, num_iter, epsilon):
self.qvalue = np.zeros((len(yuanyang.states), len(yuanyang.actions)))
self.n = 0.001 * np.ones((len(yuanyang.states), len((yuanyang.actions))))
# 学习 num_iter次
for iter1 in range(num_iter):
# 采集状态样本
s_sample = []
# 采集动作样本
a_sample = []
# 采集回报样本
r_sample = []
# 固定初始状态
s = 0
done = False
step_num = 0
epsilon = epsilon*np.exp(-iter1 / 1000)
# 设置好基本遍历,进入与环境交互程序,根据当前策略不断采集数据。
# 采集数据过程中为利用当前策略得到动作a,智能体在状态s处采用a与环境交互,从环境中得到下一个状态s_next,回报r,以及是否结束的标志位done
while False == done and step_num < 30:
a = self.epsilon_greedy_policy(self.qvalue, s, epsilon)
# 与环境交互
s_next, r, done = self.yuanyang.transform(s, a)
a_num = self.find_anum(a)
# 往回走给惩罚
if s_next in s_sample:
r = -2
# 存储数据,采用数据
s_sample.append(s)
r_sample.append(r)
a_sample.append(a_num)
step_num += 1
# 转移到下一个状态,继续实验
s = s_next
# 检测结果,判断是否完成
# 任务完成结束条件
if s == 9:
print("同策略第一次完成任务需要的次数:", iter1)
break
# 根据回报值计算折扣累积回报
# 先得到下一个状态处的值函数,然后逆向求解前面状态-值行为对的折扣累积回报
# 最后得到该次实验中第一个状态行为对的折扣累积回报
# 从样本中计算折扣回报 g(s_0) = r_0 +gamma*r_1+...+v(sT)
a = self.epsilon_greedy_policy(self.qvalue, s, epsilon)
g = self.qvalue[s, self.find_anum(a)]
# 计算该序列的第一状态折扣累积回报
for i in range(len(s_sample) - 1, -1, -1):
g *= self.gamma
g += r_sample[i]
# 下面从实验第1个状态开始,依次计算后继状态的折扣累积回报,并利用增量式的方法更新每个状态-行为对的值
# g = G(s1,a)。开始算其他处的累积回报
for i in range(len(s_sample)):
# 计算状态行为对(s,a)的次数,s,a1,....s,a2
self.n[s_sample[i], a_sample[i]] += 1.0
# 利用增量式方法更新值函数
self.qvalue[s_sample[i], a_sample[i]] = (self.qvalue[s_sample[i], a_sample[i]] * (
self.n[s_sample[i], a_sample[i]] - 1) + g) / (self.n[s_sample[i], a_sample[i]])
g -= r_sample[i]
g /= self.gamma
return self.qvalue
# 探索初始化蒙特卡洛实现
def mc_learning_ei(self, num_iter):
self.qvalue = np.zeros((len(yuanyang.states), len(yuanyang.actions)))
self.n = 0.001 * np.ones((len(yuanyang.states), len(yuanyang.actions)))
# 学习num_iter次
for iter1 in range(num_iter):
# 采集状态样本
s_sample = []
# 采集动作样本
a_sample = []
# 采集回报样本
r_sample = []
# 随机初始化状态
s = self.yuanyang.reset()
a = self.actions[int(random.random() * len(self.actions))]
done = False
step_num = 0
# 调用mc_test函数,该函数用来测试经过学习后策略是否找到了目标;如果找到,返回1,否则返回0
# 找到目标时,把蒙特卡洛迭代学习的次数打印出来
if self.mc_test() == 1:
print("探索初始化第1次完成任务需要的次数:", iter1)
break
# 采集数据 s0-a1-s1-a1-s2....terminate state
# 下面用当前贪婪策略进行一次实验,并保存相应的数据到s_sample和a_sample,r_sample
while False == done and step_num < 30:
# 与环境交互
s_next, r, done = self.yuanyang.transform(s, a)
a_num = self.find_anum(a)
# 很关键! 在实际训练过程中,智能体可能会往回走,但是这是不合理的,因此给再次访问已经访问过状态的动作负的回报
# 往回走给予惩罚
if s_next in s_sample:
r = -2
# 存储数据,采样数据
s_sample.append(s)
r_sample.append(r)
a_sample.append(a_num)
step_num += 1
# 转移到下一个状态,继续实验,s0-s1-s2
s = s_next
a = self.greedy_policy(self.qvalue, s)
# 根据回报值计算折扣累积回报
# 先得到下一个状态处的值函数,然后逆向求解前面状态-值行为对的折扣累积回报
# 最后得到该次实验中第一个状态行为对的折扣累积回报
# 从样本中计算折扣回报 g(s_0) = r_0 +gamma*r_1+...+v(sT)
a = self.greedy_policy(self.qvalue, s)
g = self.qvalue[s, self.find_anum(a)]
# 计算该序列的第一状态折扣累积回报
for i in range(len(s_sample) - 1, -1, -1):
g *= self.gamma
g += r_sample[i]
# 下面从实验第1个状态开始,依次计算后继状态的折扣累积回报,并利用增量式的方法更新每个状态-行为对的值
# g = G(s1,a)。开始算其他处的累积回报
for i in range(len(s_sample)):
# 计算状态行为对(s,a)的次数,s,a1,....s,a2
self.n[s_sample[i], a_sample[i]] += 1.0
# 利用增量式方法更新值函数
self.qvalue[s_sample[i], a_sample[i]] = (self.qvalue[s_sample[i], a_sample[i]] * (
self.n[s_sample[i], a_sample[i]] - 1) + g) / (self.n[s_sample[i], a_sample[i]])
g -= r_sample[i]
g /= self.gamma
return self.qvalue
# 测试子函数mc_test(),初始状态为0,与环境交互,
def mc_test(self):
s = 0
s_sample = []
done = False
flag = 0
step_num = 0
while False == done and step_num < 30:
a = self.greedy_policy(self.qvalue, s)
# 与环境交互
s_next, r, done = self.yuanyang.transform(s, a)
s_sample.append(s)
s = s_next
step_num += 1
if s == 9:
flag = 1
return flag
# 主函数
# 首先利用环境类YuanYangEnv()实例化一个智能体;
# 然后,用蒙特卡洛算法实例化一个brain,调用探索初始化蒙特卡洛算法mc_learning_ei
# 将学到的行为值函数给环境,将其渲染出来
# 最后,对学习好的策略进行测试,并显示出路径
if __name__=="__main__":
yuanyang = YuanYangEnv()
brain = MC_RL(yuanyang)
# 探索初始化方法
qvalue1 = brain.mc_learning_ei(num_iter=10000)
# 同策略方法
qvalue2 = brain.mc_learning_on_policy(num_iter=10000,epsilon=0.2)
print(qvalue2)
# 将行为值函数渲染出来
yuanyang.action_value = qvalue2
# 测试学习到的策略
flag = 1
s = 0
path = []
step_num = 0
#print(policy_value.pi)
# 打印最优路径
while flag:
# 渲染路径点
path.append(s)
yuanyang.path = path
a = brain.greedy_policy(qvalue2,s)
print('%d->%s\t'%(s,a),qvalue2[s,0],qvalue2[s,1],qvalue2[s,2],qvalue2[s,3])
yuanyang.bird_male_position = yuanyang.state_to_position(s)
yuanyang.render()
time.sleep(0.25)
step_num+=1
s_,r,t = yuanyang.transform(s,a)
if t == True or step_num >30:
flag = 0
s = s_
# 渲染最后的路径点
yuanyang.bird_male_position = yuanyang.state_to_position(s)
path.append(s)
yuanyang.render()
while True:
yuanyang.render()
结果
3.2 部分代码思考
- MC_RL.py——line 10
python">self.qvalue = np.zeros((len(yuanyang.states),len(yuanyang.actions)))*0.1
- *0.1是什么?
- 我的猜想:零矩阵×0.1数值不变,精度改变成保留一位小数。???