RocketMQ消费者源码

消费者以消费者组的模式开展。消费者组之间有集群模式和广播模式两种消费模式。消费模式有推模式拉模式推模式是由拉模式封装组成

PUSH模式

PullMessageService线程的run方法中消费PullRequest请求,最终调用DefaultMQPushConsumerImplpullMessage方法,首先会对消息进行流量消息大小限流,若不满足限流条件丢到线程池中延迟处理,定义了一个拉取消息的回调函数PullCallback,若拉取消息失败则调用onException将其丢到线程池中延迟处理下次继续重试处理,若成功则调用onSuccess方法,在该方法中若拉取到数据后会调用executePullRequestLaterexecutePullRequestImmediately方法再次将拉取请求放入任务队列中,若有数据则会一直拉取直到数据被消费完则PullStatus会变为非FOUND状态。不论获取消息成功还是失败都会将请求再次放回队列,便于长轮训的方式拉取消息

PullRequest拉取消息并非是只拉取一条,而是拉取小于等于32的一批消息,若是集群模式还会顺便将当前请求的MessageQueue的消费进度offset上报给你Broker;且客户端与服务端建立的是长连接,若没有拉取到消息Broker并不会立即返回内容;

若拉取到消息,首先通过ProcessQueueputMessage方法将消息放到msgTreeMap中缓存,且判断msgTreeMap不为空当前消费状态为false将当前消费状态置为true,返回true表明顺序消费模式可以消费,然后通过调用ConsumeMessageService具体实现的submitConsumeRequest方法,从而完成消息额异步消费;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
public class PullMessageService extends ServiceThread {
public void run() {
while (!this.isStopped()) {
try { //拉取消息的请求队列
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest); //处理请求
}
}
}
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest); // 推模式的消费者最终还是会使用拉消息的方式
}
}
}
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void pullMessage(final PullRequest pullRequest) { // PUSH模式拉取消息的核心流程
final ProcessQueue processQueue = pullRequest.getProcessQueue(); //获取要处理的消息:ProcessQueue
if (processQueue.isDropped()) { // 若队列发生变更,如负载均衡重置了订阅的队列,导致队列被抛弃,直接返回
return;
}
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); // 先更新时间戳
try {
this.makeSureStateOK();
} catch (MQClientException e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}
if (this.isPause()) { //如果处理队列被挂起,延迟1S后再执行。
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
long cachedMessageCount = processQueue.getMsgCount().get(); //获得最大待处理消息数量
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); //获得最大待处理消息大小

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { //从数量进行流控
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { //从消息大小进行流控
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}
if (!this.consumeOrderly) { // 若不是有序消息
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}
} else { // 顺序消息的处理逻辑
if (processQueue.isLocked()) { // 若队列被锁定
if (!pullRequest.isLockedFirst()) { // 若pullRequest没有被锁定
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else { // 若队列没有被锁定
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); // 延迟3s
return;
}
}
// 获取订阅信息
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}
final long beginTimestamp = System.currentTimeMillis();
PullCallback pullCallback = new PullCallback() { // 客户端默认的拉取的回调函数,在拉取到消息后会进入这个方法处理。
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// 解析消息结果,以及Tag过滤
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND: // 若拉取到了消息
long prevRequestOffset = pullRequest.getNextOffset(); // 上一次拉取消息的offset
pullRequest.setNextOffset(pullResult.getNextBeginOffset()); // 下一次拉取消息的起始offset
long pullRT = System.currentTimeMillis() - beginTimestamp; // 拉取耗时
// 拉取请求RT统计
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { // 若未拉取到消息
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); // 将拉取请求放回阻塞队列
} else {
// 拉取请求TPS统计
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); // 将拉取到的消息放到ProcessQueue的msgTreeMap中缓存
// 消费者消息服务处理消费到的消息,这里也是顺序消息和并发消息的区别处理的地方
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { // PUSH模式下任务间隔时间,将拉取请求延迟间隔时间后返回阻塞队列
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); // 将拉取请求放回阻塞队列
}
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset()); // 设置下一次拉取消息的其实offset
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); // 更新当前队列的消费进度
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); // 将请求丢回阻塞队列
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset()); // 设置下一次拉取消息的其实offset
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); // 更新当前队列的消费进度
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); // 将请求丢回阻塞队列
break;
case OFFSET_ILLEGAL:
pullRequest.setNextOffset(pullResult.getNextBeginOffset()); // 设置下一次拉取消息的其实offset
pullRequest.getProcessQueue().setDropped(true); // 删除消息队列镜像
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() { // 10s后执行该任务,保存消费进度,持久化,移除消息队列镜像
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) { // 延迟3s丢回阻塞队列
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { // 集群模式,顺便将本地消费进度offset提交给Broker
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) { // 读取本地缓存中的消费进度offset
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, true, subExpression != null, classFilter);
try {// 客户端实际与服务器交互,拉取消息的地方,拉取成功后回调PullCallback,批量拉取默认小于等于32条
this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, pullCallback);
} catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
}
public class ProcessQueue {
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); // 读写锁:堆笑的操作都要使用
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); // 临时存放消息
private final AtomicLong msgCount = new AtomicLong(); // 消息总数据量
private final AtomicLong msgSize = new AtomicLong(); // 整个ProcessQueue处理单元的总消息长度
private final Lock lockConsume = new ReentrantLock();
// 一个临时的TreeMap,仅在顺序消费模式下使用
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
private volatile long queueOffsetMax = 0L; // 整个ProcessQueue处理单元的offset最大边界
private volatile boolean dropped = false; // 是否被删除
private volatile long lastPullTimestamp = System.currentTimeMillis(); // 最后拉取时间
private volatile long lastConsumeTimestamp = System.currentTimeMillis(); // 最后消费时间
private volatile boolean locked = false;
private volatile long lastLockTimestamp = System.currentTimeMillis();
private volatile boolean consuming = false; // 是否正在消费
private volatile long msgAccCnt = 0; // 拉取消息的那一刻broker端还有多少条消息没有被处理
public boolean putMessage(final List<MessageExt> msgs) {
boolean dispatchToConsume = false; // 这个只有在顺序消费的时候才会遇到,并发消费不会用到
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
int validMsgCnt = 0;//有效消息数量
for (MessageExt msg : msgs) { // 把传过来的消息都都放在msgTreeMap中,以消息在queue中的offset作为key,msg做为value
MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
if (null == old) { // 正常情况,说明原本msgTreeMap中不包含此条消息
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
msgSize.addAndGet(msg.getBody().length);
}
}
msgCount.addAndGet(validMsgCnt); // 增加有效消息数量
// msgTreeMap不为空(含有消息),并且不是正在消费状态,这个值在放消息的时候会设置为true,在顺序消费模式,取不到消息则设置为false
if (!msgTreeMap.isEmpty() && !this.consuming) {
dispatchToConsume = true; // 有消息,且为未消费状态,则顺序消费模式可以消费
this.consuming = true; // 将ProcessQueue置为正在被消费状态
}
if (!msgs.isEmpty()) {
MessageExt messageExt = msgs.get(msgs.size() - 1); // 拿到最后一条消息
// 获取broker端(拉取消息时)queue里最大的offset,maxOffset会存在每条消息里
String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
if (property != null) { // 计算broker端还有多少条消息没有被消费
// broker端的最大偏移量 - 当前ProcessQueue中处理的最大消息偏移量
long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
if (accTotal > 0) {
this.msgAccCnt = accTotal;
}
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {}
return dispatchToConsume;
}
}

在拉取消息成功PullCallback回调方法中,并发消息顺序消息分别调用ConsumeMessageConcurrentlyServiceConsumeMessageOrderlyServicesubmitConsumeRequest方法。最终通过各自的内部类ConsumeRequest线程来处理,在该线程中具体消费者的consumeMessage方法。

对于并发消息,若设置批量消费设置大于当前消息数会对消息进行分页消费,将消费逻辑封装到ConsumeRequest线程中,并提交到线程池中处理,若提交被线程池拒绝策略拒绝会延迟5s再次提交,ConsumeRequest中调用用户自定义MessageListener消费消息事,若MessageListener返回null抛异常,最终会通过ConsumeMessageConcurrentlyServiceprocessConsumeResult方法,尝试将消费失败的消息发回Broker,然后重新消费,若发送失败,则会延迟5s后再次尝试消费发回失败的消息

对于顺序消息,通过传入的dispathToConsume判断当前是否可以消费消息,如能消费消息才将消费任务ConsumeRequest提交个线程池处理,注意这里和并发消息使用的ConsumeRequest并非同一个类,且顺序消息每一个ConsumeRequest消费任务不是以消费消息条数来计算,而是根据消费时间,默认当消费时长大于MAX_TIME_CONSUME_CONTINUOUSLY,默认60s后,本次消费任务结束,由消费组内其他线程继续消费;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
public void submitConsumeRequest(final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) { //一次只拉取32条数据,不足32直接处理
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest); // 若线程池拒绝了,则延迟5s,再执行consumeExecutor.submit
}
} else {//超过32条,就进行分页处理。
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); //消费请求处理线程
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest); // 若线程池拒绝了,则延迟5s,再执行consumeExecutor.submit
}
}
}
}
class ConsumeRequest implements Runnable {
private final List<MessageExt> msgs;
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
this.msgs = msgs;
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}
public void run() {
if (this.processQueue.isDropped()) { // 若队列失效,则不在消费消息
return;
}
// 获取用户自定义的消息处理监听器
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
// 优先执行钩子方法
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) { // 设置消费开始时间,记录到MessageExt的properties属性中
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); // 交由Listener实际处理消息
} catch (Throwable e) {
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) { // 消费时抛出异常
returnType = ConsumeReturnType.EXCEPTION;
} else { // 消费时返回null
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT; // 消费超时
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED; // 失败消费,稍后尝试消费
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS; // 消费成功
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) { // listener返回null或者抛异常,都会返回ReConsume_Later状态
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 处理消息处理完后,执行钩子方法。
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
if (!processQueue.isDropped()) { // 若队列没有被取消订阅
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
}
}
public void processConsumeResult(final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty()) return;
switch (status) { // 做一些统计,若
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) { // ackIndex默认值为Integer.MAX_VALUE
ackIndex = consumeRequest.getMsgs().size() - 1; // 故这里会被执行
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { // 若消费成功则不会被执行
MessageExt msg = consumeRequest.getMsgs().get(i);
}
break;
case CLUSTERING: // 若消费成功则不会被执行
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context); // 将消费失败的消息发回对应Broker
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) { // 若发回Broker失败
consumeRequest.getMsgs().removeAll(msgBackFailed); // 移除消费失败的消息
// 延迟5s再次消费失败的消息
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// 从ProcessQueue的消息缓存中移除被消费的消息
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { // 更新当前消费进度
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
}

对于顺序消息ConsumeMessageOrderlyServicestart方法中会每20s调用一次Broker的lockBatchMQ方法,锁定订阅的消息队列,且锁的失效时间是30s,对于ConsumeRequest消费线程,不是以消费消息条数来计算,而是根据消费时间,默认当消费时长大于MAX_TIME_CONSUME_CONTINUOUSLY默认60s后,本次消费任务结束,由消费组内其他线程继续消费;

且针对每个队列创建了一个本地锁objLock,获取到该锁锁后,在判断若是广播模式或队列是锁定状态锁定时间未超过30s,则开始循环消费该队列的消息,若Topic是集群模式且队列没有被锁住或队列锁定时间超过30s,则延迟10ms后通过tryLockLaterAndReconsume中去调用Broker再次尝试锁该队列,成功则延迟10ms,失败延迟3s,后再调用submitConsumeRequest方法,且结束当前线程,若当前线程处理时间超过60s,则执行同样的延迟逻辑,退出线程;

然后通过ProcessQueue的takeMessages方法,从msgTreeMap中获取消息后将消息都放在一个临时的consumingMsgOrderlyTreeMap中,然后进行顺序消费,消费成功最终调用commit方法将其清空,注意使用的消息缓存是TreeMap,且通过offset作为排序的key,顺序消息取消息是使用的是pollFirstEntry方法,故最终取出的消息是保持顺序的,然后是调用用户自定义的MessageListenerOrderly进行消息的消费处理;

消费成功,需要调用ProcessQueuecommit方法将临时的consumingMsgOrderlyTreeMap清除,若失败尝试请求broker将消息放回broker,若发送成功则相当于消费完成,执行和消费完成相同的逻辑;若发送失败,将消息从临时consumingMsgOrderlyTreeMap中移除,且添加到msgTreeMap中等待下一次消费,然后延迟一定时间后,再执行submitConsumeRequest方法进行消费;若消费成功同样保持消费进度offset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
public void start() { // 每隔20秒会处理一下提交给他的方法
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() { // 每20s执行一次
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
public synchronized void lockMQPeriodically() {
if (!this.stopped) {
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}
}
public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue, final long delayMills) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
// 调用broker尝试锁队列,成功返回true,失败返回false
boolean lockOK = ConsumeMessageOrderlyService.this.lockOneMQ(mq);
if (lockOK) { // 若成功再延迟10ms将再次调用submitConsumeRequest方法
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 10);
} else { // 若失败,则再延迟3s,再调用submitConsumeRequest方法
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 3000);
}
}
}, delayMills, TimeUnit.MILLISECONDS);
}
public boolean processConsumeResult(final List<MessageExt> msgs, final ConsumeOrderlyStatus status, final ConsumeOrderlyContext context, final ConsumeRequest consumeRequest) {
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) { // 自动提交
switch (status) {
case COMMIT:
case ROLLBACK:
case SUCCESS:
// 消费完消息之后,需要调用commit()方法将这个临时的consumingMsgOrderlyTreeMap清除
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) { // 尝试请求broker将消息放回broker,若成功失败返回true
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); // 将消息从临时consumingMsgOrderlyTreeMap中移除,且添加到msgTreeMap中
// 延迟一定时间后,再执行submitConsumeRequest方法进行消费
this.submitConsumeRequestLater(consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
// 相当于消费完成,需要调用commit()方法将这个临时的consumingMsgOrderlyTreeMap清除
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
}
} else {
switch (status) {
case SUCCESS:
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case COMMIT:
// 消费完消息之后,需要调用commit()方法将这个临时的consumingMsgOrderlyTreeMap清除
commitOffset = consumeRequest.getProcessQueue().commit();
break;
case ROLLBACK:
// 将消息从临时consumingMsgOrderlyTreeMap中移除,且添加到msgTreeMap中,等待下一次消费
consumeRequest.getProcessQueue().rollback();
// 延迟一定时间后,再执行submitConsumeRequest方法进行消费
this.submitConsumeRequestLater(consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) { // 尝试请求broker将消息放回broker,若成功失败返回true
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); // 将消息从临时consumingMsgOrderlyTreeMap中移除,且添加到msgTreeMap中
// // 延迟一定时间后,再执行submitConsumeRequest方法进行消费
this.submitConsumeRequestLater(consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
}
break;
default:
break;
}
}
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { // 若消费成功,更新消费进度
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
return continueConsume;
}
}
public abstract class RebalanceImpl {
public void lockAll() {
HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
while (it.hasNext()) {
Entry<String, Set<MessageQueue>> entry = it.next();
final String brokerName = entry.getKey();
final Set<MessageQueue> mqs = entry.getValue();
if (mqs.isEmpty()) continue;
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.setMqSet(mqs);
try { // 调用broker,批量锁定该队列
Set<MessageQueue> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mq : lockOKMQSet) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
for (MessageQueue mq : mqs) {
if (!lockOKMQSet.contains(mq)) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
processQueue.setLocked(false);
}
}
}
} catch (Exception e) {}
}
}
}
}
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
public void submitConsumeRequest(final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) {
if (dispathToConsume) { // 若顺序消费模式可以消费
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
class ConsumeRequest implements Runnable {
public void run() { // 每一个ConsumeRequest消费任务不是以消费消息条数来计算,而是根据消费时间,默认当消费时长大于MAX_TIME_CONSUME_CONTINUOUSLY,默认60s后,本次消费任务结束,由消费组内其他线程继续消费
if (this.processQueue.isDropped()) { // 若已经取消该队列的订阅
return;
}
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) { // 通过加锁,将并发的消息顺序进行消费。消息处理的方式没什么特别。
// 广播模式直接进入消费,无需锁定处理对列因为相互直接无竞争,集群模式proceessQueue被锁定并且锁未超时,默认30失效,locked锁是在ConsumeMessageOrderlyService的start方法中完成的
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {// 若已经取消该队列的订阅
break;
}
// 集群模式且队列没有被锁住,则延迟10ms后通过tryLockLaterAndReconsume中去调用Broker再次尝试锁该队列,成功则延迟10ms,失败延迟3s,后再调用submitConsumeRequest方法
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && !this.processQueue.isLocked()) {
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 集群模式且队列锁住失效,则延迟10ms后通过tryLockLaterAndReconsume中去调用Broker再次尝试锁该队列,成功则延迟10ms,失败延迟3s,后再调用submitConsumeRequest方法
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && this.processQueue.isLockExpired()) {
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
long interval = System.currentTimeMillis() - beginTime;
// 耗时大于60s,则延迟10ms后通过tryLockLaterAndReconsume中去调用Broker再次尝试锁该队列,成功则延迟10ms,失败延迟3s,后再调用submitConsumeRequest方法
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
consumeMessageContext.setProps(new HashMap<String, String>()); // init the consume context type
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
break;
}
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); // 调用消费者具体的消费方法
} catch (Throwable e) {
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeOrderlyStatus.SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {
status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
// 若消费成功,需要调用ProcessQueue的commit方法将临时的consumingMsgOrderlyTreeMap清除,若失败尝试请求broker将消息放回broker,若发送成功则相当于消费完成,执行和消费完成相同的逻辑
// 若发送失败,将消息从临时consumingMsgOrderlyTreeMap中移除,且添加到msgTreeMap中等待下一次消费,然后延迟一定时间后,再执行submitConsumeRequest方法进行消费
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else { // 若锁定队列失败
if (this.processQueue.isDropped()) {
return;
}
// 延迟100ms后,调用broker尝试锁队列,成功再延迟10ms将再次调用submitConsumeRequest方法,若失败,则再延迟3s,再调用submitConsumeRequest方法
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
}
}
public class ProcessQueue {
public List<MessageExt> takeMessages(final int batchSize) { // 从msgTreeMap中获取消息后:其内部会将消息都放在一个临时的TreeMap中,然后进行顺序消费
List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!this.msgTreeMap.isEmpty()) {
// 从msgTreeMap中获取batchSize条数据,每次都返回offset最小的那条消息并从msgTreeMap中移除
for (int i = 0; i < batchSize; i++) {
Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
if (entry != null) {
result.add(entry.getValue()); // 把消息放到返回列表
// 把消息的offset和消息体msg,放到顺序消费TreeMap中
consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
} else {
break;
}
}
}
if (result.isEmpty()) {
consuming = false; // 没有取到消息,说明不需要消费,即将consuming置为FALSE。
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("take Messages exception", e);
}
return result;
}
}

消息的拉取最终调用的是PullAPIWrapperpullKernelImpl方法,拉取模式固定为ASYNC,最终调用MQClientAPIImplpullMessageAsync方法想Broker发送RequestCode.PULL_MESSAGE命令拉取消息,在operationComplete方法中完成PullCallback回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class PullAPIWrapper {
public PullResult pullKernelImpl(final MessageQueue mq, final String subExpression, final String expressionType, final long subVersion, final long offset, final int maxNums, final int sysFlag, final long commitOffset,
final long brokerSuspendMaxTimeMillis, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 找到Broker
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
}
if (findBrokerResult != null) {
{// check version 版本检查
if (!ExpressionType.isTagType(expressionType) && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
throw new MQClientException("The broker[" + mq.getBrokerName() + ", " + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
}
int sysFlagInner = sysFlag;
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
//构建请求
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset); // 顺便将本地消费进度offset提交给Broker
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); // 默认为15s
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
//拉取消息
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback);
return pullResult;
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
public class MQClientAPIImpl {
public PullResult pullMessage(final String addr, final PullMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
//几种拉取方式
switch (communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}
return null;
}
private void pullMessageAsync(final String addr, final RemotingCommand request, final long timeoutMillis, final PullCallback pullCallback) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { //异步拉取
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand(); //处理拉取消息的结果
if (response != null) { //有响应
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
pullCallback.onException(e);
}
} else {//没响应
if (!responseFuture.isSendRequestOK()) {
pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, responseFuture.getCause()));
} else {
pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
}
}
}
});
}
}

在Broker通过PullMessageProcessor方法的processRequest方法处理RequestCode.PULL_MESSAGE请求。首先端构建消息过滤器,然后在DefaultMessageStoregetMessage查询消息中调用MessageFilterisMatchedByConsumeQueue方法。若ResponseCode.PULL_NOT_FOUND未拉取到数据,则再创建一个拉取请求且通过PullRequestHoldServicesuspendPullRequest将该请求放入ManyPullRequest请求拉取队列,这里看似返回了null实际上并没有响应给Consumer,而是在AsyncNettyRequestProcessor中的RemotingResponseCallback中做了处理,从而实现长连接。若是集群模式拉取请求中也会上报消费offset的进度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
SubscriptionData subscriptionData = null;
ConsumerFilterData consumerFilterData = null;
if (hasSubscriptionFlag) {
try {
subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType());
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(), requestHeader.getExpressionType(), requestHeader.getSubVersion());
assert consumerFilterData != null;
}
}
} else {
ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
}
}
//在Broker端构建消息过滤器
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
} else {
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
}
// 获取消息
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
if (getMessageResult != null) {
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
//消息拉取结果
switch (getMessageResult.getStatus()) {
case FOUND:
response.setCode(ResponseCode.SUCCESS);
break;
case MESSAGE_WAS_REMOVING:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case NO_MATCHED_LOGIC_QUEUE:
case NO_MESSAGE_IN_QUEUE:
break;
case NO_MATCHED_MESSAGE:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case OFFSET_FOUND_NULL:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_OVERFLOW_BADLY:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
break;
case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
break;
default:
assert false;
break;
}
switch (response.getCode()) {
case ResponseCode.SUCCESS:
this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(), getMessageResult.getBufferTotalSize());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
break;
case ResponseCode.PULL_NOT_FOUND:
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {// 消息长轮询
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
// 没有拉取到消息,就再创建一个拉取请求
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
// 将请求放入ManyRequestPull请求队列,为了配合长连接处理
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
}
}
boolean storeOffsetEnable = brokerAllowSuspend; // 默认为true
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag; // 且为集群模式
// 且不是从节点
storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) { // 集群模式且当前节点不是从节点,保存当前客户端对该Group该Topic的对应QueueID的消费进度offset
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
return response;
}

长轮询

对于消息的发送基本能达到实时的效果,是通过PullRequestHoldService类中长轮训来实现的,该类是一个线程类,在Broker中的BrokerController中被实例化和启动。客户端拉取数时未拉取到数据就会将请求通过suspendPullRequest方法放入pullRequestTable中。在run方法中一直循环若没有消息就waitForRunning方法等待5s,若有数据则会被提前唤醒。然后通过checkHoldRequest方法检查请求对象,若有数据则将数据返回给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
public class PullRequestHoldService extends ServiceThread {
private ConcurrentMap<String, ManyPullRequest> pullRequestTable = new ConcurrentHashMap<String, ManyPullRequest>(1024);
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}
mpr.addPullRequest(pullRequest);
}
public void run() { // 处理ManyPullRequest线程
while (!this.isStopped()) {
try {// 如果开启了长轮询,等待5秒后再去查
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {//没有开启长轮询,等待1秒后再去查。
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
this.checkHoldRequest(); //检查请求对象
long costTime = this.systemClock.now() - beginLockTimestamp;
} catch (Throwable e) {
}
}
}
private void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
// 从CommitLog中检查是否有新的消息。
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {//通知消息到达
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
}
}
}
}
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
}
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String key = this.buildKey(topic, queueId); //CommitLog消息到达通知
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {
List<PullRequest> replayList = new ArrayList<PullRequest>();
for (PullRequest request : requestList) {
long newestOffset = maxOffset;
if (newestOffset <= request.getPullFromThisOffset()) {
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
if (newestOffset > request.getPullFromThisOffset()) { //判断是否有新的消息
//检查新的消息是否是ConsumeQueue感兴趣的消息
boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
if (match && properties != null) {
match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
}
if (match) { //如果是感兴趣的消息,就等待线程唤醒后执行消息推送。
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand());
} catch (Throwable e) {
}
continue;
}
}
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try { //请求超时后也给客户端响应。
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand());
} catch (Throwable e) {
}
continue;
}
replayList.add(request);
}
if (!replayList.isEmpty()) {
mpr.addPullRequest(replayList);
}
}
}
}
}
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
Runnable run = new Runnable() {
@Override
public void run() {
try { // 再次调用PullMessageProcessor的processRequest方法处理,注意这里的brokerAllowSuspend传入的false
final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
if (response != null) {
response.setOpaque(request.getOpaque());
response.markResponseType();
try {
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
}
}
});
} catch (Throwable e) {
}
}
} catch (RemotingCommandException e1) {
}
}
};
this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request)); // 异步执行处理任务
}
}

还有一种方式在DefaultMessageStoreReputMessageService线程类的run方法中执行分发请求时,执行完分发请求后通过调用NotifyMessageArrivingListenerarriving方法从而调用PullRequestHoldServicenotifyMessageArriving方法进行一起请求线程的检查从而通知到客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class ReputMessageService extends ServiceThread {
@Override
public void run() {
while (!this.isStopped()) {
try { // 每隔1毫秒,往ConsumeQueue和IndexFile中转发一次CommitLog写入的消息
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
}
}
}
private void doReput() {
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); // 获取CommitLog中的消息
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
//从CommitLog中获取一个DispatchRequest,拿到一份需要进行转发的消息,也就是从commitlog中读取的。
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
DefaultMessageStore.this.doDispatch(dispatchRequest); //分发CommitLog写入消息
// 长轮询: 如果有消息到了主节点,并且开启了长轮询。
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
//唤醒NotifyMessageArrivingListener的arriving方法,进行一次请求线程的检查
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { // 从节点
DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
if (size > 0) {
this.reputFromOffset += size;
} else {
doNext = false;
if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() || DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
}

集群模式下消费者策略

Consumer也是以MessageQueue为单位来进行负载均衡,分为集群模式广播模式。广播模式下每条消息都会投递给订阅了Topic的所有消费者实例,在Consumer分配Queue时,所有Consumer都分到所有的Queue。集群消费模式每条消息只需要投递到订阅该TopicConsumer Group下的一个实例,RocketMQ采用主动拉取方式拉取并消费消息,在拉取时需明确指定拉取哪一条MessageQueue

每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时会按照Queue的数量实例的数量平均分配Queue给每个实例。每次分配时都会将MessageQueue消费者ID进行排序,再用不同的分配算法进行分配。内置的分配的算法共有六种,分别对应AllocateMessageQueueStrategy下的六种实现类可在Consumer中直接指定默认情况下使用的是最简单的平均分配策略

  • AllocateMachineRoomNearby将同机房的Consumer和Broker优先分配在一起。该策略可通过一个machineRoomResolver对象来定制Consumer和Broker的机房解析规则。还需要引入另外一个分配策略来对同机房的Broker和Consumer进行分配。一般用平均分配策略轮询分配策略
  • AllocateMessageQueueAveragely平均分配,将所有MessageQueue平均分给每一个消费者
  • AllocateMessageQueueAveragelyByCircle轮询分配,轮流的给一个消费者分配一个MessageQueue。
  • AllocateMessageQueueByConfig: 直接指定一个messageQueue列表,类似于广播模式,直接指定所有队列。
  • AllocateMessageQueueByMachineRoom按逻辑机房的概念进行分配。对BrokerName和ConsumerIdc有定制化的配置。
  • AllocateMessageQueueConsistentHash一致性哈希策略只需要指定一个虚拟节点数,用一个哈希环算法,虚拟节点是为了让Hash数据在环上分布更为均匀。

对于消费者策略可以通过DefaultMQPushConsumer构造方法设置,默认是使用AllocateMessageQueueAveragely平均分配策略。且该负载均衡策略在RebalanceImplrebalanceByTopic方法中被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", consumerGroup, currentCID, cidAll);
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
}