RocketMQ生产者源码

普通消息

同步消息是通过当前线程直接调用DefaultMQProducerImplsendDefaultImpl方法,异步消息会将任务提交给异步线程池执行,最终同样是调用DefaultMQProducerImplsendDefaultImpl方法。发送消息前首先从本地缓存查找找,若本地缓存没有,则去NameServer上申请Topic

然后通过MQFaultStrategyselectOneMessageQueue获取具体的具体要将消息发送到哪一个队列中,Producer选择MessageQueue方法是消息数自增对队列数取模,可通过sendLatencyFaultEnable参数开启Broker故障延迟机制,发送消息失败后一定时间内不在往同一个Queue重复发送的机制,在LatencyFaultToleranceImpl中维护了曾经发送失败的Broker列表到faultItemTable中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
public class DefaultMQProducer extends ClientConfig implements MQProducer {
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 检查Topic长度和格式,以及不能包含在不允许发布消息的列表中,消息体不能为空且不能超过4M
Validators.checkMessage(msg, this);
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg); // 同步发送消息
}
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, sendCallback); // 异步发送消息
}
}
public class DefaultMQProducerImpl implements MQProducerInner {
// 同步消息
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
// 异步消息
public void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() { // 将消费发送任务提交给线程池
@Override public void run() {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) { // timeout默认为3s
try {
sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
} catch (Exception e) {
sendCallback.onException(e); // 调用回调方法
}
} else { // 调用回调方法
sendCallback.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
}
}
});
} catch (RejectedExecutionException e) {
throw new MQClientException("executor rejected ", e);
}
}
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK(); // 确定当前Client是正常RUNNING的,否则抛出异常
Validators.checkMessage(msg, this.defaultMQProducer); // 检查Topic(0, 127]和消息体(0, 4M]
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 生产者获取Topic的公开信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) { // 重试次数,异步默认重试2次共3次,同步不重试共1次
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// Producer计算把消息发到哪个MessageQueue中,自增然后取模
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName(); // 根据MessageQueue去获取目标节点的信息。
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) { // 重新发送期间使用命名空间重置主题
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) { // 判断是否超时,默认3s
callTimeout = true;
break; // 若超时
}
// 实际发送消息的方法
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null; // 异步发送返回null
case ONEWAY:
return null; // 单向发送返回null
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue; // 若重试则继续,否则直接返回结果
}
}
return sendResult;
default:
break;
}
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
throw mqClientException;
}
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
}
public class MQFaultStrategy {
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// sendLatencyFaultEnable默认关闭,Broker故障延迟机制,表示一种发送消息失败后一定时间内不在往同一个Queue重复发送的机制
if (this.sendLatencyFaultEnable) {
try {
// Producer选择MessageQueue的方法是自增然后取模。
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// Broker轮询,尽量将请求平均分配给不同的Broker
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();// 自增取模计算
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
}
return tpInfo.selectOneMessageQueue(); // 自增取模计算
}
return tpInfo.selectOneMessageQueue(lastBrokerName); // 自增取模计算
}
}
public class TopicPublishInfo { //选择MessageQueue的方式:递增取模
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
}

首先从本地缓存找,若本地缓存没有则调用MQClientAPIImplgetTopicRouteInfoFromNameServer只传入Topic的方法,然后调用updateTopicRouteInfoFromNameServer方法,注意这里传入的isDefaultfalse,且DefaultMQProducernull,此时会通过当前真实的Topic调用MQClientAPIImplgetTopicRouteInfoFromNameServer方法,去NameServerTopic路由表topicQueueTable中查询该Topic对应的队列信息和Broker列表,若该Topic不存在肯定是查不到的;

tryToFindTopicPublishInfo会再次判断haveTopicRouterInfo是否为false,或Producer的MessageQueue列表是否为空,第一次肯定都为false,然后会再次调用updateTopicRouteInfoFromNameServer方法,与第一次调用不同的是,这次调用传入的isDefaulttrue,且DefaultMQProducer不为null,此时会调用MQClientAPIImplgetDefaultTopicRouteInfoFromNameServer方法,注意此时传入的并非是真实的Topic,而是从DefaultMQProducergetCreateTopicKey方法获取的TBW102TBW102每个Broker启动时就创建了,且在向NameServer注册和发送心跳时都会同步到NameServer的Topic路由表topicQueueTable中,故该方法肯定是能正常返回包含queueDatasbrokerDatasTopicRouteData,且会重置队列数,默认为4

然后对获取到的最新的Topic对应的路由信息与本地的对比,通过topicRouteDataIsChange判断是否发生改变,若发生改变则更新brokerAddrTable中缓存的信息,然后在通过topicRouteData2TopicPublishInfo方法遍历从NameServer中获取的TopicRouteData的队列列表queueDatas,通过真正的Topic和远程获取队列中的Broker地址创建MessageQueue,并保存到TopicPublishInfo中,然后将TopicPublishInfo更新到MQProducerInnertopicPublishInfoTable缓存中供后续使用,特别注意的是这里并没有将新建的Topic注册到Broker,真正注册到Topic是在发送消息到Broker上时在SendMessageProcessorasyncSendMessage方法中通过preSend方法中msgCheck方法检查,若不存在则创建,并注册到NameServer

MQClientAPIImplgetDefaultTopicRouteInfoFromNameServer方法和getTopicRouteInfoFromNameServer最终都是调用getTopicRouteInfoFromNameServer方法,最终通过RequestCode.GET_ROUTEINFO_BY_TOPIC关联调用NameServerDefaultRequestProcessorgetRouteInfoByTopic方法获取Topic路由信息;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
public class DefaultMQProducerImpl implements MQProducerInner {
// 由表的过程都是先从本地缓存找,本地缓存没有,就去NameServer上申请。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// Producer向NameServer获取更新Topic的路由信息。
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
// 还是从本地缓存中寻找Topic路由信息。
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// haveTopicRouterInfo默认为false,Producer判断MessageQueue列表是否为空,第一次肯定都为false
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// 注意这里传入的isDefault为true,且传入DefaultMQProducer,则此时到NameServer查询的不是当前传入的Topic而是createTopicKey对应的Topic即TBW102
// TBW102在Broker启动时就创建了,且在向NameServer注册和发送心跳时都会同步到NameServer的Topic路由表中,这里会做一个转换并存储到topicPublishInfoTable
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
}
public class MQClientInstance {
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
return updateTopicRouteInfoFromNameServer(topic, false, null);
}
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) { // 若第一次启动生产者,则该Topic在NameServer上并没有被注册到,Topic路由表中
// 这里getCreateTopicKey方法返回的Topic为TBW102
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else { // 若单纯查Topic则defaultMQProducer为null,isDefault为false
// 请求NameServer的GET_ROUTEINFO_BY_TOPIC,获取topicRouteData
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData); // 对Broker和Queue排序有判断是否有变更
if (!changed) { // 若未发生变更,再遍历判断一下生产者发布的消息是否需要更新,消费者订阅的消息是否
changed = this.isNeedUpdateTopicRouteInfo(topic);
}
if (changed) { // 若发生了变更
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) { // 更新broker信息
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
{ // 更新生产者发布的Topic信息
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) { // 更新Topic路由信息到topicPublishInfoTable缓存表中
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
{// 更新消费者订阅的Topic信息
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
this.topicRouteTable.put(topic, cloneTopicRouteData); // 将获取到的最新的路由信息缓存起来
return true;
}
}
} catch (MQClientException e) {} catch (RemotingException e) {
throw new IllegalStateException(e);
} finally {
this.lockNamesrv.unlock();
}
}
} catch (InterruptedException e) {}
return false;
}
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
info.setTopicRouteData(route);
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
String[] brokers = route.getOrderTopicConf().split(";");
for (String broker : brokers) {
String[] item = broker.split(":");
int nums = Integer.parseInt(item[1]);
for (int i = 0; i < nums; i++) {
MessageQueue mq = new MessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);
}
}
info.setOrderTopic(true);
} else { // 非顺序消息
List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);
for (QueueData qd : qds) {
if (PermName.isWriteable(qd.getPerm())) {
BrokerData brokerData = null;
// 遍历从NameServer获取到的Broker列表,判断当前队列的Broker是否有效
for (BrokerData bd : route.getBrokerDatas()) {
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}
if (null == brokerData) {
continue; // 若当前队列Broker无效,这直接跳过
}
if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue; // 若当前队列Broker不是主节点,这直接跳过
}
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
}
}
info.setOrderTopic(false);
}
return info;
}
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
Set<MessageQueue> mqList = new HashSet<MessageQueue>();
List<QueueData> qds = route.getQueueDatas();
for (QueueData qd : qds) {
if (PermName.isReadable(qd.getPerm())) {
for (int i = 0; i < qd.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
mqList.add(mq);
}
}
}
return mqList;
}
}

首先通过BrokerName到找brokerAddrTable缓存中,获取该Broker的Master节点地址,若brokerAddr地址不存在则再次调用tryToFindTopicPublishInfo方法从NameServer上获取最新Broker列表;若不是批量消息生成UNIQ_KEY放到msg属性中,若消息体大于4K将默认压缩,不论是同步还是异步或是单向消息最终都会调用MQClientAPIImplsendMessage方法,不同的是同步方法没有SendCallback回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
public class DefaultMQProducerImpl implements MQProducerInner {
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
// 通过BrokerName到找brokerAddrTable缓存中,获取该Broker的Master节点地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {// 通过Broker名称获取Broker地址,若获取不到则去NameServer上获取。
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
if (!(msg instanceof MessageBatch)) { //for MessageBatch,ID has been set in the generating process
MessageClientIDSetter.setUniqID(msg); // 若不是批量消息,则生成UNIQ_KEY放到msg属性中
}
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) { // 消息体大于4K将默认压缩
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 重试消息
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
// 真正向Broker发送消息,异步要传入回调函数
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
// 真正向Broker发送消息,同步不需要传入回调函数
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this);
break;
default:
assert false;
break;
}
if (this.hasSendMessageHook()) {//消息发送完成后执行钩子程序。
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
public class MQClientAPIImpl { // 不论同步还是异步最终都会调用该方法
public SendResult sendMessage(final String addr, final String brokerName, final Message msg, final SendMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int retryTimesWhenSendFailed, final SendMessageContext context, final DefaultMQProducerImpl producer) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
if (sendSmartMsg) { // sendSmartMsg默认为true
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
request.setBody(msg.getBody());
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
return null;
}

}

对于同步消息最终用通过RequestCode.SEND_MESSAGE编号最终在Broker中执行SendMessageProcessorprocessRequest方法,异步消息最终调用的是SendMessageProcessorasyncProcessRequest方法,最终都是调用的asyncProcessRequest方法,不同点在于同步消息的processRequest中获取到异步CompletableFuture直接调用get方法等待结果,不论是同步还是异步方法最终都是在handlePutMessageResultFuture方法中调用CompletableFuturethenApply方法,在响应后最终在handlePutMessageResult中调用doResponse方法将结果写回给客户端。

SendMessageProcessorpreSend方法很关键,若Topic不存在,且开启了自动创建机制,且该Topic是通过BW102模板创建的Topic,会在Broker上创建该Topic,并注册到NameServer;在preSend中调用msgCheck,从而调用TopicConfigManagercreateTopicInSendMessageMethod来完成该功能。

注意这里只是在Broker上创建了Topic,并没有创建-对应的ConsumeQueueConsumeQueue的创建是在ReputMessageService中将CommitLog的消息分发到ConsumeQueue时,通过调用DefaultMessageStorefindConsumeQueue方法创建的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
public class MQClientAPIImpl {
private SendResult sendMessageSync(final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}
}
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
// channel是和Nameserver之间建立的一个连接。
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) { // 网络连接ok则发送请求
try {
doBeforeRpcHooks(addr, request); //计算时间
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); // 真正发网络请求的地方
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} catch (RemotingSendRequestException e) {
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
}
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
}
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = null;
try {
response = asyncProcessRequest(ctx, request).get();
}
return response;
}
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch()) {
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext mqtraceContext, SendMessageRequestHeader requestHeader) {
// 这里是创建Topic的关键,通过msgCheck方法检查,若不存在则创建,并注册到NameServer
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
if (response.getCode() != -1) {
return CompletableFuture.completedFuture(response);
}
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return CompletableFuture.completedFuture(response); // 重试消息会被放入死信队列,异步处理逻辑
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
CompletableFuture<PutMessageResult> putMessageResult = null;
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner); // 事务消息持久化
} else {
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); // 普通消息持久化
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request, SendMessageRequestHeader requestHeader) {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimestamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));
return response;
}
response.setCode(-1);
super.msgCheck(ctx, requestHeader, response); // 这里是创建Topic的关键
if (response.getCode() != -1) {
return response;
}
return response;
}
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, RemotingCommand request, MessageExt msg, TopicConfig topicConfig) {
String newTopic = requestHeader.getTopic();
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return false;
}
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); // 获取最大重试次数,默认16次
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
if (reconsumeTimes >= maxReconsumeTimes) {
newTopic = MixAll.getDLQTopic(groupName); // 生成死信队列Topic:%DLQ%+groupName
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; // 全局的死信队列只有一个,且队列也只有一个
// 创建死信队列对应的Topic
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0);
msg.setTopic(newTopic);
msg.setQueueId(queueIdInt);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return false;
}
}
}
int sysFlag = requestHeader.getSysFlag();
if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
}
msg.setSysFlag(sysFlag);
return true; // 消息会被异步处理
}
}
public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
return response; // 没有写权限,且该Topic是循序消息
}
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
return response; // Topic格式不合法
}
if (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) {
return response; // Topic属于NOT_ALLOWED_SEND_TOPIC_SET列表中,如SCHEDULE_TOPIC_XXXX
}
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) { // 若存Broker缓存中未获取到该Topic
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
} else {
topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
}
}
// 通过该方法真正在Broker上创建Topic
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(requestHeader.getTopic(), requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
if (null == topicConfig) {
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 重试消息
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
}
}
if (null == topicConfig) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
}
int queueIdInt = requestHeader.getQueueId();
int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
if (queueIdInt >= idValid) {
String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s", queueIdInt, topicConfig.toString(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
return response;
}
}
public class TopicConfigManager extends ConfigManager {
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic, final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
TopicConfig topicConfig = null;
boolean createNew = false;
try {
if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null) return topicConfig;
TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic); // 获取TBW102
if (defaultTopicConfig != null) { // 若TBW102不为空
if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { // 默认为自动创建
defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
}
}
// TBW102的Perm为:PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE
if (PermName.isInherited(defaultTopicConfig.getPerm())) { // 默认为true
topicConfig = new TopicConfig(topic);
int queueNums = clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig.getWriteQueueNums() : clientDefaultTopicQueueNums;
if (queueNums < 0) {
queueNums = 0;
}
topicConfig.setReadQueueNums(queueNums);
topicConfig.setWriteQueueNums(queueNums);
int perm = defaultTopicConfig.getPerm();
perm &= ~PermName.PERM_INHERIT;
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(topicSysFlag);
topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
}
}

if (topicConfig != null) { // 将新的Topic信息放入缓存中
this.topicConfigTable.put(topic, topicConfig);
this.dataVersion.nextVersion();
createNew = true;
this.persist();
}
} finally {
this.lockTopicConfigTable.unlock();
}
}
} catch (InterruptedException e) {}
if (createNew) { // 将新增的Topic信息注册到NameServer
this.brokerController.registerBrokerAll(false, true, true);
}
return topicConfig;
}
}

对于异步方法当获取到响应后会回调InvokeCallback中的operationComplete方法,在该方法中若成功则回调用SendCallbackonSuccess方法。若失败则走重试逻辑若中还是失败则回调用SendCallbackonException方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class MQClientAPIImpl {
private void sendMessageAsync(final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance,
final int retryTimesWhenSendFailed, final AtomicInteger times, final SendMessageContext context, final DefaultMQProducerImpl producer) throws InterruptedException, RemotingException {
final long beginStartTime = System.currentTimeMillis();
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
long cost = System.currentTimeMillis() - beginStartTime;
RemotingCommand response = responseFuture.getResponseCommand();
if (null == sendCallback && response != null) {
try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
if (context != null && sendResult != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
} catch (Throwable e) {
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
return;
}
if (response != null) {
try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
assert sendResult != null;
if (context != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
try {
sendCallback.onSuccess(sendResult); // 回调用SendCallback的onSuccess方法
} catch (Throwable e) {
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
} catch (Exception e) {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, e, context, false, producer);
}
} else {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
if (!responseFuture.isSendRequestOK()) {
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
} else if (responseFuture.isTimeout()) {
MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
} else {
MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
}
}
}
});
}
}

事务消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15"); // 回查次数
msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "10000"); // 回查时间
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
}
producer.shutdown();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String tags = msg.getTags();
if (StringUtils.contains(tags, "TagA")) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.contains(tags, "TagB")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String tags = msg.getTags();
if (StringUtils.contains(tags, "TagC")) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.contains(tags, "TagD")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
}

对于事务消息的发送是通过TransactionMQProducer类的sendMessageInTransaction方法来完成的,若事务消息设置了延迟参数则将会被清除,会设置TRAN_MSG属性为true,然后调用send方法发送消费到Broker,send方法中和普通的发送消息调用的一个方法,但普通消息是走的DefaultMessageStoreasyncPutMessage方法,事务消息是走的TransactionalMessageServiceImplasyncPrepareMessage方法。若发送half成功则调用设置的TransactionListenerexecuteLocalTransaction方法若返回LocalTransactionState.COMMIT_MESSAGELocalTransactionState.ROLLBACK_MESSAGE,则调用MQClientAPIImplendTransactionOneway方法,从而调用Broker执行事务的提交或回滚逻辑,若executeLocalTransaction方法异常或返回LocalTransactionState.UNKNOW则Broker不会做任务事情,则只能通过回查逻辑执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
public class TransactionMQProducer extends DefaultMQProducer {
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
}
public class DefaultMQProducerImpl implements MQProducerInner {
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
if (msg.getDelayTimeLevel() != 0) { // 不支持延迟消息
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; // 默认本地事务状态为UNKNOW
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) { // 默认localTransactionExecuter为null
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) { // transactionListener是调用时设置的
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
} catch (Throwable e) {
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
this.endTransaction(sendResult, localTransactionState, localException);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
public void endTransaction(final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
}
}
public class MQClientAPIImpl {
public void endTransactionOneway(final String addr, final EndTransactionRequestHeader requestHeader, final String remark, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
request.setRemark(remark);
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
}
}

首先会将将真正的事务Topic存储到REAL_TOPIC属性中,以及将真正的queueId存储到REAL_QID属性中,然后将Topic换成RMQ_SYS_TRANS_HALF_TOPIC,然后将替换了Topic的事务消息通过DefaultMessageStoreasyncPutMessage方法最终存储到CommitLog中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TransactionalMessageServiceImpl implements TransactionalMessageService {
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); // 将真正的事务Topic存储到REAL_TOPIC属性中
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); // Topic换成RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
}

最终通过RequestCode.END_TRANSACTION编码关联调用EndTransactionProcessorprocessRequest方法,不论是是COMMIT还是ROLLBACK都将消息从RMQ_SYS_TRANS_HALF_TOPIC队列中查询出,若是COMMIT则将从RMQ_SYS_TRANS_HALF_TOPIC队列中查询的消息真正存储到其真实的Topic队列中,然后成功则再在RMQ_SYS_TRANS_OP_HALF_TOPIC队列中添加一条对应的消息标识该Half消息被删除。若为ROLLBACK则直接在RMQ_SYS_TRANS_OP_HALF_TOPIC队列中添加一条对应的消标识事务结束。开源版本进行了阉割,不会走回查逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
return response; // 若当前节点是从节点
}
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED); // 将事务消息TRAN_MSG标记移除
RemotingCommand sendResult = sendFinalMessage(msgInner);// 将从RMQ_SYS_TRANS_HALF_TOPIC队列中查询的消息真正存储到其真实的Topic队列中
if (sendResult.getCode() == ResponseCode.SUCCESS) {// 存储成功则在RMQ_SYS_TRANS_OP_HALF_TOPIC队列中添加对应的消息
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {// 成功则在RMQ_SYS_TRANS_OP_HALF_TOPIC队列中添加对应的消息
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
}
public class TransactionalMessageServiceImpl implements TransactionalMessageService {
public OperationResult commitMessage(EndTransactionRequestHeader requestHeader) {
return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}
public OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader) {
return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}
private OperationResult getHalfMessageByOffset(long commitLogOffset) {
OperationResult response = new OperationResult();
// 这是使用了桥接模式,最终会调用commitLog的getMessage方
MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);
if (messageExt != null) {
response.setPrepareMessage(messageExt);
response.setResponseCode(ResponseCode.SUCCESS);
} else {
response.setResponseCode(ResponseCode.SYSTEM_ERROR);
response.setResponseRemark("Find prepared transaction message failed");
}
return response;
}
}

BrokerControllerinitialTransaction()方法中会初始化事务消息检查类TransactionalMessageCheckService,该类是一个线程类,在BrokerControllerstart方法中通过startProcessorByHa方法开启事务消息检查线程。若已经超过最大回查次数,则将消息添加到TRANS_CHECK_MAXTIME_TOPIC队列中,若需要检查则将消息写回RMQ_SYS_TRANS_HALF_TOPIC队列中防止再次失败,然后调用AbstractTransactionalMessageCheckListenerresolveHalfMsg方法检查消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
public class TransactionalMessageCheckService extends ServiceThread {
public void run() { // 默认60s
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
}
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
waitPoint.reset();
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); // 获取超时时间6s
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); // 获取最大回查次数15
long begin = System.currentTimeMillis();
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
}
}
public class TransactionalMessageServiceImpl implements TransactionalMessageService {
public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) {
try {
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC; // RMQ_SYS_TRANS_HALF_TOPIC
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
return;
}
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
MessageQueue opQueue = getOpQueue(messageQueue); // 获取RMQ_SYS_TRANS_OP_HALF_TOPIC事务完成队列
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
if (halfOffset < 0 || opOffset < 0) {
continue;
}
List<Long> doneOpOffset = new ArrayList<>(); // 已经过时的消息halfOffset, 即halfOffset已经被处理了
HashMap<Long, Long> removeMap = new HashMap<>(); // 获取已经完成的的映射已经存在在RMQ_SYS_TRANS_OP_HALF_TOPIC中的,即待处理的
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
if (null == pullResult) {
continue;
}
// single thread
int getMessageNullCount = 1;
long newOffset = halfOffset;
long i = halfOffset;
while (true) {
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
break;
}
if (removeMap.containsKey(i)) { // 若halfOffset已经包含在,待处理中了,说明已经处理完成了
Long removedOpOffset = removeMap.remove(i); // 从待处理列表中移除
doneOpOffset.add(removedOpOffset); // 添加到已经处理完成的列表中
} else {
GetResult getResult = getHalfMsg(messageQueue, i); // 消费RMQ_SYS_TRANS_HALF_TOPIC队列中的事务消息
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) { // 若消息为空
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
break; // 没有新的消息,说明没有需要回查的消息则退出while循环
} else {
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
// 若消息已经超过最大回查次数或消息生产时间超过最大储存时间默认72小时
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt); // 将消息添加到TRANS_CHECK_MAXTIME_TOPIC队列中
newOffset = i + 1;
i++;
continue; // 跳过该消息
}
if (msgExt.getStoreTimestamp() >= startTime) {
break; // 消息生成时间比当前回查时间大,说明没有需要回查的消息则退出while循环
}
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); // 消息产生多久了
long checkImmunityTime = transactionTimeout; // 默认6s
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) { // 若没有到需要回查的时间
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) { // 写回RMQ_SYS_TRANS_HALF_TOPIC队列中
newOffset = i + 1;
i++;
continue; // 写回RMQ_SYS_TRANS_HALF_TOPIC队列中,继续下一条
}
}
} else {
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
break;
}
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
if (!putBackHalfMsgQueue(msgExt, i)) { // 写回RMQ_SYS_TRANS_HALF_TOPIC队列中,因为若回查失败,还可以再次回查
continue; // 若会写失败,继续重试
}
listener.resolveHalfMsg(msgExt); // 回调客户端checkLocalTransaction方法
} else { // 如果无法判断是否发送回查消息,则加载更多的已处理消息进行刷选
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
continue;
}
}
newOffset = i + 1;
i++;
}
if (newOffset != halfOffset) {// 保存half消息队列的回查进度
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {// 保存处理队列op的进度
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
}
} catch (Throwable e) {}
}
}

检查消息是异步执行的,首先还原当前消息真正的Topic,然后通过Broker2ClientcheckProducerTransactionState方法中通过RequestCode.CHECK_TRANSACTION_STATE关联调用ClientRemotingProcessorClientRemotingProcessor方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public abstract class AbstractTransactionalMessageCheckListener {
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
}
}
});
}
public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); // 真正的Topic
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
}
}
}
public class Broker2Client {
public void checkProducerTransactionState(final String group, final Channel channel, final CheckTransactionStateRequestHeader requestHeader, final MessageExt messageExt) throws Exception {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
request.setBody(MessageDecoder.encode(messageExt, false));
try {
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
}
}
}

首先调用自定义TransactionListenercheckLocalTransaction方法,然后调用MQClientAPIImplendTransactionOneway方法。最终又调用EndTransactionProcessorprocessRequest方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null) {
if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
}
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
messageExt.setTransactionId(transactionId);
}
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
producer.checkTransactionState(addr, messageExt, requestHeader);
}
}
}
return null;
}
}
}
public class DefaultMQProducerImpl implements MQProducerInner {
public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
@Override
public void run() {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
localTransactionState = transactionListener.checkLocalTransaction(message);
}
} catch (Throwable e) {
exception = e;
}
this.processTransactionState(localTransactionState, group, exception);
}
}
private void processTransactionState(final LocalTransactionState localTransactionState, final String producerGroup, final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}
thisHeader.setMsgId(uniqueKey);
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
switch (localTransactionState) {
case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
String remark = null;
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}
try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, 3000);
} catch (Exception e) {
}
}
};
this.checkExecutor.submit(request);
}
}
public void endTransactionOneway(final String addr, final EndTransactionRequestHeader requestHeader, final String remark, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
request.setRemark(remark);
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
}

延迟消息

1
2
3
4
5
6
7
8
9
10
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 分组名称
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 2; i++) {
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3); // 延时队列
SendResult sendResult = producer.send(msg);
}
producer.shutdown();

延迟消息的等级以及消息的Tag都是设置到Messageproperties属性中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Message implements Serializable {
private String topic;
private int flag; // 一般默认为true
private Map<String, String> properties;
private byte[] body;
private String transactionId;

public void setDelayTimeLevel(int level) {
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
public void setTags(String tags) {
this.putProperty(MessageConst.PROPERTY_TAGS, tags);
}
}

在调用CommitLogasyncPutMessage方法存储消息时,会判断若是延迟消息会将消息的Topic替换为SCHEDULE_TOPIC_XXXX,且将消息的真实Topic存到REAL_TOPIC属性中,将真实的队列ID存储到REAL_QID属性中,最终存储消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
public class CommitLog {
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) { // 延迟消息的处理
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; // 将Topic修改为SCHEDULE_TOPIC_XXXX
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // 根据延迟等级获取队列ID
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); // 将真正的Topic存储到REAL_TOPIC属性中
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); // 真正的队列ID
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) { // 若文件不存在会已经满了,则创建一个新的文件
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
}
}

延迟消息写入时会将延迟消息转为写入到SCHEDULE_TOPIC_XXXX的Topic中,系统内置的该Topic有18个队列,对应18个延迟级别,且每个级别会创建一个ConsumeQueue存储该等级的数据。ScheduleMessageService启动时会根据18个等级创建18个DeliverDelayedMessageTimerTask队列检查任务,给任务会每隔1s执行一次DeliverDelayedMessageTimerTask.executeOnTimeup任务,若执行失败会变成10s执行一次;

首先根据延迟等级定位到对应的ConsumeQueue,然后通过offsetConsumeQueue中读取消息,这里的消息仅仅是子在CommitLog的Offset等信息,并非真正的消息,然后通过步长的方式遍历从ConsumeQueue读出的多个消息,因为每个存储在ConsumeQueue的消息大小都为20字节,然后计算消息应该被消费的时间,并计算出下一个读取ConsumeQueue的offset,若消息到期则从CommitLog中获取真正的消息,并将延迟消息写入正常消息队列,若消息投递失败,则延迟10s再次唤醒任务,若消息没到期则延迟到到期时间后再次唤醒任务,若投递成功则继续处理下一条

值得注意的是tagsCode字段,存储的是消息到期时间消息生成时间+延迟等级对应的时间,该值是在DefaultMessageStoreReputMessageServicedoReput方法commitLog的消息分发到ConsumeQueue,通过调用CommitLog的checkMessageAndReturnSize方法转换的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
public class ScheduleMessageService extends ConfigManager {
private static final long FIRST_DELAY_TIME = 1000L;
private static final long DELAY_FOR_A_WHILE = 100L;
private static final long DELAY_FOR_A_PERIOD = 10000L;
public void start() { // 延迟消息服务的启动方法
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {//定时执行延迟消息处理任务
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
this.timer.scheduleAtFixedRate(new TimerTask() { //每隔10秒,将延迟消息持久化到硬盘中。
@Override public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
}
class DeliverDelayedMessageTimerTask extends TimerTask {
public void run() {
try {
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
public void executeOnTimeup() {
//拿到延迟级别对应的队列
ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset; // offset第一次默认为0,后面每次消费了延迟消息后会不停累加
if (cq != null) { // 若取到了ConsumeQueue其不为空,则尝试判断消息是否到期
// 这里是从ConsumeQueue中读取消息,这里的消息仅仅是子在CommitLog的Offset等信息,并非真正的消息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
// 遍历从ConsumeQueue读出的多个消息,因为每个存储在ConsumeQueue的消息大小都为20字节
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong(); // 8字节CommitLog的Offset消息偏移量
int sizePy = bufferCQ.getByteBuffer().getInt(); // 4字节消息长度
// 8字节Tag的哈希值,但是这里存储的是消息到期时间,该值是在DefaultMessageStore.ReputMessageService.doReput方法将commitLog的消息分发到ConsumeQueue时,通过调用CommitLog的checkMessageAndReturnSize方法转换的
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
// 计算消息应该被消费的时间
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); // 下一个读取ConsumeQueue的位置
long countdown = deliverTimestamp - now; // 用到期时间 - now,判断是否到期
//把每个延迟消息封装成一个MessageExt
if (countdown <= 0) { // 消息到期
// 从CommitLog中获取真正的消息
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
if (msgExt != null) {
try {
// 恢复消息真正的Topic和queueId
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
continue;
}
//将延迟消息写入正常消息队列,这样就能被消费者正常消费了。若消息成功则继续处理下一条
PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);
if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else { // 若消息投递失败,则延迟10s再次唤醒任务
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} catch (Exception e) {}
}
} else { // 消息没到期,则延迟到到期时间后再次唤醒任务
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
}
}
} // end of if (cq != null)
// 若获取的ConsumeQueue为null,则继续等待100ms,再次唤醒任务
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
}

}