优先经验回放(prioritized experience replay)

news/2024/5/18 22:16:56 标签: 强化学习, 经验回放

prioritized experience replay 思路

优先经验回放出自ICLR 2016的论文《prioritized experience replay》。

prioritized experience replay的作者们认为,按照一定的优先级来对经验回放池中的样本采样,相比于随机均匀的从经验回放池中采样的效率更高,可以让模型更快的收敛。其基本思想是RL agent在一些转移样本上可以更有效的学习,也可以解释成“更多地训练会让你意外的数据”。

那优先级如何定义呢?作者们使用的是样本的TD error δ \delta δ 的幅值。对于新生成的样本,TD error未知时,将样本赋值为最大优先级,以保证样本至少将会被采样一次。每个采样样本的概率被定义为
P ( i ) = p i α ∑ k p k α P(i) = \frac {p_i^{\alpha}} {\sum_k p_k^{\alpha}} P(i)=kpkαpiα
上式中的 p i > 0 p_i >0 pi>0是回放池中的第i个样本的优先级, α \alpha α则强调有多重视该优先级,如果 α = 0 \alpha=0 α=0,采样就退化成和基础DQN一样的均匀采样了。

p i p_i pi如何取值,论文中提供了如下两种方法,两种方法都是关于TD error δ \delta δ 单调的:

  • 基于比例的优先级: p i = ∣ δ i ∣ + ϵ p_i = |\delta_i| + \epsilon pi=δi+ϵ ϵ \epsilon ϵ是一个很小的正数常量,防止当TD error为0时样本就不会被访问到的情形。(目前大部分实现都是使用的这个形式的优先级)
  • 基于排序的优先级: p i = 1 r a n k ( i ) p_i = \frac {1}{rank(i)} pi=rank(i)1, 式中的 r a n k ( i ) rank(i) rank(i)是样本根据 ∣ δ i ∣ |\delta_i| δi经验回放池中的排序号,此时P就变成了带有指数 α \alpha α的幂率分布了。

作者们定义的概率调整了样本的优先级,因此也就在数据分布中引入了偏差,为了弥补偏差,使用了重要性采样权重(importance-sampling (IS) weights):
w i = ( 1 N ⋅ 1 P ( i ) ) β w_i = \left( \frac{1}{N} \cdot \frac{1}{P(i)} \right)^{\beta} wi=(N1P(i)1)β
上式权重中,当 β = 1 \beta=1 β=1时就完全补偿了非均匀概率采样引入的偏差,作者们提到为了收敛性考虑,最后让 β \beta β从0到1中的某个值开始,并逐渐增加到1。在Q-learning更新时使用这些权重乘以TD error,也就是使用 w i δ i w_i \delta_i wiδi而不是原来的 δ i \delta_i δi。此外,为了使训练更稳定,总是对权重乘以 1 / m a x i w i 1/\mathcal{max}_i{w_i} 1/maxiwi进行归一化。

以Double DQN为例,使用优先经验回放的算法(论文算法1)如下图:

在这里插入图片描述

prioritized experience replay 实现

直接实现优先经验回放池如下代码(修改自代码 )

class PrioReplayBufferNaive:
    def __init__(self, buf_size, prob_alpha=0.6, epsilon=1e-5, beta=0.4, beta_increment_per_sampling=0.001):
        self.prob_alpha = prob_alpha
        self.capacity = buf_size
        self.pos = 0
        self.buffer = []
        self.priorities = np.zeros((buf_size, ), dtype=np.float32)
        self.beta = beta
        self.beta_increment_per_sampling = beta_increment_per_sampling
        self.epsilon = epsilon

    def __len__(self):
        return len(self.buffer)

    def size(self):  # 目前buffer中数据的数量
        return len(self.buffer)

    def add(self, sample):
        # 新加入的数据使用最大的优先级,保证数据尽可能的被采样到
        max_prio = self.priorities.max() if self.buffer else 1.0
        if len(self.buffer) < self.capacity:
            self.buffer.append(sample)
        else:
            self.buffer[self.pos] = sample
        self.priorities[self.pos] = max_prio
        self.pos = (self.pos + 1) % self.capacity

    def sample(self, batch_size):
        if len(self.buffer) == self.capacity:
            prios = self.priorities
        else:
            prios = self.priorities[:self.pos]
        probs = np.array(prios, dtype=np.float32) ** self.prob_alpha

        probs /= probs.sum()
        indices = np.random.choice(len(self.buffer), batch_size, p=probs, replace=True)
        samples = [self.buffer[idx] for idx in indices]
        total = len(self.buffer)
        self.beta = np.min([1., self.beta + self.beta_increment_per_sampling])
        weights = (total * probs[indices]) ** (-self.beta)
        weights /= weights.max()
        return samples, indices, np.array(weights, dtype=np.float32)

    def update_priorities(self, batch_indices, batch_priorities):
        '''
        更新样本的优先级'''
        for idx, prio in zip(batch_indices, batch_priorities):
            self.priorities[idx] = prio + self.epsilon

直接实现的优先经验回放,在样本数很大时的采样效率不够高,作者们通过定义了sumtree的数据结构来存储样本优先级,该数据结构的每一个节点的值为其子节点之和,而样本优先级被放在树的叶子节点上,树的根节点的值为所有优先级之和 p t o t a l p_{total} ptotal,更新和采样时的效率为 O ( l o g N ) O(logN) O(logN)。在采样时,设采样批次大小为k,将 [ 0 , p t o t a l ] [0, p_{total}] [0,ptotal]均分为k等份,然后在每一个区间均匀的采样一个值,再通过该值从树中提取到对应的样本。python 实现如下(代码来源)

class SumTree:
    """
    父节点的值是其子节点值之和的二叉树数据结构
    """
    write = 0

    def __init__(self, capacity):
        self.capacity = capacity
        self.tree = np.zeros(2 * capacity - 1)
        self.data = np.zeros(capacity, dtype=object)
        self.n_entries = 0

    # update to the root node
    def _propagate(self, idx, change):
        parent = (idx - 1) // 2

        self.tree[parent] += change

        if parent != 0:
            self._propagate(parent, change)

    # find sample on leaf node
    def _retrieve(self, idx, s):
        left = 2 * idx + 1
        right = left + 1

        if left >= len(self.tree):
            return idx

        if s <= self.tree[left]:
            return self._retrieve(left, s)
        else:
            return self._retrieve(right, s - self.tree[left])

    def total(self):
        return self.tree[0]

    # store priority and sample
    def add(self, p, data):
        idx = self.write + self.capacity - 1

        self.data[self.write] = data
        self.update(idx, p)

        self.write += 1
        if self.write >= self.capacity:
            self.write = 0

        if self.n_entries < self.capacity:
            self.n_entries += 1

    # update priority
    def update(self, idx, p):
        change = p - self.tree[idx]

        self.tree[idx] = p
        self._propagate(idx, change)

    # get priority and sample
    def get(self, s):
        idx = self._retrieve(0, s)
        dataIdx = idx - self.capacity + 1

        return (idx, self.tree[idx], self.data[dataIdx])


class PrioReplayBuffer:  # stored as ( s, a, r, s_ ) in SumTree
    epsilon = 0.01
    alpha = 0.6
    beta = 0.4
    beta_increment_per_sampling = 0.001

    def __init__(self, capacity):
        self.tree = SumTree(capacity)
        self.capacity = capacity

    def _get_priority(self, error):
        return (np.abs(error) + self.epsilon) ** self.alpha

    def add(self, error, sample):
        p = self._get_priority(error)
        self.tree.add(p, sample)

    def sample(self, n):
        batch = []
        idxs = []
        segment = self.tree.total() / n
        priorities = []

        self.beta = np.min([1., self.beta + self.beta_increment_per_sampling])

        for i in range(n):
            a = segment * i
            b = segment * (i + 1)

            s = random.uniform(a, b)
            (idx, p, data) = self.tree.get(s)
            priorities.append(p)
            batch.append(data)
            idxs.append(idx)

        sampling_probabilities = priorities / self.tree.total()
        is_weight = np.power(self.tree.n_entries * sampling_probabilities, -self.beta)
        is_weight /= is_weight.max()

        return batch, idxs, is_weight

    def update(self, idx, error):
      '''
      这里是一次更新一个样本,所以在调用时,写for循环依次更次样本的优先级
      '''
        p = self._get_priority(error)
        self.tree.update(idx, p)

参考资料

  1. Schaul, Tom, John Quan, Ioannis Antonoglou, and David Silver. 2015. “Prioritized Experience Replay.” arXiv: Learning,arXiv: Learning, November.

  2. sum_tree的实现代码

  3. 相关blog: 1 (对应的代码), 2, 3


http://www.niftyadmin.cn/n/5204406.html

相关文章

编译QT Mysql库并集成使用

安装MSVC编译器与Windows 10 SDK 打开Visual Studio Installer&#xff0c;如果已经安装过内容了可能是如下页面&#xff0c;点击修改&#xff08;头一回打开的话不需要这一步&#xff09;&#xff1a; 然后在工作负荷中勾选使用C的桌面开发&#xff0c;它会帮我们勾选好一些…

电大搜题——打开学习之门的最佳选择

在快节奏的现代社会&#xff0c;追求知识和学习成为愈发重要的需求。然而&#xff0c;许多人由于时间和机会的限制&#xff0c;无法实现自己的教育梦想。就在这个时候&#xff0c;安徽开放大学广播电视大学通过推出电大搜题微信公众号&#xff0c;为广大学子提供了一个便捷高效…

PTA目录树

在ZIP归档文件中&#xff0c;保留着所有压缩文件和目录的相对路径和名称。当使用WinZIP等GUI软件打开ZIP归档文件时&#xff0c;可以从这些信息中重建目录的树状结构。请编写程序实现目录的树状结构的重建工作。 输入格式: 输入首先给出正整数N&#xff08;≤104&#xff09;…

《LeetCode力扣练习》代码随想录——链表(环形链表II---Java)

《LeetCode力扣练习》代码随想录——链表&#xff08;环形链表II—Java&#xff09; 刷题思路来源于 代码随想录 142. 环形链表 II 双指针 /*** Definition for singly-linked list.* class ListNode {* int val;* ListNode next;* ListNode(int x) {* val…

解决requests库进行爬虫ip请求时遇到的错误的方法

目录 一、超时错误 二、连接错误 三、拒绝服务错误 四、内容编码错误 五、HTTP错误 在利用requests库进行网络爬虫的IP请求时&#xff0c;我们可能会遇到各种错误&#xff0c;如超时、连接错误、拒绝服务等等。这些错误通常是由目标网站的限制、网络问题或我们的爬虫代码中…

防爆智能安全帽、防爆手持终端,防爆智能矿灯守护安全,在煤矿安全生产远程可视化监管中的应用

煤矿安全新守护&#xff1a;如何通过防爆智能装备实现远程可视化监管 煤矿是国民经济的重要支柱产业&#xff0c;但长期以来&#xff0c;安全生产事故的频发一直是困扰煤矿行业发展的严峻问题。安全生产事故不仅危及矿工的生命安全&#xff0c;也对企业和地方经济造成了重大的…

排序算法-----快速排序(非递归实现)

目录 前言 快速排序 基本思路 非递归代码实现 前言 很久没跟新数据结构与算法这一栏了&#xff0c;因为数据结构与算法基本上都发布完了&#xff0c;哈哈&#xff0c;那今天我就把前面排序算法那一块的快速排序完善一下&#xff0c;前面只发布了快速排序递归算法&#xff0c;…

[Linux] Network: IPv6 link-local 地址是否可用不自动生成

原来有一段时间在做扩充产品的VLAN个数&#xff0c;然后就遇到过一个问题&#xff1a;说这个Linux的默认配置里&#xff0c;会为每一个网络接口添加一个link-local的地址&#xff0c;就是FE80::开头的地址&#xff0c;在RFC-4291里有如下的定义&#xff1a; Link-Local unicas…