【RocketMQ 存储】- broker 端存储单条消息的逻辑

news/2025/1/31 12:11:41 标签: java-rocketmq, rocketmq, 消息存储

文章目录

  • 1. 前言
  • 2. DefaultMessageStore#asyncPutMessage 添加单条消息
    • 2.1 DefaultMessageStore#checkStoreStatus 检查存储服务的状态
    • 2.2 DefaultMessageStore#checkMessage 校验消息长度是否合法
    • 2.3 CommitLog#asyncPutMessage 核心存储逻辑
    • 2.4 MappedFile#appendMessage
    • 2.5 CommitLog#doAppend 追加消息到 CommitLog
    • 2.6 DefaultMessageStore#asyncPutMessage 结尾
  • 3. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

上一篇文章中我们已经解析了 RocketMQ 的存储结构中的 MappedFIile,这篇文章中,我们来介绍下 broker 端是如何处理 producer 端发送过来的消息的,存储消息的入口在 DefaultMessageStore#asyncPutMessageDefaultMessageStore#asyncPutMessages,前者是单条消息的添加,后面是批量添加,这篇文章就看下 asyncPutMessage 的逻辑。


2. DefaultMessageStore#asyncPutMessage 添加单条消息

asyncPutMessages 是 RocketMQ 中处理单条消息的逻辑,下面是这个方法的整体逻辑。

/**
 * 消息批量添加
 * @param messageExtBatch the message batch
 * @return
 */
public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
    // 1.检查下存储服务的状态
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        // 这里就是消息存储服务不可用或者操作系统页繁忙,直接返回结果
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    }

    // 2.校验消息看看是否合法
    PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        // 消息长度不合法
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    }

    // 3.当前时间
    long beginTime = this.getSystemClock().now();
    // 4.存储消息的核心逻辑
    CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch);

    // 这里就是结果处理
    resultFuture.thenAccept((result) -> {
        // 当消息存储完成之后,lambda 表达式会被调用
        // 消息消耗的时间
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
        }

        // 设置下存储消息的消耗时间和最大消耗时间
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            // 这里就是存储失败了,新增存储失败的次数
            this.storeStatsService.getPutMessageFailedTimes().add(1);
        }
    });

    return resultFuture;
}

2.1 DefaultMessageStore#checkStoreStatus 检查存储服务的状态

消息存储服务的上层就是 DefaultMessageStore,checkStoreStatus 方法是专门用于检测这个服务的状态。

/**
 * 检查存储服务的状态
 * @return
 */
private PutMessageStatus checkStoreStatus() {
    ...
}

下面一步一步看下里面的核心,其实就是判断存储服务的各种状态。首先就是判断下服务有没有关闭,也就是 this.shutdown 是否等于 true,当 broker 服务 shutdown 的时候就会调用 DefaultMessageStore#shutdown 方法设置这个属性为 true。

// 1.如果存储服务 DefaultMessageStore 是 SHUTDOWN 状态,就是关闭了,这时候服务不可用
if (this.shutdown) {
    log.warn("message store has shutdown, so putMessage is forbidden");
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}

然后就是判断当前节点是不是一个 slave 节点,slave 节点的主要职责是备份 master 节点的数据,以提供数据冗余和高可用性,它不直接处理生产或消费请求,除非 master 节点发生故障,所以说写入数据是 master 才能写入。

// 2.slave 节点的主要职责是备份 master 节点的数据,以提供数据冗余和高可用性,它不直接处理生产或消费请求,除非 master 节点发生故障
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    long value = this.printTimes.getAndIncrement();
    // 日志打印
    if ((value % 50000) == 0) {
        log.warn("broke role is slave, so putMessage is forbidden");
    }
    // 服务不可用
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}

接着再判断是否支持写入,这里会判断下 broker 磁盘是否满了、或者说写入 ConsumeQueue 或创建 IndexFile 的过程出现异常,出现这些问题就直接发挥结果。

// 3.如果不支持写入,就是说有可能 broker 磁盘满了、写入 ConsumeQueue 或者 IndexFile 错误
if (!this.runningFlags.isWriteable()) {
    long value = this.printTimes.getAndIncrement();
    if ((value % 50000) == 0) {
        log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
            "the broker's disk is full, write to logic queue error, write to index file error, etc");
    }
    // 这种情况也会返回服务不可用
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
} else {
    this.printTimes.set(0);
}

这个方法里面通过 isWriteable 判断是否可写,就是判断下标志位是不是出现上面说的几种情况。

public boolean isWriteable() {
    if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) {
        return true;
    }

    return false;
}

回到 checkStoreStatus 方法,最后还会判断下操作系统是否繁忙,如果繁忙就直接返回错误码。

// 4.操作系统页缓存是否繁忙
if (this.isOSPageCacheBusy()) {
    // 如果是繁忙就返回错误码
    return PutMessageStatus.OS_PAGECACHE_BUSY;
}

这个 isOSPageCacheBusy 方法就是判断下操作系统页缓存是否繁忙,其实意思应该是对操作系统操作是否繁忙,来看下里面的逻辑。

/**
 * 判断操作系统页是不是繁忙
 * @return
 */
@Override
public boolean isOSPageCacheBusy() {
    // beginTimeInLock 是当往 CommitLog 里面添加消息的时候会加锁并且设置这个变量为加锁的时间
    // 添加完消息之后就会解锁,这个 beginTimeInLock 会被重置为 0
    long begin = this.getCommitLog().getBeginTimeInLock();
    // 这个就是加锁加了多长时间
    long diff = this.systemClock.now() - begin;

    // diff >= 10000s 代表距离上一次写入 CommitLog 已经超过 10000s 了
    // diff <= 1s 代表持有锁的时间还没到 1s
    // 这两种情况就是不繁忙,为什么第一种是不繁忙呢,因为数据写入是不太可能会超过 10000s 的,如果这么久都没能写入 CommitLog 就肯定是出问题了
    // 这时候也不需要考虑是不是系统繁忙了
    return diff < 10000000
        && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}

beginTimeInLock 是当往 CommitLog 里面添加消息的时候会加锁并且设置这个变量为加锁的时间,添加完消息之后就会解锁,这个 beginTimeInLock 会被重置为 0,这里说的添加消息是添加到 CommitLog 下面的 MappedFile。

再来看下里面的判断,这里面会判断满足 diff < 10000000 && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills() 就认为是页繁忙。osPageCacheBusyTimeOutMills 默认是 1s,也就是说持有锁的时间在 (1s, 10000s) 就认为对操作系统页缓存操作繁忙。

那为什么要有一个小于 10000s 的限制呢? 上面也说了,当添加完消息就会解锁,这时候 beginTimeInLock 会被重置为 0,这时候计算出来的 diff 就是 System.currentTimeMillis,所以这个逻辑其实会判断出这种情况下是没有加锁的,于是就不算繁忙。

所以这里的逻辑就很明确了,这里会认为 broker 对消息的写入是不会超过 10000s 的,同时当超过 1s 就认为持有锁的时间太久了,消息写入时间太长了,有可能是对 MappedByteBuffer 写入的时候发生了缓存页刷新等操作导致出现问题,这时候返回错误码 PutMessageStatus.OS_PAGECACHE_BUSY

最后回到 checkStoreStatus 方法,经过上面的判断如果都不满足,就返回 PutMessageStatus.PUT_OK,意思是可以往 CommitLog 中追加消息,下面是整体逻辑。

/**
 * 检查存储服务的状态
 * @return
 */
private PutMessageStatus checkStoreStatus() {
    // 1.如果存储服务 DefaultMessageStore 是 SHUTDOWN 状态,就是关闭了,这时候服务不可用
    if (this.shutdown) {
        log.warn("message store has shutdown, so putMessage is forbidden");
        return PutMessageStatus.SERVICE_NOT_AVAILABLE;
    }

    // 2.slave 节点的主要职责是备份 master 节点的数据,以提供数据冗余和高可用性,它不直接处理生产或消费请求,除非 master 节点发生故障
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        // 日志打印
        if ((value % 50000) == 0) {
            log.warn("broke role is slave, so putMessage is forbidden");
        }
        // 服务不可用
        return PutMessageStatus.SERVICE_NOT_AVAILABLE;
    }

    // 3.如果不支持写入,就是说有可能 broker 磁盘满了、写入 ConsumeQueue 或者 IndexFile 错误
    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
                "the broker's disk is full, write to logic queue error, write to index file error, etc");
        }
        // 这种情况也会返回服务不可用
        return PutMessageStatus.SERVICE_NOT_AVAILABLE;
    } else {
        this.printTimes.set(0);
    }

    // 4.操作系统页缓存是否繁忙
    if (this.isOSPageCacheBusy()) {
        // 如果是繁忙就返回错误码
        return PutMessageStatus.OS_PAGECACHE_BUSY;
    }
    // 5.这里就是可以添加消息
    return PutMessageStatus.PUT_OK;
}

2.2 DefaultMessageStore#checkMessage 校验消息长度是否合法

这里面逻辑就比较简单了,我就不细说,里面主要包括两条规则。

  1. 消息 topic 长度不能超过 127
  2. 消息属性长度不能超过 32767
/**
 * 校验消息是否合法
 * @param msg
 * @return
 */
private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {
    // topic 长度不能超过 127
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return PutMessageStatus.MESSAGE_ILLEGAL;
    }

    // 消息属性长度不能超过 32767
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return PutMessageStatus.MESSAGE_ILLEGAL;
    }
    // 这里就是检查通过,没问题
    return PutMessageStatus.PUT_OK;
}

2.3 CommitLog#asyncPutMessage 核心存储逻辑

这里是添加消息的核心逻辑,上面我略过了检查 LMQ 消息是否合法这段逻辑,LMQ 队列是一种轻量级的队列,这里就先不展开细说了,来看下 CommitLog#asyncPutMessage 的核心逻辑。

这个方法里面的逻辑太多了,我这里就慢慢拆解。但是大家可以先看下下面的图,最终消息就是按下面的顺序存到 ByteBuffer 中的。

在这里插入图片描述

代码的逻辑首先设置下消息的存储时间和消息 CRC 校验码。

// 1.设置消息的存储时间
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting on the client)
// 2.设置消息体 CRC 校验码
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));

接着来处理不同事务状态的消息,我们知道 RocketMQ 的事务消息分为 4 个状态,关于事务消息后续会出相关文章来介绍源码。

  • MessageSysFlag.TRANSACTION_NOT_TYPE: 普通消息,不是事务消息
  • MessageSysFlag.TRANSACTION_PREPARED_TYPE: 事务消息是一阶段 PREPARE 状态
  • MessageSysFlag.TRANSACTION_COMMIT_TYPE: 事务消息是 Commit 状态,意味者可以投到真实队列
  • MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: 消息一阶段的 Rollback
    在这里插入图片描述

上面简单介绍了事务类型,这几种类型中,对于 TRANSACTION_NOT_TYPETRANSACTION_COMMIT_TYPE 这两种类型会去处理延时队列的请求。延时消息也是 RocketMQ 的一种消息,RocketMQ 提供了 18 中延时级别,当一条延时消息被创建出来时首先会被加入名为 SCHEDULE_TOPIC_XXXX 的 topic 下面的队列,这个 SCHEDULE_TOPIC_XXXX topic 提供了 18 个消息队列,每一个队列对应一个延时等级,所谓的延时等级就是延时时间,这里就不多说了,下面来看下这两种事务状态下处理延时等级的逻辑。

// 3.这里就是获取消息的事务状态,处理延时消息的逻辑
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 首先能设置到 CommitLog 中的要么就是非事务消息,要么就是事务一阶段 Commit 的消息,
// 也就是消息本地执行逻辑没有问题,后续就可以把事务消息从事务投放到普通队列
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // 下面是处理延迟消息的逻辑,延迟消息有一个延时等级,每个延时等级都对应一个延时时间
    if (msg.getDelayTimeLevel() > 0) {
        // 延时等级不能超过最大值
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }

        // 延时队列 topic
        topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
        // 根据延时等级获取这个 topic 下面对应的队列,延时 topic 下每一个延时等级都对应第一个队列,延时消息首先就会存储在这些队列中
        // 等到执行时间了就会将这些消息提交到真实 topic 和下面的队列
        int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

        // 从消息的属性中根据 REAL_TOPIC 和 REAL_QID 来获取真实的 topic 和队列 id
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));

        // 设置消息属性
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        // 设置真实 topic 和队列 id 到消息中,后续这个到期的消息就会被发送到真实队列里面供消费者消费了
        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}

这里的逻辑其实就是设置真实的 topicqueueId,对于延时消息,真实的 topic 会存储在 properties 中,key 是 REAL_TOPIC;而真实的 queueId 也会被存在 properties 中,key 是 PROPERTY_REAL_QUEUE_ID

下面继续设置消息标记,也就是消息的 broker 存储端和 producer 产生端是 IPV
4 还是 IPV6,为什么要设置这个标记呢,因为 IPV4 和 IPV6 的字节数不同,计算出来的消息长度也不同,所以这里设置一个标记是为了后面动态计算消息长度的。

// 消息产生的地址,也就是 Producer 发送端的地址
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
    // 如果是 IPV6,就设置下消息的发送端标记
    msg.setBornHostV6Flag();
}

// 消息存储的地址,也就是 Broker 存储端的地址
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
    // 如果是 IPV6,就设置下消息的存储端标记
    msg.setStoreHostAddressV6Flag();
}

好了,接着往下看,下面是从 PutMessageThreadLocal 本地变量中拿到 encoder 对消息进行编码,其实说是编码,不如说是把消息 MessageExtBrokerInner 的内容转移到 ByteBuffer 中,这个 ByteBuffer 是一个 HeapByteBuffer,是一个临时变量,临时存储消息的。

// 4.从线程本地变量获取 PutMessageThreadLocal
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
// 5.使用本地变量里面的 MessageExtEncoder 对消息编码并存储到 encoderBuffer 中,并且调用 flip 切换读模式,后面就可以开始读取了
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {
    // 如果不为空就是上面编码的时候对于一些属性的校验失败了,直接返回
    return CompletableFuture.completedFuture(encodeResult);
}
// 6.设置消息编码结果
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);

来看下面里面的 encode 逻辑,这个方法会对消息进行编码,这里面的逻辑我就不多介绍了,看下面这张图,这里面的方法就是按照这个图的字段顺序进行编码。
在这里插入图片描述

/**
 * 对消息进行编码
 * @param msgInner 需要编码的消息对象
 * @return
 */
protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
    /**
     * 序列化消息的属性
     */
    // 将消息的属性字符串转成数组
    final byte[] propertiesData =
            msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

    // 计算属性数据的长度
    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

    // 如果属性长度超过 Short 的最大值
    if (propertiesLength > Short.MAX_VALUE) {
        // 返回结果
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    // topic
    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    // topic 长度
    final int topicLength = topicData.length;

    // 计算消息体长度
    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

    // 计算消息长度,这里传入 bodyLength、topicLength、propertiesLength 是因为这几个都是不定长的,消息里面会存储这部分数据,所以
    // 需要计算这部分数据的长度。而 sysFlag 可以用来判断里面的生产者地址或者 broker 地址是不是 IPV6 的。
    final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

    // 如果消息总长度超过最大允许的消息大小,返回消息非法的结果
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                + ", maxMessageSize: " + this.maxMessageSize);
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }

    // 初始化存储空间的 ByteBuffer,大小为消息的总长度
    this.resetByteBuffer(encoderBuffer, msgLen);
    // 1.写入消息的总长度(TOTALSIZE)
    this.encoderBuffer.putInt(msgLen);
    // 2.写入魔法数(MAGICCODE),用于标识消息的版本或类型
    this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3.写入消息体的 CRC(BODYCRC),用于校验消息体的完整性
    this.encoderBuffer.putInt(msgInner.getBodyCRC());
    // 4.写入队列 ID(QUEUEID),标识消息所属的队列
    this.encoderBuffer.putInt(msgInner.getQueueId());
    // 5.写入消息的标志(FLAG),用于标识消息的一些特性
    this.encoderBuffer.putInt(msgInner.getFlag());
    // 6.写入队列偏移量(QUEUEOFFSET),初始值为 0,后续需要更新
    this.encoderBuffer.putLong(0);
    // 7.写入物理偏移量(PHYSICALOFFSET),初始值为 0,后续需要更新
    this.encoderBuffer.putLong(0);
    // 8.写入系统标志(SYSFLAG),标识消息的系统特性
    this.encoderBuffer.putInt(msgInner.getSysFlag());
    // 9.写入消息的生成时间戳(BORNTIMESTAMP),表示 Producer 消息生成的时间
    this.encoderBuffer.putLong(msgInner.getBornTimestamp());
    // 10.Producer 端的 IP + 端口
    socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
    // 11.写入消息的存储时间戳(STORETIMESTAMP),表示消息在 Broker 存储的时间
    this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
    // 12.Broker 端的 IP + 端口
    socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
    // 13.消息消费重试次数
    this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
    // 14.写入预处理事务的偏移量(Prepared Transaction Offset),用于事务消息
    this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
    // 15.写入消息体的长度(BODY),并写入消息体数据
    this.encoderBuffer.putInt(bodyLength);
    if (bodyLength > 0)
        this.encoderBuffer.put(msgInner.getBody());
    // 16.topic 的长度和具体数据
    this.encoderBuffer.put((byte) topicLength);
    this.encoderBuffer.put(topicData);
    // 17.写入属性的长度和消息的属性值
    this.encoderBuffer.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.encoderBuffer.put(propertiesData);

    // 18.切换读模式,切换后就可以开始读取这条消息了
    encoderBuffer.flip();
    return null;
}

这个 encode 方法中还有一个 calMsgLength,这个方法就是通过 sysFlagbodyLengthtopicLengthpropertiesLength 来计算消息长度,之所以传入这几个参数,就是因为 CommitLog 消息中这些变量是不定长的,比如消息体。

/**
 * 计算消息长度
 * @param sysFlag           是否是 IPV6
 * @param bodyLength        消息体长度
 * @param topicLength       topic 长度
 * @param propertiesLength  消息属性长度
 * @return
 */
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
    // 如果是 IPV6 就是 20,否则是 8,这是因为 bornhost 是包括 IP + port 的,所以会多上 4 字节
    int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
    // 这里也是同理
    int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
    // 计算一条 CommitLog 消息的长度
    final int msgLen = 4 //TOTALSIZE
        + 4 //MAGICCODE
        + 4 //BODYCRC
        + 4 //QUEUEID
        + 4 //FLAG
        + 8 //QUEUEOFFSET
        + 8 //PHYSICALOFFSET
        + 4 //SYSFLAG
        + 8 //BORNTIMESTAMP
        + bornhostLength //BORNHOST
        + 8 //STORETIMESTAMP
        + storehostAddressLength //STOREHOSTADDRESS
        + 4 //RECONSUMETIMES
        + 8 //Prepared Transaction Offset
        + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
        + 1 + topicLength //TOPIC
        + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
        + 0;
    return msgLen;
}

继续回到 asyncPutMessage 方法,上面对消息进行了编码,下面创建一个消息存储的上下文,这个上下文里面存储了编码后的消息,同时存储了一个 key,这个 key 是 “topic-queueId”,专门用于 CommitLog#topicQueueTable 。

// CommitLog 的属性
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);

// CommitLog#asyncPutMessage
// 7.下面是存储消息上下文,这里的 key 是 CommitLog#topicQueueTable 集合中的 key,用来存储 [队列 ID -> 偏移量] 的映射关系
PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));

/**
 * 根据 topic 和 queueId 生成 key
 * @param keyBuilder
 * @param messageExt
 * @return
 */
private String generateKey(StringBuilder keyBuilder, MessageExt messageExt) {
    // topic-queueId
    keyBuilder.setLength(0);
    keyBuilder.append(messageExt.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(messageExt.getQueueId());
    // 返回结果
    return keyBuilder.toString();
}

下面继续,这里会加锁,因为 broker 中所有消息都会发送到一个 CommitLog 中,这里就需要加锁了,加锁有两种实现,自旋或者 ReentrantLock 互斥锁,看 StoreConfig 具体实现。

putMessageLock.lock();
try {
	...
}   

加锁之后就是添加消息前的处理鲁珀及,涉及到设置存储消息、获取要存储的 MappedFile 等。

// 获取 CommitLog 下面的文件列表中的最后一个 MappedFile 文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 加锁时间
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
// 设置加锁时间,这个加锁时间可以用来计算持有锁的时间从而来判断刷盘时间久不久,操作系统刷盘是不是繁忙
this.beginTimeInLock = beginLockTimestamp;

// 设置消息的存储时间为加锁的时间,确保全局有序
msg.setStoreTimestamp(beginLockTimestamp);

// 这里就是说如果获取不到 MappedFile 或者获取到的 MappedFile 已经写满了,这时候会获取或者创建下一个 MappedFile
if (null == mappedFile || mappedFile.isFull()) {
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
// 如果还是获取不到,那么说明创建失败了
if (null == mappedFile) {
    // 返回结果 CREATE_MAPEDFILE_FAILED
    log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}

上面的方法中的 getLastMappedFile 方法就是用于获取 CommitLog 下面的文件列表中的最后一个 MappedFile 文件,就是最新的 MappedFile 文件,而 beginTimeInLock 还记得这个属性吗,在 2.1 小结 checkStoreStatus 判断页缓存繁忙就是用这个属性来判断的,这个属性的初始化就是在这里,下面来看下如何获取 MappedFile 的。

/**
* 获取文件列表末尾的 MappedFile,就是最后一个 MappedFile,也是正在写入的 MappedFile
* @return
*/
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;

while (!this.mappedFiles.isEmpty()) {
    try {
        // 获取最后一个 MapperFile
        mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
        break;
    } catch (IndexOutOfBoundsException e) {
        //continue;
    } catch (Exception e) {
        log.error("getLastMappedFile has exception.", e);
        break;
    }
}

这个方法逻辑很简单,就是获取 mappedFileLast 的最后一个 MappedFile,当然还有一个 this.mappedFileQueue.getLastMappedFile(0) 方法,这个方法和上面的 getLastMappedFile() 不一样,这个方法如果获取不到 MappedFile 或者获取到的 MappedFile 已经写满了,这时候会获取或者创建下一个 MappedFile,当然这里的逻辑我先不说,在后面会写文章介绍,因为里面涉及到创建 MappedFile 的服务。

继续回到 CommitLog#asyncPutMessage 方法,下面就是核心逻辑追加消息到 CommitLog 中。

// 核心逻辑,追加消息到 CommitLog 中
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);

追加完了之后,会处理追加消息的结果,注意如果是 END_OF_FILE 就是说上一个文件已经满了,这时候会创建一个新的文件,并且赋值 unlockMappedFile = mappedFile,也就是说会将 unlockMappedFile 设置为已经写满了的那个文件,记住这个变量,下面有大用。

// 下面判断下追加消息的结果
switch (result.getStatus()) {
    case PUT_OK:
        // 添加成功,直接退出
        break;
    case END_OF_FILE:
        // 文件剩余空间不足,那么初始化新的文件并尝试再次存储
        unlockMappedFile = mappedFile;
        // 创建一个新的文件
        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
        if (null == mappedFile) {
            // 创建失败,直接返回错误结果 CREATE_MAPEDFILE_FAILED
            log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
        }
        // 创建成功后再次添加消息
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        // 退出
        break;
    case MESSAGE_SIZE_EXCEEDED:
    case PROPERTIES_SIZE_EXCEEDED:
        // 消息长度错误
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
    case UNKNOWN_ERROR:
        // 位置错误
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
    default:
        // 其他错误
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}

// 这里就是加锁的时间,也是添加消息所耗费的时间
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;

最后在 finally 中进行解锁并重置加锁时间。

finally {
    // 最后解锁并且重置 beginTimeInLock
    beginTimeInLock = 0;
    putMessageLock.unlock();
}

上面就已经将消息追加到 CommitLog 中的 ByteBuffer 了,这里的 ByteBuffer 有可能是 MappedByteBuffer,也有可能是读写分离创建出来的 writeBuffer,最后来看下添加之后的后续逻辑。

// 如果加锁时间超过了 500ms
if (elapsedTimeInLock > 500) {
    // 记录日志
    log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}

上面就是判断如果加锁时间超过了 500ms,就记录下日志。

// 如果文件写满了并且启用了文件预热
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
    // 对 unlockMappedFile 解锁,这里解锁是使用 munlock 解除这片内存背后锁定的 page cache,这下就能够交换到 swap 空间了
    // 因为这边 unlockMappedFile 已经写满了,所以这片空间可以解锁方便 swap 交换到磁盘 swap 空间,读写的重点在新的 MappedFile
    // 文件上,对于旧的 unlockMappedFile 读写就没有那么多了
    this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}

然后上面判断下如果文件写满了并且开启了文件预热,因为开启文件预热,那么在创建 MappedFile 的时候会调用 warmMappedFIle 方法每隔 4K 就写入一个 0 触发中断建立虚拟内存到物理内存的页表项,同时调用 mappedFile.mlock 锁定这片内存,防止交换到 swap 磁盘中

那么在这里就需要将 unlockMappedFile 解锁,unlockMappedFile 还记得吗,上面将写满的文件赋值给 unlockMappedFile,因为文件已经写满了,所以会对这个文件锁定的空间进行解锁,至于为什么解锁,是因为一般来说都是最后一个文件操作最频繁,这种情况下就需要确保最后一个文件不被交换到 swap 空间防止 mmap 映射出来的虚拟内存和物理内存页表项被删掉。但是总得有不需要用的物理页交换到这里 swap 空间以腾出空间给热点访问页,所以对于写满的 MappedFile,既然后续也不会频繁访问,干脆就解锁内存,让这些物理页能交换到 swap 空间了。

好了,继续看下面的逻辑,是一些数据统计。

// 返回结果
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

// 下面是一些数据统计,如写入 topic 的消息数 + 1
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
// 存储的消息总字节 + result.getWroteBytes()
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());

接下来就是提交刷盘请求,因为上面已经把数据写入 CommitLog 的 ByteBuffer 了,所以下面会提交刷盘请求和从节点赋值请求,这些请求的处理都是由其他服务异步处理的。

// 9.上面添加消息到 CommitLog 只是添加到背后的 ByteBuffer,接下来需要提交刷盘请求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// 10.RocketMQ 主节点写入数据之后,向从节点提交消息复制请求,让 slave 节点去同步 master 节点新写入的消息
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);

最后返回结果。

// 处理刷盘请求
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
    // 外层会阻塞等待刷盘请求和从节点复制请求的结果
    if (flushStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(flushStatus);
    }
    if (replicaStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(replicaStatus);
    }
    return putMessageResult;
});

好了,上面就是 CommitLog#asyncPutMessage 的核心逻辑,下面我贴下整体代码。

/**
 * broker 异步存储消息
 * @param msg
 * @return
 */
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // 1.设置消息的存储时间
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting on the client)
    // 2.设置消息体 CRC 校验码
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    // 消息 topic
    String topic = msg.getTopic();
    // int queueId = msg.getQueueId();

    // 3.这里就是获取消息的事务状态,处理延时消息的逻辑
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    // 首先能设置到 CommitLog 中的要么就是非事务消息,要么就是事务一阶段 Commit 的消息,
    // 也就是消息本地执行逻辑没有问题,后续就可以把事务消息从事务投放到普通队列
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // 下面是处理延迟消息的逻辑,延迟消息有一个延时等级,每个延时等级都对应一个延时时间
        if (msg.getDelayTimeLevel() > 0) {
            // 延时等级不能超过最大值
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            // 延时队列 topic
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            // 根据延时等级获取这个 topic 下面对应的队列,延时 topic 下每一个延时等级都对应第一个队列,延时消息首先就会存储在这些队列中
            // 等到执行时间了就会将这些消息提交到真实 topic 和下面的队列
            int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // 从消息的属性中根据 REAL_TOPIC 和 REAL_QID 来获取真实的 topic 和队列 id
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));

            // 设置消息属性
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            // 设置真实 topic 和队列 id 到消息中,后续这个到期的消息就会被发送到真实队列里面供消费者消费了
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    // 消息产生的地址,也就是 Producer 发送端的地址
    InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
    if (bornSocketAddress.getAddress() instanceof Inet6Address) {
        // 如果是 IPV6,就设置下消息的发送端标记
        msg.setBornHostV6Flag();
    }

    // 消息存储的地址,也就是 Broker 存储端的地址
    InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
    if (storeSocketAddress.getAddress() instanceof Inet6Address) {
        // 如果是 IPV6,就设置下消息的存储端标记
        msg.setStoreHostAddressV6Flag();
    }

    // 4.从线程本地变量获取 PutMessageThreadLocal
    PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
    // 5.使用本地变量里面的 MessageExtEncoder 对消息编码并存储到 encoderBuffer 中,并且调用 flip 切换读模式,后面就可以开始读取了
    PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
    if (encodeResult != null) {
        // 如果不为空就是上面编码的时候对于一些属性的校验失败了,直接返回
        return CompletableFuture.completedFuture(encodeResult);
    }
    // 6.设置消息编码结果
    msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
    // 7.下面是存储消息上下文,这里的 key 是 CommitLog#topicQueueTable 集合中的 key,用来存储 [队列 ID -> 偏移量] 的映射关系
    PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));

    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;

    // 8.加锁,因为 broker 中所有消息都会发送到一个 CommitLog 中,这里就需要加锁了
    //   这里的加锁有两种实现,自旋或者 ReentrantLock 互斥锁,看 StoreConfig 具体实现
    putMessageLock.lock();
    try {
        // 获取 CommitLog 下面的文件列表中的最后一个 MappedFile 文件
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        // 加锁时间
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        // 设置加锁时间,这个加锁时间可以用来计算持有锁的时间从而来判断刷盘时间久不久,操作系统刷盘是不是繁忙
        this.beginTimeInLock = beginLockTimestamp;

        // 设置消息的存储时间为加锁的时间,确保全局有序
        msg.setStoreTimestamp(beginLockTimestamp);

        // 这里就是说如果获取不到 MappedFile 或者获取到的 MappedFile 已经写满了,这时候会获取或者创建下一个 MappedFile
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        // 如果还是获取不到,那么说明创建失败了
        if (null == mappedFile) {
            // 返回结果 CREATE_MAPEDFILE_FAILED
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
        }

        // 核心逻辑,追加消息到 CommitLog 中
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        // 下面判断下追加消息的结果
        switch (result.getStatus()) {
            case PUT_OK:
                // 添加成功,直接退出
                break;
            case END_OF_FILE:
                // 文件剩余空间不足,那么初始化新的文件并尝试再次存储
                unlockMappedFile = mappedFile;
                // 创建一个新的文件
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // 创建失败,直接返回错误结果 CREATE_MAPEDFILE_FAILED
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                }
                // 创建成功后再次添加消息
                result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                // 退出
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                // 消息长度错误
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
            case UNKNOWN_ERROR:
                // 位置错误
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            default:
                // 其他错误
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
        }

        // 这里就是加锁的时间,也是添加消息所耗费的时间
        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
    } finally {
        // 最后解锁并且重置 beginTimeInLock
        beginTimeInLock = 0;
        putMessageLock.unlock();
    }

    // 如果加锁时间超过了 500ms
    if (elapsedTimeInLock > 500) {
        // 记录日志
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
    }

    // 如果文件写满了并且启用了文件预热
    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        // 对 unlockMappedFile 解锁,这里解锁是使用 munlock 解除这片内存背后锁定的 page cache,这下就能够交换到 swap 空间了
        // 因为这边 unlockMappedFile 已经写满了,所以这片空间可以解锁方便 swap 交换到磁盘 swap 空间,读写的重点在新的 MappedFile
        // 文件上,对于旧的 unlockMappedFile 读写就没有那么多了
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    // 返回结果
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // 下面是一些数据统计,如写入 topic 的消息数 + 1
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
    // 存储的消息总字节 + result.getWroteBytes()
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());

    // 9.上面添加消息到 CommitLog 只是添加到背后的 ByteBuffer,接下来需要提交刷盘请求
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    // 10.RocketMQ 主节点写入数据之后,向从节点提交消息复制请求,让 slave 节点去同步 master 节点新写入的消息
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
    // 处理刷盘请求
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        // 外层会阻塞等待刷盘请求和从节点复制请求的结果
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(flushStatus);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
}

2.4 MappedFile#appendMessage

这个方法就是上面 CommitLog#asyncPutMessage 调用的核心逻辑,就是在这里面追加消息的。

/**
 * 追加消息到 MappedFile 的 ByteBuffer
 * @param msg               消息
 * @param cb                回调函数,实现是 DefaultAppendMessageCallback
 * @param putMessageContext 消息存放的上下文
 * @return
 */
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    return appendMessagesInner(msg, cb, putMessageContext);
}

再来看下 appendMessagesInner 方法的实现。

/**
 * 把消息追加到 CommitLog 文件结尾,这个方法是 CommitLog 调用的
 * @param messageExt
 * @param cb
 * @param putMessageContext
 * @return
 */
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    assert messageExt != null;
    assert cb != null;
    // 获取写指针的位置
    int currentPos = this.wrotePosition.get();
    // 写指针小于文件大小,可以写入
    if (currentPos < this.fileSize) {
        // 是否开启堆外缓存,如果开启了就使用 writeBuffer 进行写入(读写分离)
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        // 标记写入位置
        byteBuffer.position(currentPos);
        // 消息写入结果
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {
            // 单条消息写入
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBrokerInner) messageExt, putMessageContext);
        } else if (messageExt instanceof MessageExtBatch) {
            // 批量消息写入
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBatch) messageExt, putMessageContext);
        } else {
            // 不知道是什么
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        // 更新写入位置
        this.wrotePosition.addAndGet(result.getWroteBytes());
        // 设置消息存入的时间(最新)
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

这个方法就是把消息追加到 CommitLog 文件结尾,这个方法是 CommitLog 调用的。wrotePosition 是写入的 MappedFile 的指针,一个 MappedFile 一个 wrotePosition 变量。同样的在写入之前会调用 slice 方法获取文件视图。

通过这个视图的获取方法就能看出,如果开启堆外缓存,如果开启了就使用 writeBuffer 进行写入(读写分离),这时候写入的数据是直接写入的 writeBuffer,但是由于不是 mmap 映射出来的 MappedByteBuffer,所以写入 writeBuffer 还得 commit 提交到 Page Cache 才能 flush 刷盘。

而如果没有开启读写分离,消息写入 MappedByteBuffer 就相当于写入 Page Cache,这时候就不需要 commit,可以直接 flush 刷盘。


2.5 CommitLog#doAppend 追加消息到 CommitLog

/**
* 追加消息
* @param fileFromOffset    文件起始索引
* @param byteBuffer        CommitLog 的 ByteBuffer(没有开启读写分离就是 mappedByteBuffer,开启了就是 writeBuffer)
* @param maxBlank          文件还有多少空位可以写入数据
* @param msgInner          要写入的消息
* @param putMessageContext 写入消息上下文
* @return
*/
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
   final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
   ...
}

来看下里面的逻辑,首先获取写指针的位置,就是从当前 CommitLog 的哪个下标开始写入数据。

// 写指针的位置,物理偏移量
long wroteOffset = fileFromOffset + byteBuffer.position();

然后构建消息 ID,在消息发送的时候会在 broker 端生成一个唯一 ID 进行标识,构建出来的 ID 格式是:ip + port + wroteOffset

// 构建消息 ID,在消息发送的时候会在 broker 端生成一个唯一 ID 进行标识
Supplier<String> msgIdSupplier = () -> {
    // 消息类型
    int sysflag = msgInner.getSysFlag();
    // 如果是 IPV4,那么长度就是 16,IPV6 长度就是 28,默认就是 16
    // 这个消息 ID 组成是: ip + port + wroteOffset
    int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
    // 分配存储消息 ID 的 ByteBuffer,这里是一个 HeapByteBuffer
    ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
    // 首先把 ip + port 设置进去,然后切换读模式
    MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
    // 这里 clear 把 ByteBuffer 里面的属性重置了
    msgIdBuffer.clear();
    // 设置 wroteOffset
    msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
    // 转成 16 进制字符串返回
    return UtilAll.bytes2string(msgIdBuffer.array());
};

下面接着通过 “topic-queueId” 获取 topicQueueTable 中的 ComsumeQueue 队列偏移量,这里的偏移量其实是指第几条索引,真正偏移量需要通过 queueOffset * 20 求出来,20 是 ConsumeQueue 中一条索引长度。

// Record ConsumeQueue information
// 记录 ConsumeQueue 的信息
// key = topic-queueId
String key = putMessageContext.getTopicQueueTableKey();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
// queueOffset 是 ConsumeQueue 中的最大偏移量,当 broker 写入一条消息,就会设置对应的 queueOffset++,
// 所以实际的 ConsumeQueue 偏移量应该等于 queueOffset * 20
if (null == queueOffset) {
    // 没有记录这个 key 就初始化为 0
    queueOffset = 0L;
    CommitLog.this.topicQueueTable.put(key, queueOffset);
}

下面是 LMQ 的处理,这里先不细说了,这里直接看源码。

// LMQ 的处理,先不看
boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);
if (!multiDispatchWrapResult) {
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

上面我们不是获取了 queueOffset 吗,对于事务 TRANSACTION_PREPARED_TYPETRANSACTION_ROLLBACK_TYPE 这两个类型的消息是不会添加 ConsumeQueue 索引的,所以这里的 queueOffset 会被设置为 0;

// 获取到之前编码过的 ByteBuffer
ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
// 获取消息总长度
final int msgLen = preEncodeBuffer.getInt(0);

// 需要特殊处理的事务消息
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
    // Prepare 阶段和 rollback 阶段的消息是不会设置到消费队列的
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        queueOffset = 0L;
        break;
    // 如果是普通消息和 commit 阶段的事务消息才会设置到消费队列
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    default:
        break;
}

接着下面就是添加消息的逻辑了,但是在处理添加逻辑之前,需要看下 CommitLog 里面还有没有足够多的空间。

// 确认空间是否可用,注意一个 CommitLog 文件需要保留下 8 个空白空间
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
    // 这里就是空间不够用了,需要返回一个错误的魔数,魔数是 BLANK_MAGIC_CODE
    // 首先重置 msgStoreItemMemory 的属性,这个是一个临时变量,存储要处理的消息
    this.msgStoreItemMemory.clear();
    // 1.存储消息总长度
    this.msgStoreItemMemory.putInt(maxBlank);
    // 2.魔术设置为 BLANK_MAGIC_CODE
    this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
    // 3.当前时间
    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // 把这部分数据设置到 byteBuffer 中
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
    // 返回结果,外层会对 END_OF_FILE 做特殊处理
    return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
            maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
            msgIdSupplier, msgInner.getStoreTimestamp(),
            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}

这里面其实就是会预留 8 个字节的空间出来,如果不满足,那么就会把 存储消息总长度 + BLANK_MAGIC_CODE 存储到 ByteBuffer 的尾部,标识当前文件已满。

preEncodeBuffer 是之前 encode 方法编码过的消息,不过上面编码消息的时候 queueOffset(消息在 CommitLog 中的起始偏移量)、physical offset(消息在 CommitLog 中的起始偏移量)、storeTimestamp(消息存储到 CommitLog 的时间)这些是没有初始化的,因为要等到这个 doAppend 方法才能初始化。

// 下面要开始从第 6 个位置开始设置信息,所以 pos 从 20 开始
int pos = 4 + 4 + 4 + 4 + 4;
// 6.ConsumeQueue 中的偏移量,实际偏移量等于 queueOffset * 20
preEncodeBuffer.putLong(pos, queueOffset);
pos += 8;
// 7.这条消息在 CommitLog 中的起始偏移量
preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// 8.SYSFLAG, 9.BORNTIMESTAMP, 10.BORNHOST, 11.STORETIMESTAMP
pos += 8 + 4 + 8 + ipLen;
// 重新设置消息的存储时间,这里初始化的时候设置成 0,所以这里会设置
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());

设置完之后就需要添加到 CommitLog 的 ByteBuffer 中。

// 起始时间
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// 把消息写入 MappedFile 的 ByteBuffer,有可能是 mappedByteBuffer 或者 writeBuffer
byteBuffer.put(preEncodeBuffer);
// 设置完了就把编码消息清空
msgInner.setEncodedBuff(null);
// 设置返回结果
// (1) 消息设置的结果 AppendMessageStatus.PUT_OK
// (2) 写指针位置 wroteOffset
// (3) 消息长度 msgLen
// (4) msgIdSupplier 消息 ID 生成器
// (5) storeTimestamp 是消息存储时间
// (6) queueOffset 是消息在 ConsumeQueue 中存储的索引
// (7) pagecacheRT 是消息写入的耗费时间
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
    msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

在添加到 CommitLog 的 ByteBuffer 后(writeBuffer 或者 mappedByteBuffer),需要更新下 topicQueueTable,就是 ConsumeQueue 的队列最大物理偏移量。

// 看一下消息的事务类型
switch (tranType) {
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        // prepare 阶段或者 rollback 阶段类型的消息就直接退出
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        // 如果是普通消息或者是 commit 阶段的事务消息,那么更新 topicQueueTable 索引
        CommitLog.this.topicQueueTable.put(key, ++queueOffset);
        // 下面是 LMQ 相关队列的更新逻辑
        CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
        break;
    default:
        break;
}
return result;

好了,到这里追加消息的逻辑就解析完了,下面是这个方法的整体流程。

/**
 * 追加消息
 * @param fileFromOffset    文件起始索引
 * @param byteBuffer        CommitLog 的 ByteBuffer(没有开启读写分离就是 mappedByteBuffer,开启了就是 writeBuffer)
 * @param maxBlank          文件还有多少空位可以写入数据
 * @param msgInner          要写入的消息
 * @param putMessageContext 写入消息上下文
 * @return
 */
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

    // 写指针的位置,物理偏移量
    long wroteOffset = fileFromOffset + byteBuffer.position();

    // 构建消息 ID,在消息发送的时候会在 broker 端生成一个唯一 ID 进行标识
    Supplier<String> msgIdSupplier = () -> {
        // 消息类型
        int sysflag = msgInner.getSysFlag();
        // 如果是 IPV4,那么长度就是 16,IPV6 长度就是 28,默认就是 16
        // 这个消息 ID 组成是: ip + port + wroteOffset
        int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
        // 分配存储消息 ID 的 ByteBuffer,这里是一个 HeapByteBuffer
        ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
        // 首先把 ip + port 设置进去,然后切换读模式
        MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
        // 这里 clear 把 ByteBuffer 里面的属性重置了
        msgIdBuffer.clear();
        // 设置 wroteOffset
        msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
        // 转成 16 进制字符串返回
        return UtilAll.bytes2string(msgIdBuffer.array());
    };

    // Record ConsumeQueue information
    // 记录 ConsumeQueue 的信息
    // key = topic-queueId
    String key = putMessageContext.getTopicQueueTableKey();
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    // queueOffset 是 ConsumeQueue 中的最大偏移量,当 broker 写入一条消息,就会设置对应的 queueOffset++,
    // 所以实际的 ConsumeQueue 偏移量应该等于 queueOffset * 20
    if (null == queueOffset) {
        // 没有记录这个 key 就初始化为 0
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }

    // LMQ 的处理,先不看
    boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);
    if (!multiDispatchWrapResult) {
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

    // 需要特殊处理的事务消息
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepare 阶段和 rollback 阶段的消息是不会设置到消费队列的
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            queueOffset = 0L;
            break;
        // 如果是普通消息和 commit 阶段的事务消息才会设置到消费队列
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }

    // 获取到之前编码过的 ByteBuffer
    ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
    // 获取消息总长度
    final int msgLen = preEncodeBuffer.getInt(0);

    // 确认空间是否可用,注意一个 CommitLog 文件需要保留下 8 个空白空间
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        // 这里就是空间不够用了,需要返回一个错误的魔数,魔数是 BLANK_MAGIC_CODE
        // 首先重置 msgStoreItemMemory 的属性,这个是一个临时变量,存储要处理的消息
        this.msgStoreItemMemory.clear();
        // 1.存储消息总长度
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2.魔术设置为 BLANK_MAGIC_CODE
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        // 3.当前时间
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        // 把这部分数据设置到 byteBuffer 中
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
        // 返回结果,外层会对 END_OF_FILE 做特殊处理
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
                maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
                msgIdSupplier, msgInner.getStoreTimestamp(),
                queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }

    // 下面要开始从第 6 个位置开始设置信息,所以 pos 从 20 开始
    int pos = 4 + 4 + 4 + 4 + 4;
    // 6.ConsumeQueue 中的偏移量,实际偏移量等于 queueOffset * 20
    preEncodeBuffer.putLong(pos, queueOffset);
    pos += 8;
    // 7.这条消息在 CommitLog 中的起始偏移量
    preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
    int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
    // 8.SYSFLAG, 9.BORNTIMESTAMP, 10.BORNHOST, 11.STORETIMESTAMP
    pos += 8 + 4 + 8 + ipLen;
    // 重新设置消息的存储时间,这里初始化的时候设置成 0,所以这里会设置
    preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());

    // 起始时间
    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // 把消息写入 MappedFile 的 ByteBuffer,有可能是 mappedByteBuffer 或者 writeBuffer
    byteBuffer.put(preEncodeBuffer);
    // 设置完了就把编码消息清空
    msgInner.setEncodedBuff(null);
    // 设置返回结果
    // (1) 消息设置的结果 AppendMessageStatus.PUT_OK
    // (2) 写指针位置 wroteOffset
    // (3) 消息长度 msgLen
    // (4) msgIdSupplier 消息 ID 生成器
    // (5) storeTimestamp 是消息存储时间
    // (6) queueOffset 是消息在 ConsumeQueue 中存储的索引
    // (7) pagecacheRT 是消息写入的耗费时间
    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

    // 看一下消息的事务类型
    switch (tranType) {
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            // prepare 阶段或者 rollback 阶段类型的消息就直接退出
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            // 如果是普通消息或者是 commit 阶段的事务消息,那么更新 topicQueueTable 索引
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            // 下面是 LMQ 相关队列的更新逻辑
            CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
            break;
        default:
            break;
    }
    return result;
}

2.6 DefaultMessageStore#asyncPutMessage 结尾

上面 2.1 - 2.5 小结就是追加消息的逻辑,追加完消息之后,在入口 asyncPutMessage 方法的结尾会设置结果处理逻辑。

// 6.这里返回 putResultFuture,在上层会去通过 get 非法阻塞等待
putResultFuture.thenAccept((result) -> {
    // 当消息存储完成之后,lambda 表达式会被调用
    // 消息消耗的时间
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }
    // 设置下存储消息的消耗时间和最大消耗时间
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

    // 这里就是判断下存储结果是不是成功的
    if (null == result || !result.isOk()) {
        // 这里就是存储失败了,新增存储失败的次数
        this.storeStatsService.getPutMessageFailedTimes().add(1);
    }
});

return putResultFuture;

到这里存储单条消息的核心逻辑就解析完了,可以看到这里只是返回一个 CompletableFuture,这里并不会阻塞,但是上层方法调用这个方法之后通过 get 方法就可以阻塞等待了,比如还是 DefaultMessageStore 的 putMessage。

@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    return waitForPutResult(asyncPutMessage(msg));
}

在这里插入图片描述

3. 小结

这篇文章中我们解析了 broker 端存储单条消息的逻辑,但是除了单条消息,broker 还支持消息的批量新增,所以下一篇文章我们会解析消息的批量新增逻辑,下一篇文章字数会少一点,因为很多逻辑跟单条消息的新增是一样的。






如有错误,欢迎指出!!!


http://www.niftyadmin.cn/n/5838650.html

相关文章

解码,蓝桥杯2020G

a2b 解码后&#xff1a;aab ​ #include<iostream> using namespace std; typedef struct Node {char data;int size;Node* next; }Node,*Linklist; char* scan(char str[],int size) {int i 0;Linklist head new Node;Linklist rear head;while (i<size-1) {Lin…

嵌入式知识点总结 Linux驱动 (二)-uboot bootloader

针对于嵌入式软件杂乱的知识点总结起来&#xff0c;提供给读者学习复习对下述内容的强化。 目录 1.什么是bootloader&#xff1f; 2.Bootloader的两个阶段 3.uboot启动过程中做了哪些事&#xff1f; 4.uboot和内核kernel如何完成参数传递&#xff1f; 5.为什么要给内核传递…

docker安装nacos2.2.4详解(含:nacos容器启动参数、环境变量、常见问题整理)

一、镜像下载 1、在线下载 在一台能连外网的linux上执行docker镜像拉取命令 docker pull nacos:2.2.4 2、离线包下载 两种方式&#xff1a; 方式一&#xff1a; -&#xff09;在一台能连外网的linux上安装docker执行第一步的命令下载镜像 -&#xff09;导出 # 导出镜像到…

如何有效利用数据采集HTTP代理

数据采集HTTP代理是一种有效的工具&#xff0c;能够帮助用户在进行数据采集时提升效率。它通过将请求发送到目标服务器的方式&#xff0c;能够有效地隐藏用户的真实IP地址&#xff0c;从而实现更加安全和高效的数据获取。随着信息时代的发展&#xff0c;数据采集HTTP代理的应用…

自定义数据集使用scikit-learn中的包实现线性回归方法对其进行拟合

一、导入必要的库 import pandas as pd from sklearn.model_selection import train_test_split from sklearn.linear_model import LinearRegression from sklearn.metrics import mean_squared_error, r2_score 二、加载自定义数据集 # 创建自定义数据集 # 假设我们有一个…

Java基础知识总结(三十二)--API--- java.lang.Runtime

类中没有构造方法&#xff0c;不能创建对象。 但是有非静态方法。说明该类中应该定义好了对象&#xff0c;并可以通过一个static方法获取这个对象。用这个对象来调用非静态方法。这个方法就是 static Runtime getRuntime(); 这个Runtime其实使用单例设计模式进行设计。 class …

基于Springboot的社区药房管理系统

博主介绍&#xff1a;java高级开发&#xff0c;从事互联网行业多年&#xff0c;熟悉各种主流语言&#xff0c;精通java、python、php、爬虫、web开发&#xff0c;已经做了多年的设计程序开发&#xff0c;开发过上千套设计程序&#xff0c;没有什么华丽的语言&#xff0c;只有实…

Python NumPy(8):NumPy 位运算、NumPy 字符串函数

1 NumPy 位运算 位运算是一种在二进制数字的位级别上进行操作的一类运算&#xff0c;它们直接操作二进制数字的各个位&#xff0c;而不考虑数字的整体值。NumPy 提供了一系列位运算函数&#xff0c;允许对数组中的元素进行逐位操作&#xff0c;这些操作与 Python 的位运算符类似…