RocketMQ消息存储源码

RocketMQ的存储文件包括消息文件CommitLog、消息消费队列文件ConsumeQueue、Hash索引文件IndexFile、检测点文件checkpoint、abort异常关闭标记文件。单个消息存储文件、消息消费队列文件、Hash索引文件长度固定,以便使用内存映射机制进行文件的读写操作。消息在CommitLog中的存储格式如下:

  • 4字节消息长度整个消息体所占用的字节数
  • 4字节魔数:固定值为MESSAGE_MAGIC_CODEBLANK_MAGIC_CODE
  • 4字节CRC消息体校验码,用于防止网络、硬件等故障导致数据与发送时不一样带来的问题
  • 4字节queueId:表示消息发到了哪个MessageQueue
  • 4字节flag:flag是创建Message对象时由生产者通过构造器设定的flag值
  • 8字节queueOffset:表示queue中的偏移量
  • 8字节physicalPosition:表示在存储文件中的偏移量
  • 4字节sysFlag:是生产者相关的信息标识
  • 8字节消息创建时间、8字节消息生产者的host、8字节消息存储时间、8字节消息存储的机器的host
  • 4字节表示重复消费次数8字节消息事务相关偏移量
  • 4字节表示消息体的长度,N字节消息体,不是固定长度,和前面的4字节的消息体长度值相等
  • 1字节表示topic长度,因此topic的长度最多不能超过127个字节,N字节存储topic,不是固定长度
  • 2字节properties的长度,properties是创建消息时添加到消息中,所占字节数不能超过2^15-1,N字节存储Properties的内容

RocketMQ组织文件以文件起始偏移量来命名文件,根据偏移量能快速定位到真实物理文件。基于内存映射文件机制提供了同步刷盘与异步刷盘两种机制,异步刷盘是指在消息存储时先追加到内存映射文件,然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘。

为了保证消息发送的高吞吐量采用单一文件存储所有主题消息,保证消息存储是完全的顺序写,但这样给文件读取带来了不便,为了方便消息消费构建了消息消费队列文件,基于主题队列进行组织,同时为消息实现了Hash索引,可以为消息设置索引键,故能快速从CommitLog文件中检索消息。

当消息达到CommitLog后,会通过ReputMessageService线程接近实时地将消息在CommitLog文件的Offset4字节的消息长度8字节的的Tag的哈希码转发给消息消费队列文件索引文件。为了安全起见引入abort文件,记录Broker停机是否是正常关闭,在重启Broker时为了保证CommitLog文件、消息消费队列文件与Hash索引文件的正确性,分别采用不同策略来恢复文件

ConsumerQueue消息队列引入的目的是提高消费的性能,每个MessageQueue都会有对应的ConsumerQueue文件存储在磁盘上,每个ConsumerQueue文件包含30W条消息每条消息size大小为20字节包含8字节CommitLogOffset4字节消息长度8字节Tag的哈希值,故每个ConsumerQueue文件大小约为5.72M

RocketMQ不会永久存储消息文件、消息消费队列文件,而是启用文件过期机制:在磁盘空间不足或到达删除时间点(默认凌晨4点)时删除过期文件,文件默认保存72小时,并且在删除文件时并不会判断该消息文件上的消息是否已被消费。

DefaultMessageStore构造方法中,会传入消息拉取长轮询模式消息到达监听器MessageArrivingListener,初始化统一以异步方式提前创建当前MappedFile和下一个MappedFile的AllocateMappedFileService,实例化用于存储消息的CommitLog,初始化消息队列异步刷盘线程FlushConsumeQueueService、清除过期CommitLog文件的线程CleanCommitLogService、异步清除过期ConsumeQueue和Index文件的线程CleanConsumeQueueService,实例化消息索引服务IndexService,实例化主从同步服务HAService,实例化消息分发线程ReputMessageService。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
public class DefaultMessageStore implements MessageStore {
private final MessageStoreConfig messageStoreConfig; // message store configuration properties
private final CommitLog commitLog; // the CommitLog holding all message bodies
// consume-queue cache: topic -> (queueId -> ConsumeQueue)
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
private final FlushConsumeQueueService flushConsumeQueueService; // flush thread for consume-queue files
private final CleanCommitLogService cleanCommitLogService; // service that removes expired CommitLog files
private final CleanConsumeQueueService cleanConsumeQueueService; // service that removes expired ConsumeQueue files
private final IndexService indexService; // message index service
private final AllocateMappedFileService allocateMappedFileService; // MappedFile pre-allocation service
// CommitLog dispatch service: builds ConsumeQueue and IndexFile entries from CommitLog data
private final ReputMessageService reputMessageService;
private final HAService haService; // master/slave replication (HA) service
private final ScheduleMessageService scheduleMessageService; // delayed (scheduled) message service
private final StoreStatsService storeStatsService; // store statistics service
private final TransientStorePool transientStorePool; // off-heap (direct memory) buffer pool for messages
private final RunningFlags runningFlags = new RunningFlags();
private final SystemClock systemClock = new SystemClock();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
private final BrokerStatsManager brokerStatsManager; // broker statistics manager
private final MessageArrivingListener messageArrivingListener; // message-arrival listener for the long-polling pull model
private final BrokerConfig brokerConfig; // broker configuration
private volatile boolean shutdown = true;
private StoreCheckpoint storeCheckpoint; // flush checkpoint file
private AtomicLong printTimes = new AtomicLong(0);
private final LinkedList<CommitLogDispatcher> dispatcherList; // CommitLog dispatch handlers
private RandomAccessFile lockFile;
private FileLock lock;
boolean shutDownNormal = false;
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
// Invoked while ReputMessageService dispatches messages to the consume queue and index files;
// it answers pending consumer pull requests, achieving near-real-time message push.
this.messageArrivingListener = messageArrivingListener;
this.brokerConfig = brokerConfig;
this.messageStoreConfig = messageStoreConfig;
this.brokerStatsManager = brokerStatsManager;
// Asynchronously pre-creates the current MappedFile and the next one.
this.allocateMappedFileService = new AllocateMappedFileService(this);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
} else {
this.commitLog = new CommitLog(this);
}
this.consumeQueueTable = new ConcurrentHashMap<>(32);
this.flushConsumeQueueService = new FlushConsumeQueueService(); // async flush thread for consume-queue files
this.cleanCommitLogService = new CleanCommitLogService(); // async thread that removes expired CommitLog files
this.cleanConsumeQueueService = new CleanConsumeQueueService(); // async thread that removes expired ConsumeQueue files
this.storeStatsService = new StoreStatsService(); // store statistics service
this.indexService = new IndexService(this); // message index service
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService = new HAService(this); // master/slave replication service
} else {
this.haService = null;
}
this.reputMessageService = new ReputMessageService(); // dispatch thread (CommitLog -> ConsumeQueue/Index)
this.scheduleMessageService = new ScheduleMessageService(this); // delayed-message service
this.transientStorePool = new TransientStorePool(messageStoreConfig); // off-heap memory pool
if (messageStoreConfig.isTransientStorePoolEnable()) {
this.transientStorePool.init(); // mlock: pin part of the process address space into physical memory
}
this.allocateMappedFileService.start(); // start the MappedFile allocation service
this.indexService.start(); // start the message index service
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); // dispatcher that builds ConsumeQueue entries
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); // dispatcher that builds Index entries
File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
MappedFile.ensureDirOK(file.getParent()); // ensure the parent directory exists, creating it if necessary
lockFile = new RandomAccessFile(file, "rw");
}
public void start() throws Exception {
// Take an exclusive file lock so only one broker instance can use this store directory.
lock = lockFile.getChannel().tryLock(0, 1, false);
if (lock == null || lock.isShared() || !lock.isValid()) {
throw new RuntimeException("Lock failed,MQ already started");
}
lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
lockFile.getChannel().force(true);
{
// Find the max physical offset already dispatched into the logic (consume) queues.
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
}
}
}
if (maxPhysicalPosInLogicQueue < 0) {
maxPhysicalPosInLogicQueue = 0;
}
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
}
// On startup the broker starts a thread that keeps the ConsumeQueue/Index files up to date.
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
// Wait until dispatch has caught up with the CommitLog before recovering the topic queue table.
while (true) {
if (dispatchBehindBytes() <= 0) {
break;
}
Thread.sleep(1000);
}
this.recoverTopicQueueTable();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService.start();
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
this.flushConsumeQueueService.start();
this.commitLog.start();
this.storeStatsService.start();
this.createTempFile();
this.addScheduleTask(); // schedule periodic tasks, e.g. expired-file deletion
this.shutdown = false;
}
}

CommitLog中通过asyncPutMessage或putMessage方法向MappedFile中写数据时,首先调用MappedFileQueue无参的getLastMappedFile方法获取最后一个MappedFile,若MappedFile文件已经写满或未获取到,则调用MappedFileQueue带参数的getLastMappedFile,最终调用AllocateMappedFileService的putRequestAndReturnMappedFile方法将创建请求提交到优先队列requestQueue中,且会创建当前的MappedFile和下一个MappedFile两个请求,然后通过构建的AllocateRequest的CountDownLatch等待文件创建完成,默认超时时间5s。

run方法中mmapOperation方法从优先队列头部取出分配请求AllocateRequest,如果开启了堆外内存,则通过MappedFile的init方法实例化MappedFile,且从TransientStorePool中获取将地址空间锁定在物理内存中的ByteBuffer;若未开启堆外内存则通过构造方法创建MappedFile;若开启了文件预热,还会调用warmMappedFile将MappedByteBuffer每隔4K写入一个0 byte,将整个文件撑满;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
public class MappedFileQueue {
// Returns the mapping object (MappedFile) of the last file in the queue; the caller creates a new file if it is full.
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;
while (!this.mappedFiles.isEmpty()) {
try {// fetch the last MappedFile instance held by this MappedFileQueue
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {} catch (Exception e) { // list may shrink concurrently: retry on IOOBE, give up otherwise
break;
}
}
return mappedFileLast;
}
public MappedFile getLastMappedFile(final long startOffset) {
return getLastMappedFile(startOffset, true);
}
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile(); // files are written sequentially, so only the last one can accept writes
if (mappedFileLast == null) { // queue empty: compute the starting offset (== file name) of the file to create
// The new file's start offset is the largest multiple of mappedFileSize not greater than startOffset.
// E.g. with file ranges [0,99),[100,199),[200,299): for offset 230 -> 230 - (230 % 100) = 200.
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
if (mappedFileLast != null && mappedFileLast.isFull()) { // last file is full: also need a new file
// The next file starts right after the last one: its start offset + file size.
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
if (createOffset != -1 && needCreate) { // a new MappedFile must be created
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
// Delegate creation (and pre-creation of the next file) to the allocation service.
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {} // NOTE(review): creation failure is swallowed here; caller receives null
}
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) { // first file created in this queue gets a flag that is checked when putting messages
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile); // register the newly created MappedFile in the queue
}
return mappedFile;
}
return mappedFileLast;
}
}
public class AllocateMappedFileService extends ServiceThread {
private static int waitTimeOut = 1000 * 5; // timeout waiting for a MappedFile to be created, default 5s
// Pending allocation requests, keyed by filePath. Once a request is fulfilled (the mapped file
// has been obtained by the requester) the entry is removed from requestTable.
private ConcurrentMap<String, AllocateRequest> requestTable = new ConcurrentHashMap<String, AllocateRequest>();
private PriorityBlockingQueue<AllocateRequest> requestQueue = new PriorityBlockingQueue<AllocateRequest>();
private volatile boolean hasException = false; // whether MappedFile creation hit an exception
private DefaultMessageStore messageStore;
// Two core methods: putRequestAndReturnMappedFile and mmapOperation.
// Together they implement MappedFile creation and (optional) pre-warming.
public void run() { // this thread is started when DefaultMessageStore is constructed
while (!this.isStopped() && this.mmapOperation()) {
}
}
private boolean mmapOperation() { // creates one MappedFile and optionally pre-warms it
boolean isSuccess = false;
AllocateRequest req = null;
try {
req = this.requestQueue.take(); // take the next allocation request from the head of the priority queue
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
if (null == expectedRequest) { // request no longer in the table, likely timed out on the requester side
return true;
}
// requestTable and requestQueue are only eventually consistent (see putRequestAndReturnMappedFile)
if (expectedRequest != req) {
return true;
}
if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();
MappedFile mappedFile;
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { // off-heap memory enabled
try {
// Allow a pluggable MappedFile implementation via ServiceLoader; fall back to the default.
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
if (elapsedTime > 10) { // creation took more than 10 ms: warn (logging elided in this excerpt)
int queueSize = this.requestQueue.size();
}
// Pre-warm the mapped file; warmMapedFileEnable defaults to false, i.e. no warm-up by default.
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog() && this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(), this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
this.hasException = true;
return false; // stop the service loop on interrupt
} catch (IOException e) {
this.hasException = true;
if (null != req) { // re-queue the request for a retry
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
}
}
} finally { // count down the request latch so the waiting requester sees the finished MappedFile
if (req != null && isSuccess) {
req.getCountDownLatch().countDown();
}
}
return true;
}
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
int canSubmitRequests = 2; // requests we may submit now; the TransientStorePool holds 5 buffers by default
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
// Fast-fail enabled and this broker is not a slave: TransientStorePool buffers are mlock-ed
// into physical memory, so availability is bounded by the pool.
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
// Remaining capacity = available pool buffers minus requests already queued for allocation.
canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
}
}
// Create an AllocateRequest and park it in the pending table; it is removed once fulfilled.
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize); // the file needed right now
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null; // true if no request for this path existed
if (nextPutOK) { // newly registered
if (canSubmitRequests <= 0) { // no TransientStorePool capacity: give up and return null
this.requestTable.remove(nextFilePath);
return null;
}
boolean offerOK = this.requestQueue.offer(nextReq); // hand the request to the allocation thread
canSubmitRequests--;
}
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize); // the file to pre-create for later
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
if (canSubmitRequests <= 0) { // no capacity left for the pre-created file: drop it silently
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq); // hand the pre-creation request to the allocation thread
}
}
// hasException is volatile, so a failure in mmapOperation is visible here; bail out early.
if (hasException) {
return null;
}
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
// Wait (default 5s) for the allocation thread to finish creating the file.
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) { // timed out: return null but keep the entry so the next call can resume waiting
return null;
} else { // success: remove the entry and return the created MappedFile
this.requestTable.remove(nextFilePath);
return result.getMappedFile();
}
}
} catch (InterruptedException e) {} // NOTE(review): interrupt is swallowed and null is returned
return null;
}
}
public class MappedFile extends ReferenceResource {
public static final int OS_PAGE_SIZE = 1024 * 4; // OS page-cache page size, 4 KiB
// Total virtual memory mapped by all MappedFile instances in this JVM.
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// Number of MappedFile instances in this JVM.
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// Without the TransientStorePool, wrotePosition is the write position into the mapped memory and
// also the max readable position. With the pool, wrotePosition is the write position into the
// off-heap buffer (not yet readable); committedPosition is the position committed to the
// fileChannel and is then the max readable position.
protected final AtomicInteger wrotePosition = new AtomicInteger(0); // current write pointer of this file
protected final AtomicInteger committedPosition = new AtomicInteger(0); // current commit position of this file
private final AtomicInteger flushedPosition = new AtomicInteger(0); // position flushed to disk
protected int fileSize; // file size
protected FileChannel fileChannel; // file channel
protected ByteBuffer writeBuffer = null; // off-heap write buffer (from the TransientStorePool)
// The TransientStorePool is used only with async flush AND MessageStoreConfig.transientStorePoolEnable.
// When enabled, writes go only to writeBuffer; otherwise only to mappedByteBuffer — never both.
// Only a master broker with async flush and transientStorePoolEnable=true uses the pool.
protected TransientStorePool transientStorePool = null; // off-heap buffer pool; writing to direct memory improves throughput
private String fileName; // file name
private long fileFromOffset; // starting (global) offset of this file
private File file; // the physical file
private MappedByteBuffer mappedByteBuffer; // memory-mapped buffer of the physical file
private volatile long storeTimestamp = 0; // timestamp of the last content write
private boolean firstCreateInQueue = false;// whether this is the first file in its MappedFileQueue
public MappedFile(final String fileName, final int fileSize) throws IOException {
init(fileName, fileSize);
}
// Constructor used when the off-heap TransientStorePool is enabled.
public MappedFile(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize, transientStorePool);
}
// With off-heap memory enabled, data is first written to the direct buffer, then committed to the
// file channel, and finally flushed to disk. Enablement is decided by
// MessageStoreConfig.isTransientStorePoolEnable (transientStorePoolEnable + async flush).
public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
}
// Plain initialization without off-heap memory: mmap the file directly.
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName()); // the file name is its starting offset
boolean ok = false;
ensureDirOK(this.file.getParent());
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
// mmap the file READ_WRITE; the file is created if it does not exist
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); // account the mapped virtual memory
TOTAL_MAPPED_FILES.incrementAndGet(); // one more MappedFile instance
ok = true;
} catch (FileNotFoundException e) {
throw e;
} catch (IOException e) {
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close(); // don't leak the channel if mapping failed
}
}
}
// Pre-warm: write a 0 byte every 4 KiB so every page of the file is touched and resident.
// With SYNC_FLUSH the buffer is also force()d periodically — very expensive, hence done async.
public void warmMappedFile(FlushDiskType type, int pages) {
long beginTime = System.currentTimeMillis();
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
int flush = 0;
long time = System.currentTimeMillis();
for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put(i, (byte) 0);
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force(); // flush the pages warmed so far
}
}
}
if (type == FlushDiskType.SYNC_FLUSH) {
mappedByteBuffer.force(); // final flush of any remaining pages
}
this.mlock(); // pin the warmed pages into physical memory
}
}
public class TransientStorePool {
private final int poolSize; // number of pooled direct buffers
private final int fileSize; // size of each buffer (matches CommitLog file size)
private final Deque<ByteBuffer> availableBuffers; // buffers currently available to borrow
private final MessageStoreConfig storeConfig;
public TransientStorePool(final MessageStoreConfig storeConfig) {
this.storeConfig = storeConfig;
this.poolSize = storeConfig.getTransientStorePoolSize(); // default 5
this.fileSize = storeConfig.getMappedFileSizeCommitLog(); // CommitLog file size, default 1 GiB
this.availableBuffers = new ConcurrentLinkedDeque<>();
}
public void init() {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
// On Linux this calls the C mlock(2), pinning this address range into physical memory:
// - pinned pages are not reclaimed by the page-reclaim path until unlocked or the process exits
// - pinned pages are never swapped out
// - the kernel allocates the physical memory immediately at mlock time
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize)); // NativeLong maps Java long onto the C long type
availableBuffers.offer(byteBuffer);
}
}
public ByteBuffer borrowBuffer() {
// Take a buffer from the pool head; returns null when the pool is exhausted.
ByteBuffer buffer = availableBuffers.pollFirst();
return buffer;
}
}

实例化CommitLog时首先实例化MappedFileQueue,然后若是同步刷盘则实例化GroupCommitService,若是异步刷盘则实例化FlushRealTimeService,且实例化负责消息提交的CommitRealTimeService,这三个类都是FlushCommitLogService的子类;其实例化将消息在文件末尾不断的appendDefaultAppendMessageCallback回调;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class CommitLog {
public final static int MESSAGE_MAGIC_CODE = -626843481; // magic number marking a single message
protected final static int BLANK_MAGIC_CODE = -875286124; // magic number marking the end of a file
protected final MappedFileQueue mappedFileQueue;
protected final DefaultMessageStore defaultMessageStore;
private final FlushCommitLogService flushCommitLogService; // flushes messages to disk
private final FlushCommitLogService commitLogService; // commits messages (off-heap buffer -> file channel)
private final AppendMessageCallback appendMessageCallback;
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
protected volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0;
protected final PutMessageLock putMessageLock;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
this.defaultMessageStore = defaultMessageStore;

if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService(); // synchronous flush
} else {
this.flushCommitLogService = new FlushRealTimeService(); // asynchronous flush
}
this.commitLogService = new CommitRealTimeService(); // message commit service
// Append-message callback: appends each message at the current end of the file.
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
@Override
protected MessageExtBatchEncoder initialValue() {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
// Spin lock by default; reentrant lock optionally via configuration.
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
public boolean load() { // loads the files under the CommitLog directory via MappedFileQueue.load
boolean result = this.mappedFileQueue.load();
return result;
}
public void start() {
this.flushCommitLogService.start(); // start the flush thread
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start(); // with off-heap memory enabled, also start the CommitLog commit thread
}
}
}

刷盘

通过CommitLog中的putMessage调用handleDiskFlush,或在asyncPutMessage中调用submitFlushRequest:若是同步刷盘,则将刷盘请求提交到GroupCommitService线程的队列中并唤醒该线程;若是异步刷盘,在未开启堆外缓存池的情况下唤醒FlushRealTimeService异步刷盘线程,若开启了堆外缓存池(异步刷盘且为主节点)则唤醒CommitRealTimeService提交线程;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class CommitLog { // CommitLog storage is built on top of MappedFileQueue
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // Synchronization flush
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {// build a GroupCommitRequest and hand it to GroupCommitService
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
PutMessageStatus flushStatus = null;
try { // wait synchronously for the flush to complete
flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else { // message does not wait for the flush result: just nudge the flush thread
service.wakeup();
}
} else { // Asynchronous flush: after the message is in the MappedFile, wake a dedicated service to flush
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup(); // off-heap memory NOT enabled: wake the flush service directly
} else {
commitLogService.wakeup(); // off-heap memory enabled: wake the commit service first
}
}
}
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
service.putRequest(request);
return request.future(); // caller awaits this future instead of blocking here
} else {
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
} else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup(); // off-heap memory NOT enabled
} else { // with off-heap memory the data must first be committed from the direct buffer into the file mapping, then flushed
commitLogService.wakeup(); // off-heap memory enabled
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
}
同步刷盘

同步刷盘并非真正的同步,而是通过GroupCommitService线程每10ms执行一次刷盘,该线程中实现了读写分离,当系统发起刷盘请求时不会影响系统继续写入刷盘请求,且在完成一次刷盘之后即可进行读写队列互换(互换时加了同步锁),继续读写;通过在超类ServiceThread的onWaitEnd方法中调用swapRequests来完成读写队列的交换;且这里刷盘传入的flushLeastPages为0,即只要有数据没有被刷到磁盘都会执行刷盘;且若正常shutdown,还会等待10ms让请求到来,然后再执行一次队列交换和flush。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class GroupCommitService extends FlushCommitLogService {
// Write queue: new flush requests from producers are appended here.
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
// Read queue: the flush loop drains requests from here; the two queues are swapped periodically.
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
this.wakeup(); // wake the flush loop so the request is handled promptly
}

private void swapRequests() { // swap read/write queues (invoked roughly every 10 ms via onWaitEnd)
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
private void doCommit() { // where the synchronous flush actually happens
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// The message may span into the next file, so flush at most twice.
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) { // flushed position is still behind the request: flush everything pending
CommitLog.this.mappedFileQueue.flush(0);
}
}
// Complete the request's future with the flush outcome.
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear();
} else { // some messages are configured not to wait for flush; still flush pending data
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
public void run() {
while (!this.isStopped()) {
try { // flush every 10 ms; each wait-end swaps the read/write queues (onWaitEnd)
this.waitForRunning(10);
this.doCommit();
} catch (Exception e) {} // NOTE(review): exceptions are swallowed; original source logs here
}
try { // normal shutdown: give in-flight requests a moment to arrive, then do a final flush
Thread.sleep(10);
} catch (InterruptedException e) {}
synchronized (this) {
this.swapRequests();
}
this.doCommit();
}
@Override
protected void onWaitEnd() {
this.swapRequests(); // swap queues each time the wait loop wakes up
}
@Override
public String getServiceName() {
return GroupCommitService.class.getSimpleName();
}
@Override
public long getJointime() {
return 1000 * 60 * 5;
}
}
public class MappedFileQueue {
public boolean flush(final int flushLeastPages) {
boolean result = true;
// Locate the MappedFile that contains the current flushed offset.
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp(); // timestamp of the last content write
int offset = mappedFile.flush(flushLeastPages); // flush; returns the new flushed position relative to this file
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.flushedWhere; // true means nothing new was flushed (position unchanged)
this.flushedWhere = where;
if (0 == flushLeastPages) { // full flush requested: record the store timestamp in the checkpoint path
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
// NOTE(review): in the real RocketMQ source this method belongs to MappedFile, not MappedFileQueue;
// it was pasted here for reference (two same-signature flush overloads would not compile in one class).
public int flush(final int flushLeastPages) {
if (this.isAbleToFlush(flushLeastPages)) { // flush threshold reached
if (this.hold()) { // take a reference so the file is not released mid-flush
int value = getReadPosition();
try {// force() pushes the data written to memory down to the disk file
if (writeBuffer != null || this.fileChannel.position() != 0) {
this.fileChannel.force(false); // off-heap path: data was committed through the file channel
} else {
this.mappedByteBuffer.force(); // mmap path: no off-heap buffer in use
}
} catch (Throwable e) {} // NOTE(review): flush failure is swallowed; original source logs here
this.flushedPosition.set(value);
this.release();
} else { // could not take a reference: just advance the flushed position
this.flushedPosition.set(getReadPosition());
}
}
return this.getFlushedPosition();
}
private boolean isAbleToFlush(final int flushLeastPages) {
int flush = this.flushedPosition.get();// position of the previous flush
int write = getReadPosition();// current write (readable) position
if (this.isFull()) { // file full: always flush
return true;
}
if (flushLeastPages > 0) {// flush only if at least flushLeastPages dirty pages accumulated
return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
}
return write > flush; // flushLeastPages == 0: flush whenever any unflushed data exists
}
}
异步刷盘

每隔500ms发起一次将文件映射写入到硬盘,注意这里传入的默认刷盘页数为4,即当需要刷盘的数据不足4Page(16K)时不会真正执行刷盘任务;且若距上次全量刷盘超过10s,则将刷盘页数置为0,即只要有数据没有被刷到磁盘就会执行一次刷盘,防止消息较少时数据一直不能被刷到磁盘;且若是正常关机,为确保退出前将所有数据刷到磁盘,还会再执行刷盘,若不成功会重试10次;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class FlushRealTimeService extends FlushCommitLogService {
private long lastFlushTimestamp = 0; // time of the last "flush everything" pass
private long printTimes = 0;
public void run() {
while (!this.isStopped()) {
// Whether to use fixed-interval sleeping instead of waitForRunning; default false.
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
// Flush interval, default 500 ms.
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
// Minimum dirty pages required to flush, default 4 (i.e. 16 KiB).
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
// Thorough-flush interval, default 10 s.
int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
boolean printFlushProgress = false;
long currentTimeMillis = System.currentTimeMillis();
// If more than flushPhysicQueueThoroughInterval (10 s) has passed since the last thorough
// flush, drop the page threshold to 0 so any pending data gets flushed this round.
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
this.lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
printFlushProgress = (printTimes++ % 10) == 0;
}
try { // with fewer than 4 dirty pages (16 KiB) and threshold 4, the flush below is a no-op
if (flushCommitLogTimed) {
Thread.sleep(interval); // fixed-interval mode: plain sleep, cannot be woken early
} else {
this.waitForRunning(interval); // default mode: wait but can be woken by wakeup()
}
if (printFlushProgress) {
this.printFlushProgress();
}
long begin = System.currentTimeMillis();
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); // normally flushPhysicQueueLeastPages is 4
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
} catch (Throwable e) {
this.printFlushProgress(); // NOTE(review): the exception itself is swallowed; original source logs it
}
}
// Normal shutdown: make sure everything is flushed before exiting; retry up to RETRY_TIMES_OVER times.
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
}
this.printFlushProgress();
}
@Override
public String getServiceName() {
return FlushRealTimeService.class.getSimpleName();
}
private void printFlushProgress() {
}
@Override
public long getJointime() {
return 1000 * 60 * 5;
}
}

CommitRealTimeService负责将堆外内存数据写入到文件映射中,每200ms执行一次MappedFileQueuecommit方法,若写入位置大于提交位置则将数据从堆外内存通过FileChannel写入文件映射中;且若写入成功唤醒FlushCommitLogService线程执行异步刷盘,若是正常退出同样会调用MappedFileQueuecommit方法提交所有数据;

可以通过commitCommitLogThoroughInterval配置设置提交周期,比如和FlushCommitLogService线程一样设置成10s,则逻辑和FlushCommitLogService中就很相似了,若200ms执行时数据超过4Page页或时间超过10s则执行一次commit操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class CommitRealTimeService extends FlushCommitLogService { // moves data from the off-heap (TransientStorePool) buffer into the file channel
    // Timestamp of the last "thorough" commit (the one forcing commitDataLeastPages = 0).
    private long lastCommitTimestamp = 0;
    @Override
    public String getServiceName() {
        return CommitRealTimeService.class.getSimpleName();
    }
    /**
     * Commit loop. Every commitIntervalCommitLog (default 200 ms) it asks the
     * MappedFileQueue to commit off-heap buffered data into the FileChannel. The
     * commit is skipped unless commitDataLeastPages (default 4) dirty pages exist,
     * except that every commitDataThoroughInterval (default 200 ms) the threshold
     * is dropped to 0. When data was actually committed it wakes the flush
     * service so the committed bytes are flushed to disk. On shutdown, retries a
     * full commit up to RETRY_TIMES_OVER times.
     */
    @Override
    public void run() {
        while (!this.isStopped()) {
            // Commit interval, default 200 ms.
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
            // Minimum dirty pages before committing, default 4.
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
            // Interval after which a threshold-0 commit is forced, default 200 ms.
            int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

            long begin = System.currentTimeMillis();
            // Once per thorough interval, commit regardless of dirty-page count.
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }
            try {
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                if (!result) { // result == false means some data was actually committed
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    flushCommitLogService.wakeup(); // wake the flush service so committed data gets flushed to disk
                }
                this.waitForRunning(interval); // wait up to 200 ms (or until woken)
            } catch (Throwable e) {}
        }
        // Normal shutdown: commit everything that is still buffered, retrying
        // up to RETRY_TIMES_OVER times.
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
        }
    }
}
public class MappedFileQueue {
    /**
     * Commits buffered (transient-pool) data of the file containing the current
     * commit offset into its FileChannel.
     *
     * @param commitLeastPages minimum dirty pages required before committing;
     *                         {@code 0} forces a commit of any pending data
     * @return {@code false} when new data was committed (the commit offset
     *         advanced); {@code true} when nothing changed
     */
    public boolean commit(final int commitLeastPages) {
        // Locate the MappedFile that holds the current commit offset.
        MappedFile current = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
        if (current == null) {
            return true;
        }
        // Offset within the file after the commit attempt.
        int committedOffsetInFile = current.commit(commitLeastPages);
        long newCommittedWhere = current.getFileFromOffset() + committedOffsetInFile;
        // Unchanged position means no data was committed this round.
        boolean unchanged = newCommittedWhere == this.committedWhere;
        this.committedWhere = newCommittedWhere;
        return unchanged;
    }
}
public class MappedFile extends ReferenceResource {
    /**
     * Commits data from the transient-pool writeBuffer into the FileChannel.
     *
     * @param commitLeastPages minimum dirty pages required before committing;
     *                         {@code 0} forces a commit
     * @return the committed position after this call
     */
    public int commit(final int commitLeastPages) { // commit transient-pool data to the fileChannel
        if (writeBuffer == null) { // off-heap buffer not enabled: nothing to transfer, the write position doubles as the committed position
            return this.wrotePosition.get();
        }
        if (this.isAbleToCommit(commitLeastPages)) { // commit threshold reached
            if (this.hold()) { // reference-count guard (AtomicLong-based) so the file cannot be released mid-commit
                commit0(commitLeastPages);
                this.release();
            }
        }
        // Once the whole file has been committed, hand the borrowed buffer back
        // to the TransientStorePool for reuse.
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }
        return this.committedPosition.get();
    }
    /**
     * Writes the byte range (committedPosition, wrotePosition] from writeBuffer
     * into the FileChannel and advances committedPosition. Errors are swallowed
     * in this excerpt (presumably logged in the full source — TODO confirm).
     */
    protected void commit0(final int commitLeastPages) {
        int writePos = this.wrotePosition.get();
        int lastCommittedPosition = this.committedPosition.get();
        if (writePos - this.committedPosition.get() > 0) {
            try {
                // slice() keeps position/limit changes local to this commit.
                ByteBuffer byteBuffer = writeBuffer.slice();
                byteBuffer.position(lastCommittedPosition);
                byteBuffer.limit(writePos);
                this.fileChannel.position(lastCommittedPosition);
                this.fileChannel.write(byteBuffer);
                this.committedPosition.set(writePos);
            } catch (Throwable e) {}
        }
    }
}

消息存储

对于消息的存储是通过DefaultMessageStoreputMessage方法最终调用CommitLogputMessage方法从而使用mmap零拷贝来完成数据的存储。对于延迟消息会将实际的Topic替换为SCHEDULE_TOPIC_XXXX,通过异步任务当到达时间点时再从该队列中取出再放入原来实际的Topic

对于CommitLog文件的写同一时间只能有一个线程,首先会获取当前要写的MappedFile,若该文件已满或不存在则创建一个MappedFile,通过MappedFileappendMessage方法将数据写入缓存中。需要特别注意的是在调用MappedFileappendMessage之前使用了锁,且该锁用户可以配置,可以使用ReentrantLock也可以使用CAS;然后执行handleDiskFlush方法将缓存中的数据刷到磁盘中,以及通过handleHA方法进行主从同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
public interface MessageStore {
    /**
     * Asynchronous variant of {@code putMessage}. This default implementation
     * simply runs the synchronous put on the calling thread and wraps its
     * result in an already-completed future.
     */
    default CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        return CompletableFuture.completedFuture(putMessage(msg));
    }
}
public class DefaultMessageStore implements MessageStore {
    /**
     * Synchronously stores one message by appending it to the CommitLog,
     * recording put latency and failure statistics along the way.
     *
     * @param msg the broker-internal message to store
     * @return the put result; carries a non-OK status when the store is not
     *         writable or the message is illegal
     */
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        // Reject early when the store itself cannot accept writes.
        PutMessageStatus storeStatus = this.checkStoreStatus();
        if (storeStatus != PutMessageStatus.PUT_OK) {
            return new PutMessageResult(storeStatus, null);
        }
        // Reject messages that fail validation (e.g. topic/properties too long).
        PutMessageStatus messageStatus = this.checkMessage(msg);
        if (messageStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
            return new PutMessageResult(messageStatus, null);
        }
        long startTimestamp = this.getSystemClock().now();
        // The actual append into the CommitLog (mmap-backed sequential write).
        PutMessageResult appendResult = this.commitLog.putMessage(msg);
        long elapsed = this.getSystemClock().now() - startTimestamp;
        this.storeStatsService.setPutMessageEntireTimeMax(elapsed);
        if (null == appendResult || !appendResult.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
        return appendResult;
    }
}
public class CommitLog {
    /**
     * Appends one message to the CommitLog. Delay-level messages are rerouted to
     * the system schedule topic first; the append itself happens under
     * putMessageLock (configurable spin/reentrant lock) so only one thread
     * writes the CommitLog at a time. Afterwards it triggers flushing
     * (handleDiskFlush) and HA replication (handleHA).
     */
    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        msg.setStoreTimestamp(System.currentTimeMillis()); // Set the storage time
        msg.setBodyCRC(UtilAll.crc32(msg.getBody())); // body CRC guards against corruption in transit/storage
        AppendMessageResult result = null;
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
        String topic = msg.getTopic();
        int queueId = msg.getQueueId();
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            if (msg.getDelayTimeLevel() > 0) { // Delay Delivery
                // Clamp the delay level to the maximum the schedule service supports.
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; // reroute the message to the system schedule topic (SCHEDULE_TOPIC_XXXX)
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
                // Remember the real topic/queue in properties so the schedule
                // service can restore them when the delay elapses.
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
        // Mark IPv6 born/store hosts so the serialized host fields use 16 bytes.
        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setBornHostV6Flag();
        }
        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setStoreHostAddressV6Flag();
        }
        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // mmap-backed file — zero-copy write target
        putMessageLock.lock(); // serialize CommitLog writes; lock implementation (spin vs reentrant) is configurable
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;
            // Use the lock-acquisition time so messages end up globally ordered by store time.
            msg.setStoreTimestamp(beginLockTimestamp);
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
            // Append the message directly into the mapped buffer.
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) { // outcome of the append
                case PUT_OK:
                    break;
                case END_OF_FILE: // file full: roll to a new file and re-append the message
                    unlockMappedFile = mappedFile;
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }
        // Unlock (munlock) the filled-up file when warm-mapped-file mode is on.
        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
        handleDiskFlush(result, putMessageResult, msg); // flush to disk (sync or async depending on config)
        handleHA(result, putMessageResult, msg); // master-slave replication
        return putMessageResult;
    }
}

消息是通过MappedFileappendMessage方法以Append的方式写入文件缓存中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public class MappedFile extends ReferenceResource {
    /** Appends a single broker-internal message via the supplied callback. */
    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
        return appendMessagesInner(msg, cb);
    }

    /**
     * Core append path shared by single and batch messages: positions a slice of
     * the target buffer at the current write position, delegates the encoding to
     * the callback, then advances the write position by the bytes written.
     *
     * @return the callback's result, or UNKNOWN_ERROR when the file is already
     *         full or the message type is unrecognized
     */
    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;
        final int currentPos = this.wrotePosition.get();
        if (currentPos >= this.fileSize) {
            // Write position at/past end of file — nothing can be appended.
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        // slice() keeps position changes local; write into the off-heap buffer
        // when the transient pool is enabled, otherwise into the mapped buffer.
        final ByteBuffer target = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        target.position(currentPos);
        final AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), target, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), target, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
}
public class CommitLog {
    /**
     * Batch-append callback: writes a pre-encoded batch of messages into the
     * mapped buffer, assigning each message its queue offset and physical
     * (CommitLog) offset in place, and building the comma-separated msgId list.
     * Returns END_OF_FILE when the batch does not fit in the remaining space.
     */
    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBatch messageExtBatch) {
        byteBuffer.mark();
        long wroteOffset = fileFromOffset + byteBuffer.position(); //physical offset
        keyBuilder.setLength(0); // Record ConsumeQueue information
        keyBuilder.append(messageExtBatch.getTopic());
        keyBuilder.append('-');
        keyBuilder.append(messageExtBatch.getQueueId());
        String key = keyBuilder.toString();
        // Current logical (ConsumeQueue) offset for this topic-queue; starts at 0.
        Long queueOffset = CommitLog.this.topicQueueTable.get(key);
        if (null == queueOffset) {
            queueOffset = 0L;
            CommitLog.this.topicQueueTable.put(key, queueOffset);
        }
        long beginQueueOffset = queueOffset;
        int totalMsgLen = 0;
        int msgNum = 0;
        msgIdBuilder.setLength(0);
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        // The batch was already encoded into this buffer by the caller.
        ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
        int sysFlag = messageExtBatch.getSysFlag();
        // Store-host field is 4+4 bytes for IPv4, 16+4 for IPv6.
        int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
        this.resetByteBuffer(storeHostHolder, storeHostLength);
        ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes(storeHostHolder);
        messagesByteBuff.mark();
        while (messagesByteBuff.hasRemaining()) {
            final int msgPos = messagesByteBuff.position(); // 1 TOTALSIZE
            final int msgLen = messagesByteBuff.getInt();
            final int bodyLen = msgLen - 40; //only for log, just estimate it
            if (msgLen > this.maxMessageSize) { // Exceeds the maximum message
                return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
            }
            totalMsgLen += msgLen;
            if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { // Determines whether there is sufficient free space
                // Not enough room: fill the file tail with a blank record
                // (length = remaining space, magic = BLANK_MAGIC_CODE) and report
                // END_OF_FILE so the caller rolls to a new file.
                this.resetByteBuffer(this.msgStoreItemMemory, 8);
                this.msgStoreItemMemory.putInt(maxBlank); // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 2 MAGICCODE
                messagesByteBuff.reset(); // 3 The remaining space may be any value ignore previous read
                byteBuffer.reset(); //Here the length of the specially set maxBlank ignore the previous appended messages
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }
            messagesByteBuff.position(msgPos + 20); //move to add queue offset and commitlog offset
            messagesByteBuff.putLong(queueOffset);
            messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen); // physical offset of this message
            storeHostBytes.rewind();
            String msgId;
            // msgId = store host + physical offset; V6 variant for IPv6 hosts.
            if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
                msgId = MessageDecoder.createMessageId(this.msgIdMemory, storeHostBytes, wroteOffset + totalMsgLen - msgLen);
            } else {
                msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, storeHostBytes, wroteOffset + totalMsgLen - msgLen);
            }
            if (msgIdBuilder.length() > 0) {
                msgIdBuilder.append(',').append(msgId);
            } else {
                msgIdBuilder.append(msgId);
            }
            queueOffset++;
            msgNum++;
            messagesByteBuff.position(msgPos + msgLen); // jump to the next message in the batch
        }
        // Copy the patched batch into the mapped buffer in one bulk put.
        messagesByteBuff.position(0);
        messagesByteBuff.limit(totalMsgLen);
        byteBuffer.put(messagesByteBuff);
        messageExtBatch.setEncodedBuff(null);
        AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
        result.setMsgNum(msgNum);
        // Persist the advanced queue offset for the next append.
        CommitLog.this.topicQueueTable.put(key, queueOffset);
        return result;
    }
}

消息分发

CommitLog写入一条消息后,会有一个后台线程ReputMessageService1ms就会去拉取CommitLog中最新更新的一批消息,然后分别转发到ComsumeQueueIndexFile中。若服务异常宕机,会造成CommitLogConsumeQueueIndexFile文件不一致,有消息写入CommitLog后,没有分发到索引文件,这样消息就丢失了。DefaultMappedStoreload方法提供了恢复索引文件的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class ReputMessageService extends ServiceThread {
    /**
     * Dispatch loop: every 1 ms, replays newly written CommitLog data and
     * forwards it to the ConsumeQueue and IndexFile builders via doReput().
     */
    public void run() {
        DefaultMessageStore.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) { // loop until the service is stopped
            try { // every 1 ms, forward newly written CommitLog entries to ConsumeQueue and IndexFile
                Thread.sleep(1);
                this.doReput(); // build ConsumeQueue / Index entries for new messages
            } catch (Exception e) {}
        }
        DefaultMessageStore.log.info(this.getServiceName() + " service end");
    }
    /**
     * Replays CommitLog data from reputFromOffset up to the CommitLog max
     * offset, dispatching each decoded message to the registered
     * CommitLogDispatchers. Returns when there is nothing left to dispatch.
     */
    private void doReput() {
        // If reputFromOffset is below the first CommitLog file's start offset
        // (e.g. after old files were deleted), fast-forward to the minimum offset.
        if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
            this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
        }
        // Keep dispatching while reputFromOffset is behind the CommitLog max offset.
        for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
            // With duplicationEnable, never dispatch past the confirm offset.
            if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                break;
            }
            // Fetch all readable CommitLog data starting at reputFromOffset.
            SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
            if (result != null) { // null means nothing to dispatch — exit the loop below
                try {
                    this.reputFromOffset = result.getStartOffset(); // effectively reputFromOffset itself
                    for (int readSize = 0; readSize < result.getSize() && doNext; ) { // walk the fetched messages
                        // Decode one message into a DispatchRequest; note that
                        // checkMessageAndReturnSize also handles delay-message specifics.
                        DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                        int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
                        if (dispatchRequest.isSuccess()) { // magic code and body CRC checks passed
                            if (size > 0) {
                                DefaultMessageStore.this.doDispatch(dispatchRequest); // forward to ConsumeQueue/Index dispatchers
                                // Long polling: on a master with long polling enabled,
                                // notify waiting pull requests that a message arrived.
                                if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                }
                                this.reputFromOffset += size; // advance past the dispatched message
                                readSize += size;
                                if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { // slave-side statistics
                                    DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                    DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).addAndGet(dispatchRequest.getMsgSize());
                                }
                            } else if (size == 0) { // end of file reached: roll to the next CommitLog file
                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                readSize = result.getSize();
                            }
                        } else if (!dispatchRequest.isSuccess()) {
                            // Decode failed: skip this entry and move on to the next one.
                            if (size > 0) {
                                this.reputFromOffset += size;
                            } else {
                                doNext = false;
                                // On a master (or DLedger), skip the rest of this buffer entirely.
                                if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() || DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                    this.reputFromOffset += result.getSize() - readSize;
                                }
                            }
                        }
                    }
                } finally { // release the mapped-buffer reference obtained from getData()
                    result.release();
                }
            } else {
                doNext = false;
            }
        }
    }
    /** Fans one dispatch request out to all registered dispatchers (ConsumeQueue, IndexFile). */
    public void doDispatch(DispatchRequest req) {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
    }
    /** Bytes of CommitLog data not yet dispatched. */
    public long behind() {
        return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
    }
    /** True while there is CommitLog data beyond the current dispatch offset. */
    private boolean isCommitLogAvailable() {
        return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
    }
}

对于ConsumeQueue文件的写入是通过调用CommitLogDispatcherBuildConsumeQueue的dispatch方法,在分发时首先通过DefaultMessageStore的findConsumeQueue方法按消息的Topic和queueId找到对应的消息队列,然后调用ConsumeQueue的putMessagePositionInfoWrapper方法去完成写入到对应的缓存以及刷盘工作。最终调用的MappedFile的appendMessage来完成数据的持久化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    /**
     * Builds the ConsumeQueue entry for one dispatched CommitLog record.
     * Only plain messages and committed transactional messages are visible to
     * consumers, so prepared/rollback transactional records are skipped.
     */
    @Override
    public void dispatch(DispatchRequest request) { // ConsumeQueue-building dispatcher
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        boolean consumerVisible = tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE;
        if (consumerVisible) {
            DefaultMessageStore.this.putMessagePositionInfo(request);
        }
    }
}
public class DefaultMessageStore implements MessageStore {
    // topic -> (queueId -> ConsumeQueue); populated lazily as messages are dispatched.
    private final ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> consumeQueueTable;

    /**
     * Dispatches one CommitLog record to its ConsumeQueue: locates the queue by
     * topic/queueId and appends the 20-byte position entry.
     */
    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        cq.putMessagePositionInfoWrapper(dispatchRequest);
    }

    /**
     * Returns the ConsumeQueue for the given topic/queueId, creating it lazily.
     *
     * Fix: use {@link ConcurrentMap#computeIfAbsent} instead of the
     * construct-then-putIfAbsent pattern. computeIfAbsent is atomic and, unlike
     * the original code, never constructs a ConsumeQueue that is immediately
     * discarded when another thread wins the putIfAbsent race (the discarded
     * instance would already have touched its backing files).
     *
     * @param topic   message topic
     * @param queueId queue id within the topic
     * @return the (possibly newly created) ConsumeQueue
     */
    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentMap<Integer, ConsumeQueue> map =
            consumeQueueTable.computeIfAbsent(topic, t -> new ConcurrentHashMap<Integer, ConsumeQueue>(128));
        return map.computeIfAbsent(queueId, id -> new ConsumeQueue(
            topic,
            id,
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
            this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
            this));
    }
}
public class ConsumeQueue {
    /**
     * Writes one ConsumeQueue entry for a dispatched CommitLog record, retrying
     * up to 30 times (1 s apart) while the CQ is writable. When the extended
     * attribute file is enabled, filter bitmap and store time go to the CQ-ext
     * file and its address replaces the tags code in the main entry. On
     * persistent failure the logic-queue error flag is raised.
     *
     * Fix: the retry loop's {@code catch (InterruptedException)} silently
     * swallowed the interrupt; it now restores the thread's interrupt status so
     * the owning service thread can observe a shutdown request.
     */
    public void putMessagePositionInfoWrapper(DispatchRequest request) {
        final int maxRetries = 30;
        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
        for (int i = 0; i < maxRetries && canWrite; i++) {
            long tagsCode = request.getTagsCode();
            if (isExtWriteEnable()) {
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                cqExtUnit.setFilterBitMap(request.getBitMap());
                cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
                cqExtUnit.setTagsCode(request.getTagsCode());
                long extAddr = this.consumeQueueExt.put(cqExtUnit);
                if (isExtAddr(extAddr)) {
                    // A valid ext address replaces the tags code in the CQ entry.
                    tagsCode = extAddr;
                }
            }
            // Append the 20-byte entry (offset, size, tagsCode) to the CQ file.
            boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(), request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
            if (result) {
                if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE || this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                    this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
                }
                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                return;
            } else {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt status instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
            }
        }
        // All retries failed (or CQ not writable): flag the logic queue as errored.
        this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
    }
    /**
     * Appends one CQ_STORE_UNIT_SIZE (20-byte) entry: 8B CommitLog offset,
     * 4B message size, 8B tags code (or ext address).
     *
     * @return true on success or when the entry was already written (replay)
     */
    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) {
        // Already dispatched (e.g. replay after restart): treat as success.
        if (offset + size <= this.maxPhysicOffset) {
            return true;
        }
        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);
        // Byte position in the CQ file sequence where this entry must land.
        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {
            // Fresh queue file but a non-zero CQ offset: pre-fill the gap with
            // blank entries and align flush/commit positions.
            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                this.fillPreBlank(mappedFile, expectLogicOffset);
            }
            if (cqOffset != 0) {
                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
                // This entry (or a later one) was already written — duplicate dispatch.
                if (expectLogicOffset < currentLogicOffset) {
                    return true;
                }
            }
            this.maxPhysicOffset = offset + size;
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }
}

对于Index文件的分发是通过CommitLogDispatcherBuildIndex中最终调用IndexService的buildIndex方法完成的,首先通过retryGetAndCreateIndexFile获取IndexFile,若不存在则创建,然后通过IndexFile的putKey方法将数据写入磁盘。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
    /**
     * Forwards one dispatched CommitLog record to the index service, but only
     * when message indexing is enabled in the store configuration.
     */
    @Override
    public void dispatch(DispatchRequest request) {
        if (!DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            return; // indexing disabled — nothing to build
        }
        DefaultMessageStore.this.indexService.buildIndex(request);
    }
}
public class IndexService {
    /**
     * Builds index entries for one dispatched message: one entry for the unique
     * key (if present) and one per user-supplied key. Duplicate (already
     * indexed) and rolled-back transactional messages are skipped.
     */
    public void buildIndex(DispatchRequest req) {
        IndexFile indexFile = retryGetAndCreateIndexFile();
        if (indexFile != null) {
            long endPhyOffset = indexFile.getEndPhyOffset();
            DispatchRequest msg = req;
            String topic = msg.getTopic();
            String keys = msg.getKeys();
            if (msg.getCommitLogOffset() < endPhyOffset) { // already indexed (duplicate dispatch) — skip
                return;
            }
            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    return; // rolled-back messages are not indexed
            }
            // Core index-building steps below.
            if (req.getUniqKey() != null) { // index by unique key ("topic#uniqKey")
                indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
                if (indexFile == null) {
                    return;
                }
            }
            if (keys != null && keys.length() > 0) { // index by each user-supplied key
                String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
                for (int i = 0; i < keyset.length; i++) {
                    String key = keyset[i];
                    if (key.length() > 0) {
                        indexFile = putKey(indexFile, msg, buildKey(topic, key));
                        if (indexFile == null) {
                            return;
                        }
                    }
                }
            }
        }
    }
    /**
     * Gets (or creates) the current writable IndexFile, retrying up to
     * MAX_TRY_IDX_CREATE times 1 s apart. On final failure the index-file error
     * flag is raised.
     * NOTE(review): the InterruptedException is swallowed here without restoring
     * the interrupt status — matches the excerpted source.
     */
    public IndexFile retryGetAndCreateIndexFile() {
        IndexFile indexFile = null;
        for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {
            indexFile = this.getAndCreateLastIndexFile();
            if (null != indexFile)
                break;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }
        if (null == indexFile) {
            this.defaultMessageStore.getAccessRights().makeIndexFileError();
        }
        return indexFile;
    }
    /**
     * Returns the last IndexFile if it still has room; otherwise creates a new
     * one (named by current time) and kicks off a daemon thread to flush the
     * filled-up previous file.
     */
    public IndexFile getAndCreateLastIndexFile() {
        IndexFile indexFile = null;
        IndexFile prevIndexFile = null;
        long lastUpdateEndPhyOffset = 0;
        long lastUpdateIndexTimestamp = 0;
        {
            // Read-locked check of the newest file in the list.
            this.readWriteLock.readLock().lock();
            if (!this.indexFileList.isEmpty()) {
                IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
                if (!tmp.isWriteFull()) {
                    indexFile = tmp;
                } else {
                    // Carry the full file's end offset/timestamp into the new file's header.
                    lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
                    lastUpdateIndexTimestamp = tmp.getEndTimestamp();
                    prevIndexFile = tmp;
                }
            }
            this.readWriteLock.readLock().unlock();
        }
        if (indexFile == null) {
            try { // new IndexFile named after the current wall-clock time
                String fileName = this.storePath + File.separator + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
                indexFile = new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset, lastUpdateIndexTimestamp);
                this.readWriteLock.writeLock().lock();
                this.indexFileList.add(indexFile);
            } catch (Exception e) {
            } finally {
                this.readWriteLock.writeLock().unlock();
            }
            if (indexFile != null) { // flush the previous (now immutable) IndexFile in the background
                final IndexFile flushThisFile = prevIndexFile;
                Thread flushThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        IndexService.this.flush(flushThisFile);
                    }
                }, "FlushIndexFileThread");
                flushThread.setDaemon(true);
                flushThread.start();
            }
        }
        return indexFile;
    }
    /**
     * Puts one key into the index, rolling to a fresh IndexFile (retrying) when
     * the current one is full. Returns null when no file could be obtained.
     */
    private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
        for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
            indexFile = retryGetAndCreateIndexFile();
            if (null == indexFile) {
                return null;
            }
            ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
        }
        return indexFile;
    }
}
public class IndexFile {
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize, false);
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize;
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
if (invalidIndex == slotValue) {
this.indexHeader.incHashSlotCount();
}
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
} finally {
if (fileLock != null) {
try {
fileLock.release();
}
}
}
}
return false;
}
}