[rl-agents Code Study] 02: The DQN Algorithm


Table of Contents

  • Highway-env Intersection
  • DQN in rl-agents
    • *Implemented variants*:
    • *References*:
    • Query agent for actions sequence
      • Exploration policy
      • Neural network implementation
      • Summary 1
    • Record the experience
      • Replay buffer
      • compute_bellman_residual
      • step_optimizer
      • update_target_network
      • Summary 2
    • exploration_policy
    • Results

Highway-env Intersection

This article continues to explore the DQN implementation in rl-agents, using the intersection environment as the running example. We first introduce intersection-v1 from Highway-env; the relevant documentation is at http://highway-env.farama.org/environments/intersection/.

Environments in highway-env can be customized through a configuration: observations, actions, dynamics, rewards, and so on are all stored as a dictionary in the config.

PS: For the theory behind DQN and Dueling DQN, see [Reinforcement Learning] 10: The DQN Algorithm and [Reinforcement Learning] 11: Double DQN and Dueling DQN.

import gymnasium as gym
import pprint
from matplotlib import pyplot as plt

env = gym.make("intersection-v1", render_mode='rgb_array')
pprint.pprint(env.unwrapped.config)

Printing the config shows the following:

{'action': {'dynamical': True,
            'lateral': True,
            'longitudinal': True,
            'steering_range': [-1.0471975511965976, 1.0471975511965976],
            'type': 'ContinuousAction'},
 'arrived_reward': 1,
 'centering_position': [0.5, 0.6],
 'collision_reward': -5,
 'controlled_vehicles': 1,
 'destination': 'o1',
 'duration': 13,
 'high_speed_reward': 1,
 'initial_vehicle_count': 10,
 'manual_control': False,
 'normalize_reward': False,
 'observation': {'features': ['presence',
                              'x',
                              'y',
                              'vx',
                              'vy',
                              'long_off',
                              'lat_off',
                              'ang_off'],
                 'type': 'Kinematics',
                 'vehicles_count': 5},
 'offroad_terminal': False,
 'offscreen_rendering': False,
 'other_vehicles_type': 'highway_env.vehicle.behavior.IDMVehicle',
 'policy_frequency': 1,
 'real_time_rendering': False,
 'render_agent': True,
 'reward_speed_range': [7.0, 9.0],
 'scaling': 7.15,
 'screen_height': 600,
 'screen_width': 600,
 'show_trajectories': False,
 'simulation_frequency': 15,
 'spawn_probability': 0.6}

The rendered frame can then be displayed with:

plt.imshow(env.render())
plt.show()

[Figure: rendered view of the intersection-v1 environment]
Printing the observation shows a 5×8 array:

[[ 1.0000000e+00  9.9999998e-03  1.0000000e+00  0.0000000e+00
  -1.2500000e-01  6.3297665e+01  0.0000000e+00  0.0000000e+00]
 [ 1.0000000e+00  1.3849856e-01 -1.0000000e+00 -9.9416278e-02
   1.2500000e-01  8.1300293e+01  1.0361128e-15  0.0000000e+00]
 [ 1.0000000e+00 -2.0000000e-02 -1.0000000e+00  0.0000000e+00
   2.2993930e-01  6.5756187e+01  2.8473811e-15  0.0000000e+00]
 [ 0.0000000e+00  0.0000000e+00  0.0000000e+00  0.0000000e+00
   0.0000000e+00  0.0000000e+00  0.0000000e+00  0.0000000e+00]
 [ 0.0000000e+00  0.0000000e+00  0.0000000e+00  0.0000000e+00
   0.0000000e+00  0.0000000e+00  0.0000000e+00  0.0000000e+00]]

The meaning of the observation features is explained below:
[Figure: description of the Kinematics observation features]
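
Each row of the observation describes one observed vehicle, and each column corresponds to one entry of the "features" list in the config; with the default Kinematics settings the first row is the ego vehicle. A quick sketch pairing names with values (continuing from the env created above; the ego-row convention is an assumption about the defaults):

features = ['presence', 'x', 'y', 'vx', 'vy', 'long_off', 'lat_off', 'ang_off']
obs, info = env.reset()
ego_row = dict(zip(features, obs[0]))  # first row: the ego vehicle's features
print(ego_row)
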
The action type can be switched to a discrete space as follows:

env.unwrapped.configure({
    "action": {
        'longitudinal': True,
        "type": "DiscreteMetaAction"
    }
})
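
After reconfiguring, a reset applies the new settings and the action space becomes discrete. A minimal check (the exact number of meta-actions depends on the highway-env version and on the longitudinal/lateral flags):

obs, info = env.reset()   # reset applies the new configuration
print(env.action_space)   # e.g. Discrete(3) for a longitudinal-only DiscreteMetaAction space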

DQN in rl-agents

A neural-network model is used to estimate the state-action value function and produce a greedy optimal policy.

Implemented variants:

  • Double DQN
  • Dueling architecture
  • N-step targets

References:

Playing Atari with Deep Reinforcement Learning, Mnih V. et al (2013).
Deep Reinforcement Learning with Double Q-learning, van Hasselt H. et al. (2015).
Dueling Network Architectures for Deep Reinforcement Learning, Wang Z. et al. (2015).

Query agent for actions sequence

As seen in the previous article, run_episodes drives the actual agent training. It calls the step function, which executes self.agent.plan(self.observation). For DQNAgent, plan is implemented in the AbstractAgent base class, and plan in turn calls act:

    def step(self):
        """
            Plan a sequence of actions according to the agent policy, and step the environment accordingly.
        """
        # Query agent for actions sequence
        actions = self.agent.plan(self.observation)

# rl_agents/agents/common/abstract.py
class AbstractAgent(Configurable, ABC):

    def __init__(self, config=None):
        super(AbstractAgent, self).__init__(config)
        self.writer = None  # Tensorboard writer
        self.directory = None  # Run directory
        
    @abstractmethod
    def act(self, state):
        """
            Pick an action

        :param state: s, the current state of the agent
        :return: a, the action to perform
        """
        raise NotImplementedError()

    def plan(self, state):
        """
            Plan an optimal trajectory from an initial state.

        :param state: s, the initial state of the agent
        :return: [a0, a1, a2...], a sequence of actions to perform
        """
        return [self.act(state)]

The abstract class AbstractDQNAgent inherits from AbstractStochasticAgent, which in turn inherits from AbstractAgent. AbstractDQNAgent overrides the act function:

    def act(self, state, step_exploration_time=True):
        """
            Act according to the state-action value model and an exploration policy
        :param state: current state
        :param step_exploration_time: step the exploration schedule
        :return: an action
        """
        self.previous_state = state
        if step_exploration_time:
            self.exploration_policy.step_time()
        # Handle multi-agent observations
        # TODO: it would be more efficient to forward a batch of states
        if isinstance(state, tuple):
            return tuple(self.act(agent_state, step_exploration_time=False) for agent_state in state)

        # Single-agent setting
        values = self.get_state_action_values(state)
        self.exploration_policy.update(values)
        return self.exploration_policy.sample()

Exploration policy

First, let's look at how exploration_policy is created:

        self.exploration_policy = exploration_factory(self.config["exploration"], self.env.action_space)

The exploration part of the configuration file:

"exploration": {
 "method": "EpsilonGreedy",
    "tau": 15000,
    "temperature": 1.0,
    "final_temperature": 0.05
}
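
To get a feel for these values, here is a small sketch of the decay schedule they induce (it uses the formula from EpsilonGreedy.update shown later in this article; t counts calls to step_time):

import numpy as np

temperature, final_temperature, tau = 1.0, 0.05, 15000

def epsilon(t):
    # decaying epsilon schedule used by EpsilonGreedy (see update() later in this article)
    return final_temperature + (temperature - final_temperature) * np.exp(-t / tau)

for t in [0, 15000, 45000]:
    print(t, round(epsilon(t), 3))   # 1.0, ~0.399, ~0.097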

Jumping into exploration_factory, we can see that three exploration policies are implemented; their details are covered in a later section:

  • Greedy
  • $\epsilon$-Greedy
  • Boltzmann

def exploration_factory(exploration_config, action_space):
    """
        Handles creation of exploration policies
    :param exploration_config: configuration dictionary of the policy, must contain a "method" key
    :param action_space: the environment action space
    :return: a new exploration policy
    """
    from rl_agents.agents.common.exploration.boltzmann import Boltzmann
    from rl_agents.agents.common.exploration.epsilon_greedy import EpsilonGreedy
    from rl_agents.agents.common.exploration.greedy import Greedy

    if exploration_config['method'] == 'Greedy':
        return Greedy(action_space, exploration_config)
    elif exploration_config['method'] == 'EpsilonGreedy':
        return EpsilonGreedy(action_space, exploration_config)
    elif exploration_config['method'] == 'Boltzmann':
        return Boltzmann(action_space, exploration_config)
    else:
        raise ValueError("Unknown exploration method")

Neural network implementation

Next, the state-action values $Q(s,a)$ are obtained:

    def get_state_action_values(self, state):
        """
        :param state: s, an environment state
        :return: [Q(a1,s), ..., Q(an,s)] the array of its action-values for each actions
        """
        return self.get_batch_state_action_values([state])[0]

This calls the abstract method get_batch_state_action_values:

    @abstractmethod
    def get_batch_state_action_values(self, states):
        """
        Get the state-action values of several states
        :param states: [s1; ...; sN] an array of states
        :return: values:[[Q11, ..., Q1n]; ...] the array of all action values for each state
        """
        raise NotImplementedError

Now the concrete implementation in DQNAgent:

class DQNAgent(AbstractDQNAgent):
    def __init__(self, env, config=None):
        super(DQNAgent, self).__init__(env, config)
        size_model_config(self.env, self.config["model"])
        self.value_net = model_factory(self.config["model"])
        self.target_net = model_factory(self.config["model"])
        self.target_net.load_state_dict(self.value_net.state_dict())
        self.target_net.eval()
        logger.debug("Number of trainable parameters: {}".format(trainable_parameters(self.value_net)))
        self.device = choose_device(self.config["device"])
        self.value_net.to(self.device)
        self.target_net.to(self.device)
        self.loss_function = loss_function_factory(self.config["loss_function"])
        self.optimizer = optimizer_factory(self.config["optimizer"]["type"],
                                           self.value_net.parameters(),
                                           **self.config["optimizer"])
        self.steps = 0
        
    def get_batch_state_action_values(self, states):
        return self.value_net(torch.tensor(states, dtype=torch.float).to(self.device)).data.cpu().numpy()

value_net is built by model_factory; the corresponding part of the configuration file is:

    "model": {
        "type": "MultiLayerPerceptron",
        "layers": [128, 128]
    },
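
Note that the "in" and "out" sizes are not specified here; size_model_config (called in DQNAgent.__init__ above) fills them in from the environment. A rough sketch of the idea, assuming a Box observation space and a Discrete action space (not the verbatim repo code):

import numpy as np

def size_model_config_sketch(env, model_config):
    # input size: flattened observation, e.g. 5 * 8 = 40 for the intersection Kinematics observation
    model_config["in"] = int(np.prod(env.observation_space.shape))
    # output size: one Q-value per discrete action
    model_config["out"] = env.action_space.n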

Entering model_factory, four types of networks are implemented:

  • MultiLayerPerceptron
  • DuelingNetwork
  • ConvolutionalNetwork
  • EgoAttentionNetwork

Here we first analyze the MultiLayerPerceptron (i.e., the plain DQN network).

# rl_agents/agents/common/models.py
def model_factory(config: dict) -> nn.Module:
    if config["type"] == "MultiLayerPerceptron":
        return MultiLayerPerceptron(config)
    elif config["type"] == "DuelingNetwork":
        return DuelingNetwork(config)
    elif config["type"] == "ConvolutionalNetwork":
        return ConvolutionalNetwork(config)
    elif config["type"] == "EgoAttentionNetwork":
        return EgoAttentionNetwork(config)
    else:
        raise ValueError("Unknown model type")

MultiLayerPerceptron inherits from BaseModule, which inherits from torch.nn.Module. According to the baseline.json configuration, its layer sizes are [128, 128] and the activation is RELU. Note the reshape operation in forward: since each state is a 5×8 matrix, reshape flattens it into a one-dimensional vector. The resulting network structure is similar to the figure below.

[Figure: structure of the MultiLayerPerceptron Q-network]

class MultiLayerPerceptron(BaseModule, Configurable):
    def __init__(self, config):
        super().__init__()
        Configurable.__init__(self, config)
        sizes = [self.config["in"]] + self.config["layers"] 
        self.activation = activation_factory(self.config["activation"])
        layers_list = [nn.Linear(sizes[i], sizes[i + 1]) for i in range(len(sizes) - 1)]
        self.layers = nn.ModuleList(layers_list)
        if self.config.get("out", None):
            self.predict = nn.Linear(sizes[-1], self.config["out"])

    @classmethod
    def default_config(cls):
        return {"in": None,
                "layers": [64, 64],
                "activation": "RELU",
                "reshape": "True",
                "out": None}

    def forward(self, x):
        if self.config["reshape"]:
            x = x.reshape(x.shape[0], -1)  # We expect a batch of vectors
        for layer in self.layers:
            x = self.activation(layer(x))
        if self.config.get("out", None):
            x = self.predict(x)
        return x
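
A minimal usage sketch of this class (assumptions: the import path from the file header above, a 5×8 = 40-dimensional input, and 3 discrete actions):

import torch
from rl_agents.agents.common.models import MultiLayerPerceptron

net = MultiLayerPerceptron({"in": 5 * 8, "layers": [128, 128], "out": 3})
obs = torch.rand(1, 5, 8)   # a batch with one 5x8 Kinematics observation
q_values = net(obs)         # reshape flattens it to (1, 40); the output has shape (1, 3)
print(q_values.shape)       # torch.Size([1, 3])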

After obtaining $Q$, the exploration policy is updated and an action is sampled. Taking $\epsilon$-Greedy as an example: since EpsilonGreedy inherits from DiscreteDistribution, the relevant logic mostly lives in DiscreteDistribution.

    def act(self, state, step_exploration_time=True):
    ...
        self.exploration_policy.update(values)
        return self.exploration_policy.sample()

# rl_agents/agents/common/exploration/epsilon_greedy.py
    def update(self, values):
        """
            Update the action distribution parameters
        :param values: the state-action values
        :param step_time: whether to update epsilon schedule
        """
        self.optimal_action = np.argmax(values)
        self.epsilon = self.config['final_temperature'] + \
            (self.config['temperature'] - self.config['final_temperature']) * \
            np.exp(- self.time / self.config['tau'])
        if self.writer:
            self.writer.add_scalar('exploration/epsilon', self.epsilon, self.time)

class DiscreteDistribution(Configurable, ABC):
    def __init__(self, config=None, **kwargs):
        super(DiscreteDistribution, self).__init__(config)
        self.np_random = None
        
    @abstractmethod
    def get_distribution(self):
        """
        :return: a distribution over actions {action:probability}
        """
        raise NotImplementedError()

    def sample(self):
        """
        :return: an action sampled from the distribution
        """
        distribution = self.get_distribution()
        return self.np_random.choice(list(distribution.keys()), 1, p=np.array(list(distribution.values())))[0]

As we can see, a distribution over actions must first be obtained; in $\epsilon$-Greedy this is implemented as:

    def get_distribution(self):
        distribution = {action: self.epsilon / self.action_space.n for action in range(self.action_space.n)}
        distribution[self.optimal_action] += 1 - self.epsilon
        return distribution

get_distribution returns a dictionary describing a probability distribution over actions: the keys are actions and the values are the probabilities of selecting them. Every action gets a base probability of self.epsilon / self.action_space.n, where self.action_space.n is the number of actions, so all actions are equally likely under this component; this is the exploration part. The optimal action self.optimal_action additionally receives a probability increment of 1 - self.epsilon; this is the exploitation part, i.e., exploiting the currently best-known action.

sample draws an action according to the distribution produced by get_distribution. Concretely, np_random.choice is called with the list of actions and the corresponding list of probabilities, and returns a single action sampled from that distribution.
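
A standalone numeric sketch of these two steps, with made-up values (three actions, epsilon = 0.2, optimal action 2):

import numpy as np

epsilon, n_actions, optimal_action = 0.2, 3, 2
distribution = {a: epsilon / n_actions for a in range(n_actions)}
distribution[optimal_action] += 1 - epsilon
print(distribution)   # {0: 0.0667, 1: 0.0667, 2: 0.8667} (approximately)

rng = np.random.default_rng(0)
action = rng.choice(list(distribution.keys()), p=list(distribution.values()))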

Summary 1

At this point, act returns an action to execute. The flowchart for this part is shown below:

[Figure: flowchart of the action-selection (act) path]
The following steps were already discussed in the previous article: http://t.csdnimg.cn/ddpVJ.

        # Forward the actions to the environment viewer
        try:
            self.env.unwrapped.viewer.set_agent_action_sequence(actions)
        except AttributeError:
            pass
            
        # Step the environment
        previous_observation, action = self.observation, actions[0]
        transition = self.wrapped_env.step(action)
        self.observation, reward, done, truncated, info = transition
        terminal = done or truncated

        # Call callback
        if self.step_callback_fn is not None:
            self.step_callback_fn(self.episode, self.wrapped_env, self.agent, transition, self.writer)

Record the experience

Only one step now remains in the step function; let's look at how it is implemented.

        # Record the experience.
        try:
            self.agent.record(previous_observation, action, reward, self.observation, done, info)
        except NotImplementedError:
            pass

Jump directly to the AbstractDQNAgent class to see the relevant implementation:

    def record(self, state, action, reward, next_state, done, info):
        """
            Record a transition by performing a Deep Q-Network iteration

            - push the transition into memory
            - sample a minibatch
            - compute the bellman residual loss over the minibatch
            - perform one gradient descent step
            - slowly track the policy network with the target network
        :param state: a state
        :param action: an action
        :param reward: a reward
        :param next_state: a next state
        :param done: whether state is terminal
        """
        if not self.training:
            return
        if isinstance(state, tuple) and isinstance(action, tuple):  # Multi-agent setting
            [self.memory.push(agent_state, agent_action, reward, agent_next_state, done, info)
             for agent_state, agent_action, agent_next_state in zip(state, action, next_state)]
        else:  # Single-agent setting
            self.memory.push(state, action, reward, next_state, done, info)
        batch = self.sample_minibatch()
        if batch:
            loss, _, _ = self.compute_bellman_residual(batch)
            self.step_optimizer(loss)
            self.update_target_network()

Replay buffer

self.memory is an instance of the replay buffer:

  self.memory = ReplayMemory(self.config)

Two things to note about the ReplayMemory class shown below:

  • The way push is implemented (overwriting a slot in place rather than appending and popping) improves performance.
  • In reinforcement learning, a batch of data is regularly sampled from the experience replay buffer (here, self.memory) to update the model. The n_steps option is a common trick: when forming the target, not only the current state and action are used, but also the following n-1 states and actions. With n = 1 this is the usual single-step transition; with n > 1 it becomes n-step sampling (a worked numeric example follows the ReplayMemory code below).

# rl_agents/agents/common/memory.py
class ReplayMemory(Configurable):
    """
        Container that stores and samples transitions.
    """
    def __init__(self, config=None, transition_type=Transition):
        super(ReplayMemory, self).__init__(config)
        self.capacity = int(self.config['memory_capacity'])
        self.transition_type = transition_type
        self.memory = []
        self.position = 0

    @classmethod
    def default_config(cls):
        return dict(memory_capacity=10000,
                    n_steps=1,
                    gamma=0.99)

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
            self.position = len(self.memory) - 1
        elif len(self.memory) > self.capacity:
            self.memory = self.memory[:self.capacity]
        # Faster than append and pop
        self.memory[self.position] = self.transition_type(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size, collapsed=True):
        """
            Sample a batch of transitions.

            If n_steps is greater than one, the batch will be composed of lists of successive transitions.
        :param batch_size: size of the batch
        :param collapsed: whether successive transitions must be collapsed into one n-step transition.
        :return: the sampled batch
        """
        # TODO: use agent's np_random for seeding
        if self.config["n_steps"] == 1:
            # Directly sample transitions
            return random.sample(self.memory, batch_size)
        else:
            # Sample initial transition indexes
            indexes = random.sample(range(len(self.memory)), batch_size)
            # Get the batch of n-consecutive-transitions starting from sampled indexes
            all_transitions = [self.memory[i:i+self.config["n_steps"]] for i in indexes]
            # Collapse transitions
            return map(self.collapse_n_steps, all_transitions) if collapsed else all_transitions

    def collapse_n_steps(self, transitions):
        """
            Collapse n transitions <s,a,r,s',t> of a trajectory into one transition <s0, a0, Sum(r_i), sp, tp>.

            We start from the initial state, perform the first action, and then the return estimate is formed by
            accumulating the discounted rewards along the trajectory until a terminal state or the end of the
            trajectory is reached.
        :param transitions: A list of n successive transitions
        :return: The corresponding n-step transition
        """
        state, action, cumulated_reward, next_state, done, info = transitions[0]
        discount = 1
        for transition in transitions[1:]:
            if done:
                break
            else:
                _, _, reward, next_state, done, info = transition
                discount *= self.config['gamma']
                cumulated_reward += discount*reward
        return state, action, cumulated_reward, next_state, done, info

    def __len__(self):
        return len(self.memory)

    def is_full(self):
        return len(self.memory) == self.capacity

    def is_empty(self):
        return len(self.memory) == 0
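
As a worked example of collapse_n_steps, with n_steps = 3, gamma = 0.99, and made-up rewards:

gamma = 0.99
rewards = [1.0, 0.5, 2.0]          # r0, r1, r2 along three consecutive transitions
n_step_return = rewards[0] + gamma * rewards[1] + gamma ** 2 * rewards[2]
print(round(n_step_return, 4))     # 3.4552
# the collapsed transition is <s0, a0, 3.4552, s3, done3>, matching the loop in collapse_n_steps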

Back in record: each transition is first pushed into the replay buffer, and once the buffer holds at least batch_size transitions, a minibatch is sampled from it:

    def sample_minibatch(self):
        if len(self.memory) < self.config["batch_size"]:
            return None
        transitions = self.memory.sample(self.config["batch_size"])
        return Transition(*zip(*transitions))
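
The Transition(*zip(*transitions)) idiom transposes a list of transitions into a single Transition whose fields are tuples, which is the batch format used below. A standalone sketch (the field names match the attributes accessed in compute_bellman_residual):

from collections import namedtuple

Transition = namedtuple("Transition", ["state", "action", "reward", "next_state", "terminal", "info"])

transitions = [
    Transition("s0", 1, 0.5, "s1", False, {}),
    Transition("s1", 0, 1.0, "s2", True, {}),
]
batch = Transition(*zip(*transitions))
print(batch.state)    # ('s0', 's1')
print(batch.reward)   # (0.5, 1.0)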

compute_bellman_residual

The loss is then computed from the Bellman residual:

loss, _, _ = self.compute_bellman_residual(batch)

    def compute_bellman_residual(self, batch, target_state_action_value=None):
        # Compute concatenate the batch elements
        if not isinstance(batch.state, torch.Tensor):
            # logger.info("Casting the batch to torch.tensor")
            state = torch.cat(tuple(torch.tensor([batch.state], dtype=torch.float))).to(self.device)
            action = torch.tensor(batch.action, dtype=torch.long).to(self.device)
            reward = torch.tensor(batch.reward, dtype=torch.float).to(self.device)
            next_state = torch.cat(tuple(torch.tensor([batch.next_state], dtype=torch.float))).to(self.device)
            terminal = torch.tensor(batch.terminal, dtype=torch.bool).to(self.device)
            batch = Transition(state, action, reward, next_state, terminal, batch.info)

        # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
        # columns of actions taken
        state_action_values = self.value_net(batch.state)
        state_action_values = state_action_values.gather(1, batch.action.unsqueeze(1)).squeeze(1)

        if target_state_action_value is None:
            with torch.no_grad():
                # Compute V(s_{t+1}) for all next states.
                next_state_values = torch.zeros(batch.reward.shape).to(self.device)
                if self.config["double"]:
                    # Double Q-learning: pick best actions from policy network
                    _, best_actions = self.value_net(batch.next_state).max(1)
                    # Double Q-learning: estimate action values from target network
                    best_values = self.target_net(batch.next_state).gather(1, best_actions.unsqueeze(1)).squeeze(1)
                else:
                    best_values, _ = self.target_net(batch.next_state).max(1)
                next_state_values[~batch.terminal] = best_values[~batch.terminal]
                # Compute the expected Q values
                target_state_action_value = batch.reward + self.config["gamma"] * next_state_values

        # Compute loss
        loss = self.loss_function(state_action_values, target_state_action_value)
        return loss, target_state_action_value, batch

  • with torch.no_grad(): disables gradient computation inside its scope.
  • Double DQN is implemented here: the policy network (value_net) picks the best next action, and the target network evaluates it (a worked example of the target computation follows the loss-function list below).
  • self.loss_function = loss_function_factory(self.config["loss_function"]); the available loss functions are:
def loss_function_factory(loss_function):
    if loss_function == "l2":
        return F.mse_loss
    elif loss_function == "l1":
        return F.l1_loss
    elif loss_function == "smooth_l1":
        return F.smooth_l1_loss
    elif loss_function == "bce":
        return F.binary_cross_entropy
    else:
        raise ValueError("Unknown loss function : {}".format(loss_function))

step_optimizer

The gradients are clipped element-wise:

    def step_optimizer(self, loss):
        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        for param in self.value_net.parameters():
            param.grad.data.clamp_(-1, 1)
        self.optimizer.step()
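
The in-place clamp_ keeps every gradient component within [-1, 1]; a small standalone illustration:

import torch

grad = torch.tensor([-3.0, 0.2, 1.5])
grad.clamp_(-1, 1)   # element-wise, in place, as in step_optimizer
print(grad)          # tensor([-1.0000,  0.2000,  1.0000])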

update_target_network

The target network is updated every target_update steps:

    def update_target_network(self):
        self.steps += 1
        if self.steps % self.config["target_update"] == 0:
            self.target_net.load_state_dict(self.value_net.state_dict())

Summary 2

At this point the whole DQN algorithm has been covered. The flowchart of the record part is shown below:

[Figure: flowchart of the record (experience and update) path]

exploration_policy

This part implements three exploration policies:

  • Greedy
  • $\epsilon$-Greedy
  • Boltzmann

For background, see [Reinforcement Learning] 02: Exploration and Exploitation.

Greedy

The Greedy policy always selects the currently optimal action: $a_t=\arg\max_{a\in\mathcal{A}}Q(s,a)$.

class Greedy(DiscreteDistribution):
    """
        Always use the optimal action
    """

    def __init__(self, action_space, config=None):
        super(Greedy, self).__init__(config)
        self.action_space = action_space
        if isinstance(self.action_space, spaces.Tuple):
            self.action_space = self.action_space.spaces[0]
        if not isinstance(self.action_space, spaces.Discrete):
            raise TypeError("The action space should be discrete")
        self.values = None
        self.seed()

    def get_distribution(self):
        optimal_action = np.argmax(self.values)
        return {action: 1 if action == optimal_action else 0 for action in range(self.action_space.n)}

    def update(self, values):
        self.values = values

$\epsilon$-Greedy

The $\epsilon$-Greedy policy is defined as:

$$a_t=\begin{cases}\arg\max_{a\in\mathcal{A}}\hat{Q}(a),&\text{with probability }1-\epsilon\\\text{a random action from }\mathcal{A},&\text{with probability }\epsilon\end{cases}$$

What is implemented here is actually a decaying $\epsilon$-greedy schedule, whose decay curve is shown in the figure below:

$$\epsilon = \text{final\_temperature}+(\text{temperature}-\text{final\_temperature})\, e^{-t/\tau}$$
[Figure: epsilon decay curve over training time]

class EpsilonGreedy(DiscreteDistribution):
    """
        Uniform distribution with probability epsilon, and optimal action with probability 1-epsilon
    """

    def __init__(self, action_space, config=None):
        super(EpsilonGreedy, self).__init__(config)
        self.action_space = action_space
        if isinstance(self.action_space, spaces.Tuple):
            self.action_space = self.action_space.spaces[0]
        if not isinstance(self.action_space, spaces.Discrete):
            raise TypeError("The action space should be discrete")
        self.config['final_temperature'] = min(self.config['temperature'], self.config['final_temperature'])
        self.optimal_action = None
        self.epsilon = 0
        self.time = 0
        self.writer = None
        self.seed()

    @classmethod
    def default_config(cls):
        return dict(temperature=1.0,
                    final_temperature=0.1,
                    tau=5000)

    def get_distribution(self):
        distribution = {action: self.epsilon / self.action_space.n for action in range(self.action_space.n)}
        distribution[self.optimal_action] += 1 - self.epsilon
        return distribution

    def update(self, values):
        """
            Update the action distribution parameters
        :param values: the state-action values
        :param step_time: whether to update epsilon schedule
        """
        self.optimal_action = np.argmax(values)
        self.epsilon = self.config['final_temperature'] + \
            (self.config['temperature'] - self.config['final_temperature']) * \
            np.exp(- self.time / self.config['tau'])
        if self.writer:
            self.writer.add_scalar('exploration/epsilon', self.epsilon, self.time)

    def step_time(self):
        self.time += 1

    def set_time(self, time):
        self.time = time

    def set_writer(self, writer):
        self.writer = writer

Boltzmann

The Boltzmann distribution describes the probability distribution of molecular states at thermodynamic equilibrium. It states that different microscopic states occur with different probabilities that follow an exponential form in the state's energy.

In thermodynamics, any substance at a given temperature exhibits thermal motion, whose states can be described by the molecules' internal or kinetic energy. The Boltzmann distribution gives the probability of a molecule occupying each possible state at the same temperature:

$$P(E_i) = \frac{e^{-E_i/kT}}{\sum_{j} e^{-E_j/kT}}$$

where $P(E_i)$ is the probability of being in energy state $E_i$, $k$ is the Boltzmann constant, $T$ is the temperature, and the sum runs over all reachable energy states $E_j$.

Each state's probability thus decreases exponentially with its energy, so lower-energy states occur with higher probability, which is consistent with the tendency toward increasing entropy: the more ordered a state, the less likely it is to occur. In the exploration setting, the same form is applied to action values: actions are sampled with probability proportional to $e^{Q(s,a)/\tau}$, where the temperature $\tau$ controls how strongly the distribution concentrates on the best action.

class Boltzmann(DiscreteDistribution):
    """
        Boltzmann (softmax) distribution over the action values, controlled by a temperature parameter
    """

    def __init__(self, action_space, config=None):
        super(Boltzmann, self).__init__(config)
        self.action_space = action_space
        if not isinstance(self.action_space, spaces.Discrete):
            raise TypeError("The action space should be discrete")
        self.values = None
        self.seed()

    @classmethod
    def default_config(cls):
        return dict(temperature=0.5)

    def get_distribution(self):
        actions = range(self.action_space.n)
        if self.config['temperature'] > 0:
            weights = np.exp(self.values / self.config['temperature'])
        else:
            weights = np.zeros((len(actions),))
            weights[np.argmax(self.values)] = 1
        return {action: weights[action] / np.sum(weights) for action in actions}

    def update(self, values):
        self.values = values
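
The effect of the temperature on the resulting action distribution (a standalone sketch with made-up Q-values):

import numpy as np

values = np.array([0.2, 0.8, 0.1])   # made-up Q-values
for temperature in [2.0, 0.5, 0.1]:
    weights = np.exp(values / temperature)
    print(temperature, np.round(weights / weights.sum(), 3))
# higher temperature -> closer to uniform; lower temperature -> closer to greedy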

Results

The commands and usage were introduced in the previous article, [rl-agents Code Study] 01: Overall Framework.

The hyperparameters are left at their default values, and DQN is run for 4000 steps and for 20000 steps. The results are inspected with TensorBoard:

 tensorboard --logdir C:\Users\16413\Desktop\rl-agents-master\scripts\out\IntersectionEnv\DQNAgent\baseline_20231113-123234_7944\

4000 steps
[Figure: TensorBoard curves for the 4000-step run]
The final episode reward settles at roughly 3.

20000 steps
[Figure: TensorBoard curves for the 20000-step run]
The final episode reward again settles at roughly 3.

