【深入浅出强化学习-编程实战】3 基于蒙特卡洛的方法的鸳鸯系统

news/2024/5/19 0:11:11 标签: python, 算法, 强化学习, 机器学习, 深度学习

【深入浅出强化学习-编程实战】3 基于蒙特卡洛的方法

    • 3.1 鸳鸯系统——基于蒙特卡洛的方法
    • 3.2 部分代码思考

3.1 鸳鸯系统——基于蒙特卡洛的方法

左上为雄鸟,右上为雌鸟,中间有两道障碍物。目标:雄鸟找到雌鸟。

  • 在模型已知的时候,我们对状态空间进行遍历,从而进行策略的评估。模型未知的时候,我们不进行状态空间的遍历,而是采用探索的方法,这就涉及了探索和利用的平衡问题。
  • 经常讨论的探索方法有两种:探索初始化和探索策略。

yuanyang_env_mc.py

python">import pygame
from resource.load import *
import math
import time
import random
import numpy as np

class YuanYangEnv:
    def __init__(self):
        # 状态空间
        self.states = []  # 0-99
        for i in range(0, 100):
            self.states.append(i)
        self.actions = ['e', 'w', 'n', 's']

        # 无模型的强化学习算法需要评估行为-值函数
        # 每个方格表示一个状态,每个状态的4个值分别对应着4个动作的行为-值函数
        self.gamma = 0.95 # 蒙特卡洛利用整条轨迹的数据进行评估策略,如果gamma太小,后面回报的贡献会很快衰减
        # 行为-值函数
        self.action_value = np.zeros((100, 4))

        # 设置渲染属性
        self.viewer = None  # 一个渲染窗口
        # 帧速率是指程序每秒在屏幕山绘制图像的数目,我们可以用FPS来表示它。一般的计算机都能达到每秒60帧的速度。如果我们把帧速率讲得比较低,那么游戏也会看上去较为卡顿。
        self.FPSCLOCK = pygame.time.Clock()
        # 屏幕大小
        self.screen_size = (1200, 900)
        # 雄鸟当前位置
        self.bird_position = (0, 0)
        # 雄鸟在x方向每走一次像素为120
        self.limit_distance_x = 120
        # 雄鸟在y方向每走一次像素为90
        self.limit_distance_y = 90
        # 每个障碍物大小为120像素*90像素
        self.obstacle_size = [120, 90]
        # 一共有两个障碍物墙,每个障碍物墙由8个小障碍物组成
        self.obstacle1_x = []
        self.obstacle1_y = []
        self.obstacle2_x = []
        self.obstacle2_y = []
        self.path = []
        for i in range(8):
            # 第一个障碍物
            self.obstacle1_x.append(360)
            if i <= 3:
                self.obstacle1_y.append(90 * i)
            else:
                self.obstacle1_y.append(90 * (i + 2))
            # 第二个障碍物
            self.obstacle2_x.append(720)
            if i <= 4:
                self.obstacle2_y.append(90 * i)
            else:
                self.obstacle2_y.append(90 * (i + 2))
        # 雄鸟初始位置
        self.bird_male_init_position = [0, 0]
        # 雄鸟当前位置
        self.bird_male_position = [0, 0]
        # 雌鸟初始位置
        self.bird_female_init_position = [1080, 0]



    # 雄鸟碰撞检测子函数
    def collide(self, state_position):
        # 用标志flag,flag1,flag2分别表示是否与障碍物、障碍物墙1、障碍物墙2发生碰撞
        flag = 1
        flag1 = 1
        flag2 = 1
        # 检测雄鸟是否与第一个障碍物墙发生碰撞
        # 找到雄鸟与第一个障碍物所有障碍物x方向和y方向最近的障碍物的坐标差
        # 并判断最近的坐标差是否大于一个最小运动距离
        # 如果大于等于 就不会发生碰撞
        dx = []
        dy = []
        for i in range(8):
            dx1 = abs(self.obstacle1_x[i] - state_position[0])
            dx.append(dx1)
            dy1 = abs(self.obstacle1_y[i] - state_position[1])
            dy.append(dy1)
        mindx = min(dx)
        mindy = min(dy)
        if mindx >= self.limit_distance_x or mindy >= self.limit_distance_y:
            flag1 = 0  # 没碰
        # 是否与第二个障碍物墙碰撞
        second_dx = []
        second_dy = []
        for i in range(8):
            dx2 = abs(self.obstacle2_x[i] - state_position[0])
            second_dx.append(dx2)
            dy2 = abs(self.obstacle2_y[i] - state_position[1])
            second_dy.append(dy2)
        mindx = min(second_dx)
        mindy = min(second_dy)
        if mindx >= self.limit_distance_x or mindy >= self.limit_distance_y:
            flag2 = 0  # 没碰
        if flag1 == 0 and flag2 == 0:
            flag = 0  # 没碰
        # 是否超出边界,如果是,也认为发生碰撞
        if state_position[0] > 1080 or state_position[0] < 0 or state_position[1] > 810 or state_position[1] < 0:
            flag = 1  # 碰了
        # 返回碰撞标志位
        return flag

        # 雄鸟是否找到雌鸟子函数

    def find(self, state_position):
        # 设置标志位flag
        # 判断雄鸟当前位置和雌鸟位置坐标差,雄安与最小运动距离则为找到
        flag = 0
        if abs(state_position[0] - self.bird_female_init_position[0]) < self.limit_distance_x and abs(
                state_position[1] - self.bird_female_init_position[1]) < self.limit_distance_y:
            flag = 1
        return flag

        # 状态转化为像素坐标子函数

    def state_to_position(self, state):
        i = int(state / 10)
        j = state % 10
        position = [0, 0]
        position[0] = 120 * j
        position[1] = 90 * i
        return position

        # 像素转化为状态坐标子函数

    def position_to_state(self, position):
        i = position[0] / 120
        j = position[1] / 90
        return int(i + 10 * j)


    def reset(self):
        #随机产生初始状态
        flag1=1
        flag2=1
        while flag1 or flag2 ==1:
            #随机产生初始状态,0~99,randoom.random() 产生一个0~1的随机数
            state=self.states[int(random.random()*len(self.states))]
            state_position = self.state_to_position(state)
            flag1 = self.collide(state_position)
            flag2 = self.find(state_position)
        return state

    # 原来的回报只有在找到目标点和碰到障碍物的时候才有回报,是稀疏回报
    # 蒙特卡洛方法对于稀疏回报问题估计方差无穷大
    # 为此,我们每一步都给出了回报,将稀疏回报变成稠密汇报
    def transform(self,state, action):
        # 将当前状态转化为坐标
        current_position=self.state_to_position(state)
        next_position = [0,0]
        flag_collide=0
        flag_fnd=0
        # 判断当i前坐标是否与障碍物碰撞
        flag_collide=self.collide(current_position)
        # 判断状态是否是终点
        flag_find=self.find(current_position)
        if flag_collide == 1:
            return state, -10, True
        if flag_find == 1:
            return state, 10, True

        # 状态转移
        if action=='e':
            next_position[0]=current_position[0]+120
            next_position[1]=current_position[1]
        if action=='s':
            next_position[0]=current_position[0]
            next_position[1]=current_position[1]+90
        if action=='w':
            next_position[0] = current_position[0] - 120
            next_position[1] = current_position[1]
        if action=='n':
            next_position[0] = current_position[0]
            next_position[1] = current_position[1] - 90
        # 判断next_state是否与障碍物碰撞
        flag_collide = self.collide(next_position)

        # 如果碰撞,那么回报为-10,并结束
        if flag_collide==1:
            return self.position_to_state(current_position),-10,True

        # 判断是否终点
        flag_find = self.find(next_position)
        if flag_find==1:
            return self.position_to_state(next_position),15,True
        # 每走一步回报-2
        return self.position_to_state(next_position),-2.5, False
    def gameover(self):
        for event in pygame.event.get():
            if event.type == QUIT:
                exit()
    def render(self):
        if self.viewer is None:
            pygame.init()
            #画一个窗口
            self.viewer=pygame.display.set_mode(self.screen_size,0,32)
            pygame.display.set_caption("yuanyang")
            #下载图片
            self.bird_male = load_bird_male()
            self.bird_female = load_bird_female()
            self.background = load_background()
            self.obstacle = load_obstacle()
            #self.viewer.blit(self.bird_male, self.bird_male_init_position)
            #在幕布上画图片
            self.viewer.blit(self.bird_female, self.bird_female_init_position)
            self.viewer.blit(self.background, (0, 0))
            self.font = pygame.font.SysFont('times', 15)
        self.viewer.blit(self.background,(0,0))
        #画直线
        for i in range(11):
            pygame.draw.lines(self.viewer, (255, 255, 255), True, ((120*i, 0), (120*i, 900)), 1)
            pygame.draw.lines(self.viewer, (255, 255, 255), True, ((0, 90* i), (1200, 90 * i)), 1)
        self.viewer.blit(self.bird_female, self.bird_female_init_position)
        #画障碍物
        for i in range(8):
            self.viewer.blit(self.obstacle, (self.obstacle1_x[i], self.obstacle1_y[i]))
            self.viewer.blit(self.obstacle, (self.obstacle2_x[i], self.obstacle2_y[i]))
        #画小鸟
        self.viewer.blit(self.bird_male,  self.bird_male_position)

        # 画动作-值函数
        for i in range(100):
            y = int(i/10)
            x = i % 10
            # 往东的值函数
            surface = self.font.render(str(round(float(self.action_value[i,0]),2)),True,(0,0,0))
            self.viewer.blit(surface,(120*x+80,90*y+45))
            # 往南的值函数
            surface = self.font.render(str(round(float(self.action_value[i,1]),2)),True,(0,0,0))
            self.viewer.blit(surface, (120 * x + 50, 90 * y + 70))
            # 往西的值函数
            surface = self.font.render(str(round(float(self.action_value[i, 2]), 2)), True, (0, 0, 0))
            self.viewer.blit(surface, (120 * x + 10, 90 * y + 45))
            # 往北的值函数
            surface = self.font.render(str(round(float(self.action_value[i, 3]), 2)), True, (0, 0, 0))
            self.viewer.blit(surface, (120 * x + 50, 90 * y + 10))

         # 画路径点
        for i in range(len(self.path)):
            rec_position = self.state_to_position(self.path[i])
            pygame.draw.rect(self.viewer, [255, 0, 0], [rec_position[0], rec_position[1], 120, 90], 3)
            surface = self.font.render(str(i), True, (255, 0, 0))
            self.viewer.blit(surface, (rec_position[0] + 5, rec_position[1] + 5))
        pygame.display.update()
        self.gameover()
        # time.sleep(0.1)
        self.FPSCLOCK.tick(30)



if __name__=="__main__":
    yy=YuanYangEnv()
    yy.render()
    while True:
        for event in pygame.event.get():
            if event.type == QUIT:
                exit()

MC_RL.py

python">import pygame
import time
import random
import numpy as np
import matplotlib.pyplot as plt
from yuanyang_env_mc import YuanYangEnv
class MC_RL:
    def __init__(self, yuanyang):
        # 声明行为值函数 100*4的表格
        self.qvalue = np.zeros((len(yuanyang.states), len(yuanyang.actions))) * 0.1
        # n用来表示状态行为对被访问的次数 次数初始化
        # n[s,a]=1,2,3??求经验平均时 q(s,a) = G(s,a)/n(s,a)
        self.n = 0.001 * np.ones((len(yuanyang.states), len(yuanyang.actions)))
        self.actions = yuanyang.actions
        self.yuanyang = yuanyang
        self.gamma = yuanyang.gamma

        # 定义贪婪策略greedy-policy

    def greedy_policy(self, qfun, state):
        amax = qfun[state, :].argmax()
        return self.actions[amax]

        # 定义epsilon-greedy policy

    def epsilon_greedy_policy(self, qfun, state, epsilon):
        amax = qfun[state, :].argmax()
        # 概率部分
        if np.random.uniform() < 1 - epsilon:
            # 最优动作
            return self.actions[amax]
        else:
            return self.actions[int(random.random() * len(self.actions))]

        # 找到对应动作对应的序号

    def find_anum(self, a):
        for i in range(len(self.actions)):
            if a == self.actions[i]:
                return i

        # 同策略蒙特卡洛算法 on-policy

    def mc_learning_on_policy(self, num_iter, epsilon):
        self.qvalue = np.zeros((len(yuanyang.states), len(yuanyang.actions)))
        self.n = 0.001 * np.ones((len(yuanyang.states), len((yuanyang.actions))))
        # 学习 num_iter次
        for iter1 in range(num_iter):
            # 采集状态样本
            s_sample = []
            # 采集动作样本
            a_sample = []
            # 采集回报样本
            r_sample = []
            # 固定初始状态
            s = 0
            done = False
            step_num = 0
            epsilon = epsilon*np.exp(-iter1 / 1000)

            # 设置好基本遍历,进入与环境交互程序,根据当前策略不断采集数据。
            # 采集数据过程中为利用当前策略得到动作a,智能体在状态s处采用a与环境交互,从环境中得到下一个状态s_next,回报r,以及是否结束的标志位done
            while False == done and step_num < 30:
                a = self.epsilon_greedy_policy(self.qvalue, s, epsilon)
                # 与环境交互
                s_next, r, done = self.yuanyang.transform(s, a)
                a_num = self.find_anum(a)

                # 往回走给惩罚
                if s_next in s_sample:
                    r = -2
                # 存储数据,采用数据
                s_sample.append(s)
                r_sample.append(r)
                a_sample.append(a_num)
                step_num += 1
                # 转移到下一个状态,继续实验
                s = s_next

            # 检测结果,判断是否完成
            # 任务完成结束条件
            if s == 9:
                print("同策略第一次完成任务需要的次数:", iter1)
                break
            # 根据回报值计算折扣累积回报
            # 先得到下一个状态处的值函数,然后逆向求解前面状态-值行为对的折扣累积回报
            # 最后得到该次实验中第一个状态行为对的折扣累积回报
            # 从样本中计算折扣回报 g(s_0) = r_0 +gamma*r_1+...+v(sT)
            a = self.epsilon_greedy_policy(self.qvalue, s, epsilon)
            g = self.qvalue[s, self.find_anum(a)]
            # 计算该序列的第一状态折扣累积回报
            for i in range(len(s_sample) - 1, -1, -1):
                g *= self.gamma
                g += r_sample[i]

            # 下面从实验第1个状态开始,依次计算后继状态的折扣累积回报,并利用增量式的方法更新每个状态-行为对的值
            # g = G(s1,a)。开始算其他处的累积回报
            for i in range(len(s_sample)):
                # 计算状态行为对(s,a)的次数,s,a1,....s,a2
                self.n[s_sample[i], a_sample[i]] += 1.0
                # 利用增量式方法更新值函数
                self.qvalue[s_sample[i], a_sample[i]] = (self.qvalue[s_sample[i], a_sample[i]] * (
                        self.n[s_sample[i], a_sample[i]] - 1) + g) / (self.n[s_sample[i], a_sample[i]])
                g -= r_sample[i]
                g /= self.gamma
        return self.qvalue
        # 探索初始化蒙特卡洛实现

    def mc_learning_ei(self, num_iter):
        self.qvalue = np.zeros((len(yuanyang.states), len(yuanyang.actions)))
        self.n = 0.001 * np.ones((len(yuanyang.states), len(yuanyang.actions)))
        # 学习num_iter次
        for iter1 in range(num_iter):
            # 采集状态样本
            s_sample = []
            # 采集动作样本
            a_sample = []
            # 采集回报样本
            r_sample = []
            # 随机初始化状态
            s = self.yuanyang.reset()
            a = self.actions[int(random.random() * len(self.actions))]
            done = False
            step_num = 0

            # 调用mc_test函数,该函数用来测试经过学习后策略是否找到了目标;如果找到,返回1,否则返回0
            # 找到目标时,把蒙特卡洛迭代学习的次数打印出来
            if self.mc_test() == 1:
                print("探索初始化第1次完成任务需要的次数:", iter1)
                break
            # 采集数据 s0-a1-s1-a1-s2....terminate state

            # 下面用当前贪婪策略进行一次实验,并保存相应的数据到s_sample和a_sample,r_sample
            while False == done and step_num < 30:
                # 与环境交互
                s_next, r, done = self.yuanyang.transform(s, a)
                a_num = self.find_anum(a)

                # 很关键! 在实际训练过程中,智能体可能会往回走,但是这是不合理的,因此给再次访问已经访问过状态的动作负的回报
                # 往回走给予惩罚
                if s_next in s_sample:
                    r = -2
                # 存储数据,采样数据
                s_sample.append(s)
                r_sample.append(r)
                a_sample.append(a_num)
                step_num += 1
                # 转移到下一个状态,继续实验,s0-s1-s2
                s = s_next
                a = self.greedy_policy(self.qvalue, s)

                # 根据回报值计算折扣累积回报
                # 先得到下一个状态处的值函数,然后逆向求解前面状态-值行为对的折扣累积回报
                # 最后得到该次实验中第一个状态行为对的折扣累积回报
                # 从样本中计算折扣回报 g(s_0) = r_0 +gamma*r_1+...+v(sT)
                a = self.greedy_policy(self.qvalue, s)
                g = self.qvalue[s, self.find_anum(a)]
                # 计算该序列的第一状态折扣累积回报
                for i in range(len(s_sample) - 1, -1, -1):
                    g *= self.gamma
                    g += r_sample[i]

                # 下面从实验第1个状态开始,依次计算后继状态的折扣累积回报,并利用增量式的方法更新每个状态-行为对的值
                # g = G(s1,a)。开始算其他处的累积回报
                for i in range(len(s_sample)):
                    # 计算状态行为对(s,a)的次数,s,a1,....s,a2
                    self.n[s_sample[i], a_sample[i]] += 1.0
                    # 利用增量式方法更新值函数
                    self.qvalue[s_sample[i], a_sample[i]] = (self.qvalue[s_sample[i], a_sample[i]] * (
                                self.n[s_sample[i], a_sample[i]] - 1) + g) / (self.n[s_sample[i], a_sample[i]])
                    g -= r_sample[i]
                    g /= self.gamma
            return self.qvalue

    # 测试子函数mc_test(),初始状态为0,与环境交互,
    def mc_test(self):
        s = 0
        s_sample = []
        done = False
        flag = 0
        step_num = 0
        while False == done and step_num < 30:
            a = self.greedy_policy(self.qvalue, s)
            # 与环境交互
            s_next, r, done = self.yuanyang.transform(s, a)
            s_sample.append(s)
            s = s_next
            step_num += 1
        if s == 9:
            flag = 1
        return flag

# 主函数
# 首先利用环境类YuanYangEnv()实例化一个智能体;
# 然后,用蒙特卡洛算法实例化一个brain,调用探索初始化蒙特卡洛算法mc_learning_ei
# 将学到的行为值函数给环境,将其渲染出来
# 最后,对学习好的策略进行测试,并显示出路径
if __name__=="__main__":
    yuanyang = YuanYangEnv()
    brain = MC_RL(yuanyang)
    # 探索初始化方法
    qvalue1 = brain.mc_learning_ei(num_iter=10000)
    # 同策略方法
    qvalue2 = brain.mc_learning_on_policy(num_iter=10000,epsilon=0.2)
    print(qvalue2)
    # 将行为值函数渲染出来
    yuanyang.action_value = qvalue2
    # 测试学习到的策略
    flag = 1
    s = 0
    path = []
    step_num = 0
    #print(policy_value.pi)
    # 打印最优路径
    while flag:
        # 渲染路径点
        path.append(s)
        yuanyang.path = path
        a = brain.greedy_policy(qvalue2,s)
        print('%d->%s\t'%(s,a),qvalue2[s,0],qvalue2[s,1],qvalue2[s,2],qvalue2[s,3])
        yuanyang.bird_male_position = yuanyang.state_to_position(s)
        yuanyang.render()
        time.sleep(0.25)
        step_num+=1
        s_,r,t = yuanyang.transform(s,a)
        if t == True or step_num >30:
            flag = 0
        s = s_
    # 渲染最后的路径点
    yuanyang.bird_male_position = yuanyang.state_to_position(s)
    path.append(s)
    yuanyang.render()
    while True:
        yuanyang.render()

结果
在这里插入图片描述

3.2 部分代码思考

  1. MC_RL.py——line 10
python">self.qvalue = np.zeros((len(yuanyang.states),len(yuanyang.actions)))*0.1
  • *0.1是什么?
  • 我的猜想:零矩阵×0.1数值不变,精度改变成保留一位小数。???

http://www.niftyadmin.cn/n/1399169.html

相关文章

【深入浅出强化学习-编程实战】4 基于时间差分的方法-鸳鸯系统

【深入浅出强化学习-编程实战】4 基于时间差分的方法4.1 鸳鸯系统——基于时间差分的方法4.2 Sarsa结果4.3 Q-learning结果4.1 鸳鸯系统——基于时间差分的方法 左上为雄鸟&#xff0c;右上为雌鸟&#xff0c;中间有两道障碍物。目标&#xff1a;雄鸟找到雌鸟。 yuanyang_env_…

【深入浅出强化学习-编程实战】6 基于函数逼近的方法-鸳鸯系统

【深入浅出强化学习-编程实战】6 基于函数逼近的方法-鸳鸯系统6.1 鸳鸯系统——基于函数逼近的方法基于表格表征表示基于固定稀疏表示代码困惑6.1 鸳鸯系统——基于函数逼近的方法 左上为雄鸟&#xff0c;右上为雌鸟&#xff0c;中间有两道障碍物。目标&#xff1a;雄鸟找到雌…

【深入浅出强化学习-编程实战】6 基于函数逼近的方法-flappy bird

【深入浅出强化学习-编程实战】6 基于函数逼近的方法-flappy bird6.2.1 代码6.2.2 代码解析玩家通过控制小鸟上下运动躲避不断到来的柱子&#xff0c;有两个动作可选&#xff1a;一个是飞&#xff0c;一个是不进行任何操作。采用动作飞时&#xff0c;小鸟会向上飞&#xff1b;不…

【深入浅出强化学习-编程实战】 7 基于策略梯度的强化学习-Cartpole(小车倒立摆系统)

【深入浅出强化学习-编程实战】 7 基于策略梯度的强化学习-Cartpole小车倒立摆MDP模型代码代码解析小车倒立摆MDP模型 状态输入&#xff1a;s[x,x˙,θ,θ˙]s [x,\dot{x},\theta,\dot{\theta}]s[x,x˙,θ,θ˙]&#xff0c;维数为4动作输出为a{[1,0],[0,1]}a \{[1,0],[0,1]…

【深入浅出强化学习-编程实战】 7 基于策略梯度的强化学习-Pendulum(单摆系统)

【深入浅出强化学习-编程实战】 7 基于策略梯度的强化学习-PendulumPendulum单摆系统MDP模型代码代码学习Pendulum单摆系统MDP模型 该系统只包含一个摆杆&#xff0c;其中摆杆可以绕着一端的轴线摆动&#xff0c;在轴线施加力矩τ\tauτ来控制摆杆摆动。Pendulum目标是&#x…

【深入浅出强化学习-编程实战】 8 Actor-Critic-Pendulum(单摆系统)

【深入浅出强化学习-编程实战】 8 Actor-Critic-PendulumPendulum单摆系统MDP模型TD-AC算法TD-AC算法代码解析Minibatch-MC-AC算法Minibatch-MC-AC算法代码解析Pendulum单摆系统MDP模型 该系统只包含一个摆杆&#xff0c;其中摆杆可以绕着一端的轴线摆动&#xff0c;在轴线施加…

【深入浅出强化学习-编程实战】 8 DDPG(单摆系统)

【深入浅出强化学习-编程实战】 10 DDPGPendulum单摆系统MDP模型代码训练效果代码学习Pendulum单摆系统MDP模型 该系统只包含一个摆杆&#xff0c;其中摆杆可以绕着一端的轴线摆动&#xff0c;在轴线施加力矩τ\tauτ来控制摆杆摆动。Pendulum目标是&#xff1a;从任意状态出发…

Actor-critic

Actor-criticReview-Policy GradientReview-Q-learningActor-CriticAdvantage Actor-Critic(A2C)Asynchronous Advantage Actor-Critic(A3C)Pathwise Derivative Policy Gradient李宏毅深度强化学习 学习笔记 学习资料&#xff1a;http://speech.ee.ntu.edu.tw/~tlkagk/courses…