源码分析RocketMQ之Broker-消息存储

源码分析RocketMQ之Broker-消息存储
Broker接收到消息时,会将消息交给SendMessageProcessor处理器处理,SendMessageProcessor处理器首先解析发送消息
请求头,根据消息的类型交给不同方法处理消息。CommitLog类具体处理消息,首先获取最后一个MappedFile文件,
然后将消息以追加的方式写入到MappedFile中的byteBuffer缓存起来
1、checkStoreStatus:判断存储状态
  1、是否关机状态判断
  2、slave不能写入
  3、写入标识
  4、os是否繁忙判断,如果一次消息追加过程的时间超过了,Broker配置文件osPageCacheBusyTimeOutMills,则认为pageCache繁忙
2、消息是否合法:topic长度是否超过127 Properties 大小校验 32767
3、保存消息 commitLog.putMessage
  1、通过mappedFileQueue.getLastMappedFile获取写入映射文件
  2、获取追加锁,限制同一时间只能有一个线程进行数据的put工作
  3、mappedFile 判断是否null或者满了
  4、appendMessagesInner将消息追加到MappedFile的MappedByteBuffer/writeBuffer中,更新其写入位置wrotePosition,但还没Commit及Flush
     1、CommitLog.doAppend 追加写入文件
       1、PHY OFFSET 总的偏移量 = 文件开始偏移量+ 文件最新写入位置
       2、生成消息id
       3、ByteBuffer msgStoreItemMemory 设置TOTALSIZE 总大小 魔法code 在队列中的偏移量  bodyLength body
       4、CommitLog.this.topicQueueTable.put(key, ++queueOffset) 更新队列的下一个偏移量
4、统计保存消息花费时间
5、统计保存消息失败次数
ReputMessageService:线程异步的将CommitLog中MappedFile中的消息保存到ConsumerQueue和IndexFile中
1、线程run方法执行doReput,判断如果reputFromOffset 小于 commitlog的最小offset,设置commitlog的最小offset
2、如果MaxOffset > reputFromOffset,且doNext循环
3、根据偏移量读取偏移量+到 commitlog文件中有效数据的最大偏移量。如果未找到数据,结束doReput 方法
4、找到消息,重置这个消息的开始offet
5、循环SelectMappedBufferResult 中读取消息,每次读取一条
6、从 SelectMappedBufferResult 中读取一条消息,生成 DispatchRequest 对象
7、doDispatch 根据commitlog 文件实时内容构建consumequeue、index文件
  1、循环dispatcherList有 CommitLogDispatcherBuildConsumeQueue CommitLogDispatcherBuildIndex MessageStore对象创建的时候初始化的 consumequeue index build
  2、CommitLogDispatcherBuildConsumeQueue:消费队列消息文件进度处理
    1、putMessagePositionInfo: 先根据topic和queueId查找ConsumeQueue消费队列文件,找到就返回,没有找到创建ConsumeQueue
    2、最大重试30次写入,获取是否可写标识,循环重试次数
    3、写入Consume queue 
      1、准备byteBufferIndex ByteBuffer
      2、expectLogicOffset:根据ConsumeQueueOffset 计算在ConsumeQueue 的位置
      3、获取最后一个MappedFile文件
      4、判断这个mappedFile是否第一个,且ConsumeQueueOffset 不是0 且mappedFile写位置是0
      5、设置最小offset、设置开始flush位置、设置开始commit
      6、currentLogicOffset:计算这个ConsumeQueue对应的offset
      7、如果插入expectLogicOffset比currentLogicOffset 小,说明之前已经有东西
      8、如果不相等队列可能出错
      9、设置最大物理offset
      10、通过mappedFile.appendMessage 追加消息内容
        1、获取当前位置
        2、判断这个MappedFile能否写入,当前位置+数据长度小于等于文件大小
        3、通过fileChannel.write写入消息,wrotePosition增加位置点数据长度
    4、更新LogicsMsg 消息存入时间点,用于恢复
  3、CommitLogDispatcherBuildIndex:索引文件
8、构建成功reputFromOffset进度+size,存储统计信息增加

 


版权声明:本文为liangshf520原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
THE END
< <上一篇
下一篇>>