Producer启动源码

Producer的启动是通过DefaultMQProducer来完成的

1
2
3
// Create a producer that belongs to the producer group "ProducerGroupName".
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// Tell the producer which NameServer address to use.
producer.setNamesrvAddr("localhost:9876");
// Start the producer.
producer.start();

DefaultMQProducer构造方法中完成实例化DefaultMQProducerImpl,主要是创建默认的异步发送消息的线程池,阻塞队列长度默认为50000

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Producer facade (constructor excerpt). Constructing it records the namespace and group and
 * creates the backing {@link DefaultMQProducerImpl}.
 */
public class DefaultMQProducer extends ClientConfig implements MQProducer {

    /**
     * Creates a producer for the given group with no namespace and no RPC hook.
     *
     * @param producerGroup name of the producer group
     */
    public DefaultMQProducer(final String producerGroup) {
        this(null, producerGroup, null);
    }

    /**
     * Creates a producer and its implementation object.
     *
     * @param namespace     namespace to store on this producer, may be {@code null}
     * @param producerGroup name of the producer group
     * @param rpcHook       hook handed to the implementation, may be {@code null}
     */
    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
        this.namespace = namespace;
        this.producerGroup = producerGroup;
        // The implementation keeps a back-reference to this facade.
        this.defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    }
}
/**
 * Producer implementation (constructor excerpt).
 */
public class DefaultMQProducerImpl implements MQProducerInner {

    /**
     * Stores the owning producer and RPC hook and builds the default executor used for
     * asynchronous sends: a fixed-size pool backed by a bounded queue of 50000 tasks.
     *
     * @param defaultMQProducer the facade that owns this implementation
     * @param rpcHook           RPC hook to keep for later use, may be {@code null}
     */
    public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
        this.defaultMQProducer = defaultMQProducer;
        this.rpcHook = rpcHook;
        this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
        // Read the processor count once. availableProcessors() may return different values over
        // the life of the JVM; reading it separately for core and max size could yield
        // core > max, which ThreadPoolExecutor rejects with IllegalArgumentException.
        final int poolSize = Runtime.getRuntime().availableProcessors();
        this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
            poolSize,
            poolSize,
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.asyncSenderThreadPoolQueue,
            new ThreadFactory() {
                // Counter used only to give each worker thread a distinct name.
                private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
                }
            });
    }
}

通过DefaultMQProducer的start方法调用DefaultMQProducerImpl的start方法:首先检查producerGroup生产者组是否合法,然后将当前DefaultMQProducer的实例名称替换为PID,再通过MQClientManager获取或创建MQClientInstance,MQClientInstance才是真正的核心类。接着将DefaultMQProducerImpl注册到MQClientInstance的producerTable中,最后通过start方法启动MQClientInstance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
 * Producer facade (start-up excerpt).
 */
public class DefaultMQProducer extends ClientConfig implements MQProducer {

    /**
     * Applies the namespace to the producer group, starts the implementation and, when a trace
     * dispatcher is configured, starts that as well.
     *
     * @throws MQClientException if starting the implementation fails
     */
    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        this.defaultMQProducerImpl.start();

        if (traceDispatcher == null) {
            return;
        }
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException ignored) {
            // A trace dispatcher that fails to start does not fail the producer start.
        }
    }
}
/**
 * Producer implementation (start-up excerpt).
 */
public class DefaultMQProducerImpl implements MQProducerInner {
// Lifecycle state; a freshly constructed instance is in CREATE_JUST.
private ServiceState serviceState = ServiceState.CREATE_JUST;
/**
 * Starts this producer and the underlying client instance.
 */
public void start() throws MQClientException {
this.start(true);
}
/**
 * Starts this producer.
 *
 * @param startFactory whether the MQClientInstance obtained during start-up should be started too
 * @throws MQClientException if the group is already registered on the client instance, or if
 *                           the producer is in any state other than CREATE_JUST
 */
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
// Mark as failed up front; the state only becomes RUNNING once every step below has succeeded.
this.serviceState = ServiceState.START_FAILED;
// Validate the configuration. Per the original notes this checks that the producer group is
// non-empty, shorter than 255 characters, matches ^[%|a-zA-Z0-9_-]+$ and is not DEFAULT_PRODUCER.
this.checkConfig();
// Switch the instance name to the process ID, except for the client's internal producer group.
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// Obtain (or create) the MQClientInstance for this client configuration.
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// Register this producer with the client instance under its group name.
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
// Registration was refused: reset the state to CREATE_JUST and report the duplicate group.
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
// Seed the publish-info table with an empty entry for the create-topic key.
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// Start the client instance unless the caller asked not to.
if (startFactory) {
mQClientFactory.start();
}
this.serviceState = ServiceState.RUNNING;
break;
// Any of these states means start() was already attempted; starting again is rejected.
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
// Send a heartbeat to all brokers right away.
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// Scan for expired requests: first run after 3000 ms, then every 1000 ms.
// Throwables are swallowed; presumably so that one failing scan does not stop later runs.
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {}
}
}, 1000 * 3, 1000);
}
}
/**
 * Process-wide registry of {@link MQClientInstance} objects, keyed by client id.
 */
public class MQClientManager {
    private static MQClientManager instance = new MQClientManager();
    private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>();

    /**
     * @return the single shared manager
     */
    public static MQClientManager getInstance() {
        return instance;
    }

    /**
     * Returns the client instance registered for the id built from {@code clientConfig},
     * creating and registering one if none exists yet.
     *
     * @param clientConfig configuration the client id is built from; a clone is given to a new instance
     * @param rpcHook      RPC hook passed to a newly created instance
     * @return the instance stored in the table for this client id
     */
    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        // Per the original notes the id has the form ClientIP@InstanceName@unitName.
        final String clientId = clientConfig.buildMQClientId();

        final MQClientInstance cached = this.factoryTable.get(clientId);
        if (cached != null) {
            return cached;
        }

        final MQClientInstance created = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        // Another thread may have registered an instance in the meantime; if so, use that one.
        final MQClientInstance raced = this.factoryTable.putIfAbsent(clientId, created);
        return raced != null ? raced : created;
    }
}

通过构造方法实例化MQClientInstance时:

  • 实例化NettyClientConfig并设置一些参数
  • 实例化用于处理请求分发的NettyRequestProcessor的具体实现类ClientRemotingProcessor,通过processRequest方法完成请求的分发
  • 然后通过构造方法实例化MQClientAPIImpl
    • 在该构造方法中首先实例化NettyRemotingClient。NettyRemotingClient复用了Broker中用来管理Broker与NameServer之间Netty客户端的类,在这里用于管理Producer与Broker、NameServer之间的Netty客户端;同样地,在构造方法中并不会直接创建连接,客户端和服务端的连接是在调用invokeSync、invokeAsync、invokeOneway等方法时,通过NettyRemotingClient的createChannel方法来完成的
    • 通过registerProcessor将RequestCode与ClientRemotingProcessor绑定
  • 将NameServer地址设置到NettyRemotingClient中
  • 实例化用于执行创建Topic等命令的MQAdminImpl
  • 实例化拉模式消费者服务线程PullMessageService
  • 实例化客户端负载均衡的RebalanceService,默认每20s执行一次;Broker监听到Consumer数量变动时,也会通知Consumer进行Rebalance
  • 实例化用于统计消费者消费Topic的RT、TPS等消费指标的ConsumerStatsManager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
 * Per-client runtime object (constructor excerpt).
 */
public class MQClientInstance {
/**
 * Builds the collaborators of a client instance: Netty client settings, the request processor,
 * the API client, admin/pull/rebalance services, an internal producer and the consumer stats manager.
 *
 * @param clientConfig  configuration this instance is built from
 * @param instanceIndex index stored on this instance
 * @param clientId      id stored on this instance
 * @param rpcHook       RPC hook passed to the API client
 */
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
// Netty client settings: copy the callback-executor thread count and the TLS flag from the client config.
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
// Request processor that is handed to the API client.
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
if (this.clientConfig.getNamesrvAddr() != null) {
// Pass the configured NameServer address to the API client
// (per the original notes it ends up in the NettyRemotingClient).
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
}
this.clientId = clientId;
this.mQAdminImpl = new MQAdminImpl(this);
this.pullMessageService = new PullMessageService(this);
this.rebalanceService = new RebalanceService(this);
// Internal producer that uses the client's inner producer group and this instance's configuration.
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
}
}
/**
 * Client-side request processor: routes each incoming request to a handler by request code.
 */
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {

    /**
     * Dispatches {@code request} to the handler matching its code.
     *
     * @param ctx     channel context the request arrived on
     * @param request the incoming command
     * @return the handler's response, or {@code null} when the code is not one of the handled ones
     * @throws RemotingCommandException declared by the processor contract
     */
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final int requestCode = request.getCode();
        switch (requestCode) {
            case RequestCode.CHECK_TRANSACTION_STATE:
                return this.checkTransactionState(ctx, request);
            case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
                return this.notifyConsumerIdsChanged(ctx, request);
            case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
                return this.resetOffset(ctx, request);
            case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
                return this.getConsumeStatus(ctx, request);
            case RequestCode.GET_CONSUMER_RUNNING_INFO:
                return this.getConsumerRunningInfo(ctx, request);
            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                return this.consumeMessageDirectly(ctx, request);
            case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
                return this.receiveReplyMessage(ctx, request);
            default:
                // Codes without a handler get no response.
                return null;
        }
    }
}
/**
 * Client-side API wrapper around the remoting client (constructor excerpt).
 */
public class MQClientAPIImpl {
    private final RemotingClient remotingClient;
    private final TopAddressing topAddressing;
    private final ClientRemotingProcessor clientRemotingProcessor;
    private String nameSrvAddr = null;
    private ClientConfig clientConfig;

    /**
     * Creates the remoting client, registers the RPC hook and binds every request code the
     * client handles to {@code clientRemotingProcessor}.
     *
     * @param nettyClientConfig       settings for the Netty remoting client
     * @param clientRemotingProcessor processor that receives the registered request codes
     * @param rpcHook                 hook registered on the remoting client
     * @param clientConfig            client configuration; its unit name is used for address lookup
     */
    public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor, RPCHook rpcHook, final ClientConfig clientConfig) {
        this.clientConfig = clientConfig;
        this.topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
        this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
        this.clientRemotingProcessor = clientRemotingProcessor;
        this.remotingClient.registerRPCHook(rpcHook);

        // Same codes, same order as the processor's dispatch switch; all registered with a null executor.
        final int[] handledCodes = {
            RequestCode.CHECK_TRANSACTION_STATE,
            RequestCode.NOTIFY_CONSUMER_IDS_CHANGED,
            RequestCode.RESET_CONSUMER_CLIENT_OFFSET,
            RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT,
            RequestCode.GET_CONSUMER_RUNNING_INFO,
            RequestCode.CONSUME_MESSAGE_DIRECTLY,
            RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT,
        };
        for (final int code : handledCodes) {
            this.remotingClient.registerProcessor(code, this.clientRemotingProcessor, null);
        }
    }
}

启动MQClientInstance时,若未设置nameSrvAddr地址,则先拉取nameSrvAddr地址;然后通过MQClientAPIImpl的start方法调用RemotingClient的start方法,完成Netty客户端Bootstrap的配置;接着开启一系列周期定时任务,再启动消息拉取服务异步线程PullMessageService,以及客户端负载均衡异步线程RebalanceService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/**
 * Per-client runtime object (start-up and scheduled-task excerpt).
 */
public class MQClientInstance {
/**
 * Starts this client instance once. Only the CREATE_JUST state performs the start-up sequence;
 * START_FAILED is rejected with an exception and every other state is a no-op.
 *
 * @throws MQClientException if a previous start attempt left this instance in START_FAILED
 */
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
// Mark as failed up front; the state only becomes RUNNING after every step below has succeeded.
this.serviceState = ServiceState.START_FAILED;
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr(); // fetch the NameServer address when none is configured
}
this.mQClientAPIImpl.start(); // starts the remoting client (no server connection is made here, per the original notes)
this.startScheduledTask(); // schedule the periodic background tasks
this.pullMessageService.start(); // start the message-pull service thread
this.rebalanceService.start(); // start the client-side rebalance service thread
this.defaultMQProducer.getDefaultMQProducerImpl().start(false); // start the internal producer without starting this factory again
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
/**
 * Schedules the periodic background tasks on the shared scheduled executor. Each task swallows
 * its own exceptions; presumably so that one failure does not cancel later runs.
 */
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try { // re-fetch the NameServer address: first run after 10 s, then every 2 min
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try { // refresh topic routes from the NameServer; period is getPollNameServerInterval() (30 s per the original notes)
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// Both run with period getHeartbeatBrokerInterval() (30 s per the original notes).
MQClientInstance.this.cleanOfflineBroker(); // drop brokers that are no longer in any cached route
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); // send a heartbeat to the brokers
} catch (Exception e) {}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try { // persist consumer offsets: first run after 10 s, period is getPersistConsumerOffsetInterval()
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

// Adjust thread pools once a minute, starting after one minute.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {}
}
}, 1, 1, TimeUnit.MINUTES);
}
}
/**
 * Client-side API wrapper (start excerpt).
 */
public class MQClientAPIImpl {
// Remoting client that every call in this class goes through.
private final RemotingClient remotingClient;
/**
 * Starts the underlying remoting client.
 */
public void start() {
this.remotingClient.start();
}
}
/**
 * Netty-based remoting client (start excerpt).
 */
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
/**
 * Prepares the client for use: creates the worker executor group, configures the Netty
 * bootstrap and its channel pipeline, schedules the response-table scan and, when a channel
 * event listener is set, starts the event executor.
 */
public void start() { // note: no connection to any server is made here
// Executor group that runs the pipeline handlers; threads are named NettyClientWorkerThread_<n>.
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
// Configure the bootstrap: NIO socket channel, socket options taken from nettyClientConfig.
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// With TLS enabled and an SSL context available, the SSL handler goes first in the pipeline.
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
}
}
// Then, in order: encoder, decoder, idle-state detection, connection management, client handler.
pipeline.addLast(defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyClientHandler());
}
});

// Scan the response table: first run after 3000 ms, then every 1000 ms. Throwables are swallowed.
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {}
}
}, 1000 * 3, 1000);
// The event executor is only started when a listener is registered.
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
}

每30s从NameServer更新一下Topic路由信息,是通过updateTopicRouteInfoFromNameServer方法完成的:首先遍历所有消费者订阅的Topic和所有生产者发布的Topic,然后对去重后的每个Topic,通过updateTopicRouteInfoFromNameServer方法调用MQClientAPIImpl的getTopicRouteInfoFromNameServer方法,向NameServer发送GET_ROUTEINFO_BY_TOPIC请求,获取最新的Topic路由数据topicRouteData

在NameServer端,GET_ROUTEINFO_BY_TOPIC请求由DefaultRequestProcessor的getRouteInfoByTopic方法处理,进而调用RouteInfoManager的pickupTopicRouteData方法:从保存Topic对应队列列表的topicQueueTable缓存中获取该Topic下所有的队列数据QueueData,再遍历这些队列数据得到队列所在的Broker名称集合,然后遍历这些Broker名称,从保存Broker信息的brokerAddrTable缓存中获取Broker地址等信息。

拉取到最新的Topic路由信息后,首先判断其与缓存中的路由信息相比是否发生变化;若未发生变化,则再遍历判断Topic发布缓存表topicPublishInfoTable中是否存在该Topic且其队列messageQueueList不为空,以及Topic订阅缓存表topicSubscribeInfoTable中是否存在该Topic;若不存在该Topic或队列为空,同样视为发生了变更。发生变更时,首先更新Broker信息缓存表brokerAddrTable,然后更新生产者Topic发布信息缓存表topicPublishInfoTable,再更新消费者订阅的Topic信息缓存表topicSubscribeInfoTable,最后将最新的路由信息写入topicRouteTable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
public class MQClientInstance {
/**
 * Refreshes the route of every topic this client knows about: all topics subscribed to by
 * registered consumers plus all topics published by registered producers, de-duplicated.
 */
public void updateTopicRouteInfoFromNameServer() {
    final Set<String> topics = new HashSet<String>();

    // Topics subscribed to by consumers.
    for (final MQConsumerInner consumer : this.consumerTable.values()) {
        if (consumer == null) {
            continue;
        }
        final Set<SubscriptionData> subscriptions = consumer.subscriptions();
        if (subscriptions == null) {
            continue;
        }
        for (final SubscriptionData subscription : subscriptions) {
            topics.add(subscription.getTopic());
        }
    }

    // Topics published by producers.
    for (final MQProducerInner producer : this.producerTable.values()) {
        if (producer != null) {
            topics.addAll(producer.getPublishTopicList());
        }
    }

    for (final String topic : topics) {
        this.updateTopicRouteInfoFromNameServer(topic);
    }
}
/**
 * Refreshes the route of {@code topic} using the regular (non-default) NameServer lookup.
 *
 * @param topic topic whose route should be refreshed
 * @return {@code true} if the cached route was updated
 */
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
    return updateTopicRouteInfoFromNameServer(topic, false, null);
}

/**
 * Fetches the latest route for {@code topic} and, if it differs from the cached one (or a
 * producer/consumer reports that it needs an update), refreshes the broker address table, every
 * producer's publish info, every consumer's subscribe info and the cached route.
 *
 * @param topic             topic whose route should be refreshed
 * @param isDefault         when {@code true} and {@code defaultMQProducer} is non-null, the route of
 *                          the producer's create-topic key is fetched instead and its queue counts
 *                          are capped at the producer's default queue number
 * @param defaultMQProducer producer supplying the create-topic key; may be {@code null}
 * @return {@code true} if the cached route was updated; {@code false} if nothing changed, the
 *         lock could not be obtained in time, the lookup failed with an {@code MQClientException},
 *         or the thread was interrupted
 * @throws IllegalStateException if the lookup fails with a {@code RemotingException}
 */
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                // When called through the single-argument overload, isDefault is false and
                // defaultMQProducer is null, so this branch is skipped.
                if (isDefault && defaultMQProducer != null) {
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    // Regular lookup: send GET_ROUTEINFO_BY_TOPIC to the NameServer with a 3 s timeout.
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                if (topicRouteData != null) {
                    TopicRouteData old = this.topicRouteTable.get(topic);
                    // Compare the cached and the fresh route to decide whether anything changed.
                    boolean changed = topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        // Same route, but a producer or consumer may still be missing its copy.
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    }
                    if (changed) {
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                        // Update the broker address table.
                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }
                        // Update the publish info held by every producer.
                        TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                        publishInfo.setHaveTopicRouterInfo(true);
                        for (MQProducerInner producer : this.producerTable.values()) {
                            if (producer != null) {
                                producer.updateTopicPublishInfo(topic, publishInfo);
                            }
                        }
                        // Update the subscribe info held by every consumer.
                        Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                        for (MQConsumerInner consumer : this.consumerTable.values()) {
                            if (consumer != null) {
                                consumer.updateTopicSubscribeInfo(topic, subscribeInfo);
                            }
                        }
                        // Cache the freshly fetched route.
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    }
                }
            } catch (MQClientException ignored) {
                // Best effort: a failed lookup leaves the cached route untouched.
            } catch (RemotingException e) {
                throw new IllegalStateException(e);
            } finally {
                this.lockNamesrv.unlock();
            }
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
    }
    return false;
}
/**
 * Asks the registered producers, then the registered consumers, whether any of them needs its
 * route information for {@code topic} refreshed. Stops at the first one that says yes.
 *
 * @param topic topic to check
 * @return {@code true} as soon as one producer or consumer reports that it needs an update
 */
private boolean isNeedUpdateTopicRouteInfo(final String topic) {
    for (final MQProducerInner producer : this.producerTable.values()) {
        if (producer != null && producer.isPublishTopicNeedUpdate(topic)) {
            return true;
        }
    }
    for (final MQConsumerInner consumer : this.consumerTable.values()) {
        if (consumer != null && consumer.isSubscribeTopicNeedUpdate(topic)) {
            return true;
        }
    }
    return false;
}
/**
 * Converts route data into the publish info a producer uses to pick a message queue.
 *
 * <p>If the route carries an order-topic configuration ({@code broker:count} entries separated
 * by {@code ;}), queues are generated from it and the result is flagged as an order topic.
 * Otherwise one queue per write-queue index is generated for every writable queue entry whose
 * broker is present in the route and has a master address.
 *
 * @param topic topic the queues belong to
 * @param route route data to convert; its queue-data list is sorted in place
 * @return the populated publish info
 */
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
    final TopicPublishInfo publishInfo = new TopicPublishInfo();
    publishInfo.setTopicRouteData(route);

    final String orderConf = route.getOrderTopicConf();
    if (orderConf != null && orderConf.length() > 0) {
        for (final String brokerEntry : orderConf.split(";")) {
            final String[] nameAndCount = brokerEntry.split(":");
            final int queueCount = Integer.parseInt(nameAndCount[1]);
            for (int queueId = 0; queueId < queueCount; queueId++) {
                publishInfo.getMessageQueueList().add(new MessageQueue(topic, nameAndCount[0], queueId));
            }
        }
        publishInfo.setOrderTopic(true);
        return publishInfo;
    }

    final List<QueueData> queueDatas = route.getQueueDatas();
    Collections.sort(queueDatas);
    for (final QueueData queueData : queueDatas) {
        if (!PermName.isWriteable(queueData.getPerm())) {
            continue;
        }
        // Find the broker entry that owns this queue data.
        BrokerData owner = null;
        for (final BrokerData candidate : route.getBrokerDatas()) {
            if (candidate.getBrokerName().equals(queueData.getBrokerName())) {
                owner = candidate;
                break;
            }
        }
        // Skip queues whose broker is unknown or has no master address.
        if (owner == null || !owner.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
            continue;
        }
        for (int queueId = 0; queueId < queueData.getWriteQueueNums(); queueId++) {
            publishInfo.getMessageQueueList().add(new MessageQueue(topic, queueData.getBrokerName(), queueId));
        }
    }
    publishInfo.setOrderTopic(false);
    return publishInfo;
}
/**
 * Converts route data into the set of message queues a consumer can read from: one queue per
 * read-queue index for every readable queue entry.
 *
 * @param topic topic the queues belong to
 * @param route route data to convert
 * @return the readable message queues
 */
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
    final Set<MessageQueue> readableQueues = new HashSet<MessageQueue>();
    for (final QueueData queueData : route.getQueueDatas()) {
        if (!PermName.isReadable(queueData.getPerm())) {
            continue;
        }
        for (int queueId = 0; queueId < queueData.getReadQueueNums(); queueId++) {
            readableQueues.add(new MessageQueue(topic, queueData.getBrokerName(), queueId));
        }
    }
    return readableQueues;
}
}
/**
 * Client-side API wrapper (topic route lookup excerpt).
 */
public class MQClientAPIImpl {

    /**
     * Looks up the route of {@code topic}, passing {@code true} for {@code allowTopicNotExist}.
     *
     * @param topic         topic to look up
     * @param timeoutMillis timeout for the synchronous call, chosen by the caller
     */
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
    }

    /**
     * Sends a synchronous {@code GET_ROUTEINFO_BY_TOPIC} request and decodes the response body.
     *
     * @param topic              topic to look up
     * @param timeoutMillis      timeout for the synchronous call
     * @param allowTopicNotExist not consulted in this excerpt
     * @return the decoded route data
     * @throws MQClientException carrying the response code and remark whenever the response is
     *                           not a success with a body (including {@code TOPIC_NOT_EXIST})
     */
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis, boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        final GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
        header.setTopic(topic);
        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, header);

        // A null address is passed to invokeSync here.
        final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;

        if (response.getCode() == ResponseCode.SUCCESS) {
            final byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        // Everything else, including a success without a body, is reported as an error.
        throw new MQClientException(response.getCode(), response.getRemark());
    }
}
public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
switch (request.getCode()) {
case RequestCode.GET_ROUTEINFO_BY_TOPIC: // 通过Topic获取路由信息
return this.getRouteInfoByTopic(ctx, request);
}
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf = this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
}
/**
 * NameServer-side route registry (route pickup excerpt).
 */
public class RouteInfoManager {
    // Queue-data list of each topic, keyed by topic name.
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

    /**
     * Assembles the route data of {@code topic} under the read lock: its queue-data list, a copy
     * of the broker data of every broker named by those queues, and the filter-server list of
     * every broker address.
     *
     * @param topic topic to look up
     * @return the route data, or {@code null} when no queue data or no broker data was found,
     *         when the thread was interrupted while waiting for the lock, or when assembling failed
     */
    public TopicRouteData pickupTopicRouteData(final String topic) {
        TopicRouteData topicRouteData = new TopicRouteData();
        boolean foundQueueData = false;
        boolean foundBrokerData = false;
        Set<String> brokerNameSet = new HashSet<String>();
        List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
        topicRouteData.setBrokerDatas(brokerDataList);
        HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
        topicRouteData.setFilterServerTable(filterServerMap);
        try {
            // Acquire the lock before entering the try/finally so that a failed acquire never
            // reaches the unlock in the finally block.
            this.lock.readLock().lockInterruptibly();
            try {
                List<QueueData> queueDataList = this.topicQueueTable.get(topic);
                if (queueDataList != null) {
                    topicRouteData.setQueueDatas(queueDataList);
                    foundQueueData = true;
                    // Collect the names of all brokers that host a queue of this topic.
                    for (QueueData qd : queueDataList) {
                        brokerNameSet.add(qd.getBrokerName());
                    }
                    for (String brokerName : brokerNameSet) {
                        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                        if (null != brokerData) {
                            // Hand out a copy whose address map is cloned from the registry's map.
                            BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());
                            brokerDataList.add(brokerDataClone);
                            foundBrokerData = true;
                            for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                                List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                                filterServerMap.put(brokerAddr, filterServerList);
                            }
                        }
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the lookup is reported as "not found" below.
            Thread.currentThread().interrupt();
        } catch (Exception ignored) {
            // Best effort: a failure while assembling is reported as "not found" below.
        }
        if (foundBrokerData && foundQueueData) {
            return topicRouteData;
        }
        return null;
    }
}

每30s清理一次离线的Broker:遍历Broker缓存表brokerAddrTable,并遍历同一Broker Name下的主从节点,判断Broker地址是否存在于路由信息缓存topicRouteTable中,若不存在则移除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class MQClientInstance {
/**
 * Removes broker addresses that no longer appear in any cached topic route. For every broker
 * name a copy of its address table is filtered; a broker whose copy ends up empty is removed
 * from {@code brokerAddrTable}, otherwise the filtered copy replaces the old table.
 *
 * <p>Does nothing if the NameServer lock cannot be obtained within the timeout. If the thread is
 * interrupted while waiting, the interrupt flag is restored and the method returns.
 */
private void cleanOfflineBroker() {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                // Filtered copies are collected here and written back in one go at the end.
                ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
                Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
                while (itBrokerTable.hasNext()) {
                    Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();
                    String brokerName = entry.getKey();
                    HashMap<Long, String> oneTable = entry.getValue();
                    HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>(oneTable);
                    // Walk the master and slave addresses registered under this broker name.
                    Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> ee = it.next();
                        String addr = ee.getValue();
                        if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {
                            // The address is not part of any cached route any more: drop it.
                            it.remove();
                        }
                    }
                    if (cloneAddrTable.isEmpty()) {
                        itBrokerTable.remove();
                    } else {
                        updatedTable.put(brokerName, cloneAddrTable);
                    }
                }
                if (!updatedTable.isEmpty()) {
                    this.brokerAddrTable.putAll(updatedTable);
                }
            } finally {
                this.lockNamesrv.unlock();
            }
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
    }
}
/**
 * Checks whether {@code addr} is listed as a broker address in any cached topic route.
 *
 * @param addr broker address to look for
 * @return {@code true} if some broker entry of some cached route contains the address
 */
private boolean isBrokerAddrExistInTopicRouteTable(final String addr) {
    for (final TopicRouteData routeData : this.topicRouteTable.values()) {
        for (final BrokerData brokerData : routeData.getBrokerDatas()) {
            if (brokerData.getBrokerAddrs() != null && brokerData.getBrokerAddrs().containsValue(addr)) {
                return true;
            }
        }
    }
    return false;
}
}

每30s向所有Broker发送心跳,心跳信息包含当前的ClientID以及当前Client上所有的消费者和生产者列表信息;在同一个客户端中,同一个生产者组或消费者组只能存在一个生产者或消费者。若既没有消费者也没有生产者,则不发送心跳信息;若Broker列表为空,也不发送心跳;当该客户端上没有消费者时,只向Master节点发送心跳

具体心跳的发送是通过MQClientAPIImpl的sendHearbeat方法完成的,最终由Broker上ClientManageProcessor的processRequest进行请求的处理。对于生产者组信息的处理很简单,只是遍历判断,若ClientChannelInfo信息有变更,则仅仅更新一下ClientChannelInfo。

对于消费者组列表的处理,首先遍历消费者列表,若GroupName对应的订阅组不存在,则自动创建,并将订阅组信息subscriptionGroupTable持久化到磁盘;若消费者组对应的重试Topic不存在,则先构建名为%RETRY%+GroupName的重试Topic,并将新建的重试Topic同步到NameServer的路由信息topicQueueTable中;

然后在ConsumerManager的registerConsumer方法中调用ConsumerGroupInfo的updateChannel方法和updateSubscription方法,判断若ClientChannelInfo是新建立的Channel,或心跳信息中Consumer的订阅列表存在新增或删除,则触发DefaultConsumerIdsChangeListener监听的CHANGE事件,通知所有客户端;客户端最终调用MQClientInstance的rebalanceImmediately方法唤醒RebalanceService线程,执行rebalance重新进行负载均衡

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
public class MQClientInstance {
/**
 * Sends a heartbeat to the brokers and uploads the filter class source, but only if the
 * heartbeat lock is free right now; otherwise returns immediately. Exceptions raised while
 * doing so are swallowed, and the lock is always released.
 */
public void sendHeartbeatToAllBrokerWithLock() {
    if (!this.lockHeartbeat.tryLock()) {
        return;
    }
    try {
        this.sendHeartbeatToAllBroker();
        this.uploadFilterClassSource();
    } catch (final Exception ignored) {
        // Best effort: a failed heartbeat round is simply skipped.
    } finally {
        this.lockHeartbeat.unlock();
    }
}
/**
 * Sends the current heartbeat data to every known broker address and records the broker
 * version returned by each one.
 *
 * <p>Nothing is sent when this client has neither producers nor consumers, or when no broker is
 * known. When the client has no consumers, only addresses registered under the master id
 * receive the heartbeat. A failure for one address does not stop the remaining ones.
 */
private void sendHeartbeatToAllBroker() {
    // Heartbeat payload: client id plus all consumers and producers of this client.
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    if (producerEmpty && consumerEmpty) {
        return;
    }
    if (this.brokerAddrTable.isEmpty()) {
        return;
    }

    // Count this heartbeat round.
    this.sendHeartbeatTimesTotal.getAndIncrement();

    for (final Entry<String, HashMap<Long, String>> brokerEntry : this.brokerAddrTable.entrySet()) {
        final String brokerName = brokerEntry.getKey();
        final HashMap<Long, String> addrsById = brokerEntry.getValue();
        if (addrsById == null) {
            continue;
        }
        for (final Map.Entry<Long, String> addrEntry : addrsById.entrySet()) {
            final Long id = addrEntry.getKey();
            final String addr = addrEntry.getValue();
            if (addr == null) {
                continue;
            }
            // A client without consumers only heartbeats the master and skips the slaves.
            if (consumerEmpty && id != MixAll.MASTER_ID) {
                continue;
            }
            try {
                final int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                if (!this.brokerVersionTable.containsKey(brokerName)) {
                    this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                }
                // Remember the version this broker address reported.
                this.brokerVersionTable.get(brokerName).put(addr, version);
            } catch (Exception ignored) {
                // Best effort: move on to the next address.
            }
        }
    }
}
private HeartbeatData prepareHeartbeatData() {
HeartbeatData heartbeatData = new HeartbeatData();
heartbeatData.setClientID(this.clientId); // clientID
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) { // 当前Client上所有的消费者信息
ConsumerData consumerData = new ConsumerData();
consumerData.setGroupName(impl.groupName());
consumerData.setConsumeType(impl.consumeType());
consumerData.setMessageModel(impl.messageModel());
consumerData.setConsumeFromWhere(impl.consumeFromWhere());
consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
consumerData.setUnitMode(impl.isUnitMode());
heartbeatData.getConsumerDataSet().add(consumerData);
}
}
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) { // 当前Client上所有的生产者信息
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
heartbeatData.getProducerDataSet().add(producerData);
}
}
return heartbeatData;
}
}
public class MQClientAPIImpl {
    /**
     * Sends a synchronous HEART_BEAT request to the given broker address.
     *
     * @param addr          broker address to send to
     * @param heartbeatData payload; its encoded form becomes the request body
     * @param timeoutMillis timeout passed to the synchronous invocation
     * @return the version carried by a SUCCESS response
     * @throws MQBrokerException if the response code is anything other than SUCCESS
     */
    public int sendHearbeat(final String addr, final HeartbeatData heartbeatData, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand heartbeatRequest = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
        heartbeatRequest.setLanguage(clientConfig.getLanguage());
        heartbeatRequest.setBody(heartbeatData.encode());

        RemotingCommand response = this.remotingClient.invokeSync(addr, heartbeatRequest, timeoutMillis);
        assert response != null;

        if (response.getCode() == ResponseCode.SUCCESS) {
            return response.getVersion();
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }
}
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    /**
     * Dispatches client-management requests by request code.
     *
     * @return the handler's response, or {@code null} for a code this processor does not handle
     */
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final int code = request.getCode();
        if (code == RequestCode.HEART_BEAT) {
            return this.heartBeat(ctx, request);
        }
        if (code == RequestCode.UNREGISTER_CLIENT) {
            return this.unregisterClient(ctx, request);
        }
        if (code == RequestCode.CHECK_CLIENT_CONFIG) {
            return this.checkClientConfig(ctx, request);
        }
        return null;
    }

    /**
     * Handles a client heartbeat: registers every consumer and producer listed in the
     * heartbeat body against the sending channel and answers with SUCCESS.
     */
    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(ctx.channel(), heartbeatData.getClientID(), request.getLanguage(), request.getVersion());

        for (ConsumerData consumer : heartbeatData.getConsumerDataSet()) {
            final String consumerGroup = consumer.getGroupName();
            SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(consumerGroup);
            boolean notifyOnChange = true;
            if (groupConfig != null) {
                // A subscription group config exists for this consumer group.
                notifyOnChange = groupConfig.isNotifyConsumerIdsChangedEnable();
                int topicSysFlag = consumer.isUnitMode() ? TopicSysFlag.buildSysFlag(false, true) : 0;
                // Make sure the group's retry topic exists.
                String retryTopic = MixAll.getRetryTopic(consumerGroup);
                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(retryTopic, groupConfig.getRetryQueueNums(), PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
            }
            // registerConsumer reports whether the channel or the subscription set changed;
            // the result is not used here.
            this.brokerController.getConsumerManager().registerConsumer(consumerGroup, clientChannelInfo, consumer.getConsumeType(), consumer.getMessageModel(), consumer.getConsumeFromWhere(), consumer.getSubscriptionDataSet(), notifyOnChange);
        }

        for (ProducerData producer : heartbeatData.getProducerDataSet()) {
            // Producers are only registered / refreshed; no change result is returned for them.
            this.brokerController.getProducerManager().registerProducer(producer.getGroupName(), clientChannelInfo);
        }

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
}
public class SubscriptionGroupManager extends ConfigManager {
    /**
     * Looks up the subscription group config for {@code group}, creating it on demand when
     * auto-creation is enabled or the group is a system consumer group.
     *
     * <p>If another thread registers the same group concurrently, the config that actually
     * ended up in {@code subscriptionGroupTable} is returned, and the version bump and
     * persistence are left to the thread that won the insertion.
     *
     * @param group consumer group name
     * @return the cached config, a newly created one, or {@code null} when the group is
     *         unknown and may not be auto-created
     */
    public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
        SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
        if (null != subscriptionGroupConfig) {
            return subscriptionGroupConfig;
        }
        // Unknown group: only create it when auto-creation is on or it is a system group.
        if (!brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() && !MixAll.isSysConsumerGroup(group)) {
            return null;
        }
        subscriptionGroupConfig = new SubscriptionGroupConfig();
        subscriptionGroupConfig.setGroupName(group);
        SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);
        if (null != preConfig) {
            // Lost the race: hand back the instance that is really cached instead of an orphan.
            return preConfig;
        }
        this.dataVersion.nextVersion();
        this.persist(); // write the updated table to disk
        return subscriptionGroupConfig;
    }
}
public class TopicConfigManager extends ConfigManager {
    /**
     * Returns the config of {@code topic}, creating and persisting it first when it does not exist yet.
     *
     * @param topic                       topic name to look up or create
     * @param clientDefaultTopicQueueNums used as both the read and the write queue count of a new topic
     * @param perm                        permission bits for a new topic
     * @param topicSysFlag                system flag for a new topic
     * @return the existing or newly created config; {@code null} if the topic does not exist and
     *         the table lock could not be obtained (timeout or interruption)
     */
    public TopicConfig createTopicInSendMessageBackMethod(final String topic, final int clientDefaultTopicQueueNums, final int perm, final int topicSysFlag) {
        TopicConfig topicConfig = this.topicConfigTable.get(topic);
        if (topicConfig != null) {
            return topicConfig; // already present
        }
        boolean createNew = false;
        try {
            if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    // Re-check under the lock: another thread may have created it meanwhile.
                    topicConfig = this.topicConfigTable.get(topic);
                    if (topicConfig != null) {
                        return topicConfig;
                    }
                    topicConfig = new TopicConfig(topic);
                    topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
                    topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
                    topicConfig.setPerm(perm);
                    topicConfig.setTopicSysFlag(topicSysFlag);
                    this.topicConfigTable.put(topic, topicConfig);
                    createNew = true;
                    this.dataVersion.nextVersion();
                    this.persist();
                } finally {
                    this.lockTopicConfigTable.unlock();
                }
            }
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
        }
        if (createNew) {
            // Propagate the newly created topic via a full broker registration.
            this.brokerController.registerBrokerAll(false, true, true);
        }
        return topicConfig;
    }
}
public class ProducerManager {
    /**
     * Registers a producer channel under {@code group}, or refreshes the last-update
     * timestamp of the entry already stored for that channel.
     *
     * @param group             producer group name
     * @param clientChannelInfo channel and client info taken from the heartbeat
     */
    public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
        ConcurrentHashMap<Channel, ClientChannelInfo> channelsOfGroup = this.groupChannelTable.get(group);
        if (channelsOfGroup == null) {
            channelsOfGroup = new ConcurrentHashMap<>();
            this.groupChannelTable.put(group, channelsOfGroup);
        }

        final ClientChannelInfo existing = channelsOfGroup.get(clientChannelInfo.getChannel());
        if (existing != null) {
            // Known channel: just record that it is still alive.
            existing.setLastUpdateTimestamp(System.currentTimeMillis());
        } else {
            // First heartbeat from this channel: index it by channel and by client id.
            channelsOfGroup.put(clientChannelInfo.getChannel(), clientChannelInfo);
            clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
            log.info("new producer connected, group: {} channel: {}", group, clientChannelInfo.toString());
        }
    }
}
public class ConsumerManager {
    /**
     * Registers a consumer channel and its subscriptions under {@code group}.
     *
     * @param isNotifyConsumerIdsChangedEnable whether a CHANGE event is raised when something changed
     * @return {@code true} if the channel was new or the subscription set changed
     */
    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
        ConsumerGroupInfo groupInfo = this.consumerTable.get(group);
        if (groupInfo == null) {
            ConsumerGroupInfo created = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            ConsumerGroupInfo existing = this.consumerTable.putIfAbsent(group, created);
            // If another entry got there first, use it instead of the one just built.
            groupInfo = (existing == null) ? created : existing;
        }

        // Both updates always run: the first stores the channel, the second syncs the subscriptions.
        final boolean channelAdded = groupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
        final boolean subscriptionChanged = groupInfo.updateSubscription(subList);
        final boolean changed = channelAdded || subscriptionChanged;

        if (changed && isNotifyConsumerIdsChangedEnable) {
            // Raise CHANGE for all channels of the group so clients can rebalance.
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, groupInfo.getAllChannel());
        }

        // Always raise REGISTER so the consumer filter data is registered.
        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
        return changed;
    }
}
public class ConsumerGroupInfo {
    /**
     * Stores or refreshes the channel info of one consumer and updates the group's consume settings.
     *
     * <p>The heartbeat timestamp is always written to the entry that remains in
     * {@code channelInfoTable}: when an existing channel reports a different client id, the
     * entry is replaced by {@code infoNew} and the timestamp goes to that replacement rather
     * than to the evicted object.
     *
     * @param infoNew channel info built from the current heartbeat
     * @return {@code true} only when the channel was not registered before
     */
    public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
        boolean updated = false;
        this.consumeType = consumeType;
        this.messageModel = messageModel;
        this.consumeFromWhere = consumeFromWhere;
        ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
        if (null == infoOld) {
            ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
            if (null == prev) {
                updated = true;
            }
            infoOld = infoNew;
        } else if (!infoOld.getClientId().equals(infoNew.getClientId())) {
            // Same channel, different client id: replace the entry and track the replacement
            // so the timestamp below lands on the object that is actually stored.
            this.channelInfoTable.put(infoNew.getChannel(), infoNew);
            infoOld = infoNew;
        }
        this.lastUpdateTimestamp = System.currentTimeMillis();
        infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp); // record heartbeat time
        return updated;
    }

    /**
     * Synchronizes {@code subscriptionTable} with the subscriptions carried by a heartbeat.
     *
     * @param subList subscriptions currently reported by the consumer
     * @return {@code true} if a topic was added or removed; replacing an entry by a newer
     *         sub-version alone does not count as a change
     */
    public boolean updateSubscription(final Set<SubscriptionData> subList) {
        boolean updated = false;
        // Pass 1: add topics not seen before, refresh entries with a newer sub-version.
        for (SubscriptionData sub : subList) {
            SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
            if (old == null) {
                SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
                if (null == prev) {
                    updated = true;
                }
            } else if (sub.getSubVersion() > old.getSubVersion()) {
                this.subscriptionTable.put(sub.getTopic(), sub);
            }
        }
        // Pass 2: drop topics that the heartbeat no longer lists.
        Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, SubscriptionData> next = it.next();
            String oldTopic = next.getKey();
            boolean exist = false;
            for (SubscriptionData sub : subList) {
                if (sub.getTopic().equals(oldTopic)) {
                    exist = true;
                    break;
                }
            }
            if (!exist) {
                it.remove();
                updated = true;
            }
        }
        this.lastUpdateTimestamp = System.currentTimeMillis();
        return updated;
    }
}
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
    /**
     * Reacts to a consumer group event.
     *
     * <ul>
     *   <li>CHANGE: {@code args[0]} is the channel list; each channel is notified when the broker config allows it.</li>
     *   <li>UNREGISTER: the group's consumer filter data is unregistered.</li>
     *   <li>REGISTER: {@code args[0]} is the subscription collection, registered with the consumer filter manager.</li>
     * </ul>
     *
     * @throws RuntimeException for any other event value
     */
    public void handle(ConsumerGroupEvent event, String group, Object... args) {
        if (event == null) {
            return;
        }
        final boolean hasFirstArg = args != null && args.length >= 1;
        switch (event) {
            case CHANGE: {
                if (hasFirstArg) {
                    List<Channel> targets = (List<Channel>) args[0];
                    boolean notifyEnabled = targets != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable();
                    if (notifyEnabled) {
                        // Tell every channel of the group that the consumer ids changed.
                        for (Channel target : targets) {
                            this.brokerController.getBroker2Client().notifyConsumerIdsChanged(target, group);
                        }
                    }
                }
                break;
            }
            case UNREGISTER: {
                this.brokerController.getConsumerFilterManager().unRegister(group);
                break;
            }
            case REGISTER: {
                if (hasFirstArg) {
                    // Register the consumer filter data for the reported subscriptions.
                    Collection<SubscriptionData> subscriptions = (Collection<SubscriptionData>) args[0];
                    this.brokerController.getConsumerFilterManager().register(group, subscriptions);
                }
                break;
            }
            default:
                throw new RuntimeException("Unknown event " + event);
        }
    }
}
public class Broker2Client {
    /**
     * Sends a one-way NOTIFY_CONSUMER_IDS_CHANGED request for {@code consumerGroup} over the
     * given channel. Does nothing when the group is {@code null}; send failures are ignored.
     */
    public void notifyConsumerIdsChanged(final Channel channel, final String consumerGroup) {
        if (consumerGroup != null) {
            NotifyConsumerIdsChangedRequestHeader header = new NotifyConsumerIdsChangedRequestHeader();
            header.setConsumerGroup(consumerGroup);
            RemotingCommand notification = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, header);
            try {
                // One-way send with a 10 ms timeout.
                this.brokerController.getRemotingServer().invokeOneway(channel, notification, 10);
            } catch (Exception e) {
                // Failure to notify is swallowed in this excerpt.
            }
        }
    }
}
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    /**
     * Dispatches broker-to-client requests by request code.
     *
     * @return the handler's response, or {@code null} for a code this processor does not handle
     */
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        RemotingCommand result = null;
        switch (request.getCode()) {
            case RequestCode.CHECK_TRANSACTION_STATE:
                result = this.checkTransactionState(ctx, request);
                break;
            case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
                result = this.notifyConsumerIdsChanged(ctx, request);
                break;
            case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
                result = this.resetOffset(ctx, request);
                break;
            case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
                result = this.getConsumeStatus(ctx, request);
                break;
            case RequestCode.GET_CONSUMER_RUNNING_INFO:
                result = this.getConsumerRunningInfo(ctx, request);
                break;
            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                result = this.consumeMessageDirectly(ctx, request);
                break;
            case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
                result = this.receiveReplyMessage(ctx, request);
                break;
            default:
                break;
        }
        return result;
    }

    /**
     * Handles the broker's "consumer ids changed" notification by requesting an immediate
     * rebalance. The request header is decoded first; if decoding or the rebalance request
     * fails, the exception is swallowed.
     *
     * @return always {@code null}
     */
    public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        try {
            // Decoded for validation only; its fields are not read here.
            final NotifyConsumerIdsChangedRequestHeader decodedHeader =
                (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
            this.mqClientFactory.rebalanceImmediately();
        } catch (Exception e) {
            // Ignored.
        }
        return null;
    }
}
public class MQClientInstance {
    /**
     * Wakes up the rebalance service so that it runs without waiting for its next cycle.
     */
    public void rebalanceImmediately() {
        // Waking the service is what triggers the client's rebalance.
        this.rebalanceService.wakeup();
    }
}

通过周期任务,默认每5s对消费的offset进行一次持久化。遍历当前实例的消费者组,消息推送模式和拉取模式的实现是一样的,都是获取当前的MessageQueue列表,最终调用OffsetStore的persistAll方法持久化所有消息队列的offset。

消息持久化管理有LocalFileOffsetStore和RemoteBrokerOffsetStore两种实现,分别对应消费模型的广播模式和集群模式:

  • 广播模式下,一条消息会被ConsumerGroup中的每一台机器消费,故每个Consumer的消费进度都不一样,需要由Consumer自身来管理Offset
  • 集群模式下,一条消息只会被ConsumerGroup中的一台机器消费,同一ConsumerGroup下消费进度是一样的,故需交由Broker统一管理

本地消息持久化很简单,就是遍历LocalFileOffsetStore中保存的消息队列的Offset,与传入的消息队列列表做匹配,最终将匹配消息队列的Offset持久化到user.home/.rocketmq_offsets/clientId/groupName/offsets.json;远程消息持久化会通过UPDATE_CONSUMER_OFFSET关联调用到Broker中ConsumerManageProcessor的processRequest方法,从而调用ConsumerOffsetManager的commitOffset方法,将offset更新到ConsumerOffsetManager的offsetTable缓存表中。这里很明显没有将offset直接持久化到磁盘,而是通过启动的ScheduleMessageService异步线程中注册的周期任务,默认每10s会对Broker上的Consumer Offset进行一次持久化到磁盘,默认持久化路径为user.home/store/config/consumerOffset.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
public class MQClientInstance {
    /**
     * Asks every consumer registered in {@code consumerTable} to persist its consume offsets.
     */
    private void persistAllConsumerOffset() {
        for (MQConsumerInner consumer : this.consumerTable.values()) {
            consumer.persistConsumerOffset();
        }
    }
}
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
    /**
     * Persists the offsets of all message queues currently present in the rebalance
     * implementation's process-queue table. Any exception is swallowed.
     */
    public void persistConsumerOffset() {
        try {
            // Expected to throw when the consumer is not in the RUNNING state.
            this.isRunning();
            // Copy the key set so the store works on a snapshot of the allocated queues.
            Set<MessageQueue> snapshot = new HashSet<MessageQueue>(this.rebalanceImpl.getProcessQueueTable().keySet());
            this.offsetStore.persistAll(snapshot);
        } catch (Exception e) {
            // Ignored.
        }
    }
}
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    /**
     * Persists the offsets of all message queues currently present in the rebalance
     * implementation's process-queue table. Any exception is swallowed.
     */
    public void persistConsumerOffset() {
        try {
            // Expected to throw when the consumer is not in the RUNNING state.
            this.makeSureStateOK();
            // Copy the key set so the store works on a snapshot of the allocated queues.
            Set<MessageQueue> snapshot = new HashSet<MessageQueue>(this.rebalanceImpl.getProcessQueueTable().keySet());
            this.offsetStore.persistAll(snapshot);
        } catch (Exception e) {
            // Ignored.
        }
    }
}
public class LocalFileOffsetStore implements OffsetStore {
    /**
     * Writes the offsets of the given queues to the local store file as JSON.
     *
     * @param mqs queues whose offsets should be persisted; {@code null} or empty means nothing is written
     */
    public void persistAll(Set<MessageQueue> mqs) {
        if (mqs == null || mqs.isEmpty()) {
            return;
        }

        // Collect only the offsets that belong to the requested queues.
        OffsetSerializeWrapper wrapper = new OffsetSerializeWrapper();
        for (Map.Entry<MessageQueue, AtomicLong> tracked : this.offsetTable.entrySet()) {
            MessageQueue queue = tracked.getKey();
            if (!mqs.contains(queue)) {
                continue;
            }
            wrapper.getOffsetTable().put(queue, tracked.getValue());
        }

        String json = wrapper.toJson(true);
        if (json == null) {
            return;
        }
        try {
            // Default location: user.home/.rocketmq_offsets/clientId/groupName/offsets.json
            MixAll.string2File(json, this.storePath);
        } catch (IOException e) {
            // Write failures are ignored in this excerpt.
        }
    }
}
public class RemoteBrokerOffsetStore implements OffsetStore {
    /**
     * Pushes the offsets of the given queues to their brokers and drops tracked queues that
     * are not part of {@code mqs}.
     *
     * @param mqs queues currently in use; {@code null} or empty means nothing is done
     */
    public void persistAll(Set<MessageQueue> mqs) {
        if (mqs == null || mqs.isEmpty()) {
            return;
        }

        final HashSet<MessageQueue> staleQueues = new HashSet<MessageQueue>();
        for (Map.Entry<MessageQueue, AtomicLong> tracked : this.offsetTable.entrySet()) {
            final MessageQueue queue = tracked.getKey();
            final AtomicLong current = tracked.getValue();
            if (current == null) {
                continue;
            }
            if (!mqs.contains(queue)) {
                staleQueues.add(queue);
                continue;
            }
            try {
                // Report the current consume offset to the broker that owns this queue.
                this.updateConsumeOffsetToBroker(queue, current.get());
            } catch (Exception e) {
                // A failure for one queue does not stop the others.
            }
        }

        // Forget queues that are no longer in use.
        for (MessageQueue stale : staleQueues) {
            this.offsetTable.remove(stale);
        }
    }

    /** Reports an offset using the one-way variant. */
    private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        updateConsumeOffsetToBroker(mq, offset, true);
    }

    /**
     * Sends the consume offset of {@code mq} to its broker.
     *
     * @param isOneway {@code true} to send one-way, {@code false} to use the non-one-way update call
     * @throws MQClientException if the broker cannot be found even after refreshing the topic route
     */
    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        FindBrokerResult broker = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        if (broker == null) {
            // Broker unknown: refresh the topic route from the name server and look once more.
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            broker = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        }
        if (broker == null) {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }

        UpdateConsumerOffsetRequestHeader header = new UpdateConsumerOffsetRequestHeader();
        header.setTopic(mq.getTopic());
        header.setConsumerGroup(this.groupName);
        header.setQueueId(mq.getQueueId());
        header.setCommitOffset(offset);

        // Both variants use a 5 s timeout; the private overload above always asks for one-way.
        if (isOneway) {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(broker.getBrokerAddr(), header, 1000 * 5);
        } else {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(broker.getBrokerAddr(), header, 1000 * 5);
        }
    }
}
public class MQClientAPIImpl {
    /**
     * Sends an UPDATE_CONSUMER_OFFSET request one-way to the broker.
     *
     * @param addr          broker address; mapped through {@code MixAll.brokerVIPChannel} using the client's VIP-channel setting
     * @param requestHeader header carrying the offset details
     * @param timeoutMillis timeout passed to the one-way invocation
     */
    public void updateConsumerOffsetOneway(final String addr, final UpdateConsumerOffsetRequestHeader requestHeader, final long timeoutMillis) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException, InterruptedException {
        final RemotingCommand offsetUpdate = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
        this.remotingClient.invokeOneway(targetAddr, offsetUpdate, timeoutMillis);
    }
}
public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    /**
     * Dispatches consumer-management requests by request code.
     *
     * @return the handler's response, or {@code null} for a code this processor does not handle
     */
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final int code = request.getCode();
        if (code == RequestCode.GET_CONSUMER_LIST_BY_GROUP) {
            return this.getConsumerListByGroup(ctx, request);
        } else if (code == RequestCode.UPDATE_CONSUMER_OFFSET) {
            return this.updateConsumerOffset(ctx, request);
        } else if (code == RequestCode.QUERY_CONSUMER_OFFSET) {
            return this.queryConsumerOffset(ctx, request);
        }
        return null;
    }

    /**
     * Commits the offset carried by the request header to the consumer offset manager and
     * answers with SUCCESS.
     */
    private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
        final UpdateConsumerOffsetRequestHeader header =
            (UpdateConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);

        // The client host is derived from the remote address of the channel.
        final String clientHost = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        this.brokerController.getConsumerOffsetManager().commitOffset(clientHost, header.getConsumerGroup(), header.getTopic(), header.getQueueId(), header.getCommitOffset());

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
}
public class ConsumerOffsetManager extends ConfigManager {
    /**
     * Records the committed offset of one queue for a consumer group on a topic.
     *
     * @param clientHost address of the committing client (not used for the update itself)
     * @param group      consumer group name
     * @param topic      topic name
     * @param queueId    queue id within the topic
     * @param offset     offset to store
     */
    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
        // Table key format: topic + separator + group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        this.commitOffset(clientHost, key, queueId, offset);
    }

    /**
     * Stores {@code offset} for {@code queueId} in the per-key map of {@code offsetTable}.
     *
     * <p>The per-key map is installed with {@code putIfAbsent}, so when two threads commit the
     * first offsets of the same key at the same time, neither commit is lost by one new map
     * overwriting the other.
     */
    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            // Fill the new map before publishing it so readers never see it empty.
            ConcurrentMap<Integer, Long> created = new ConcurrentHashMap<Integer, Long>(32);
            created.put(queueId, offset);
            map = this.offsetTable.putIfAbsent(key, created);
            if (null == map) {
                return; // our map was installed and already holds the offset
            }
        }
        map.put(queueId, offset);
    }
}
public class ScheduleMessageService extends ConfigManager {
    /**
     * Starts the delayed-message service once: schedules one delivery task per delay level
     * and a recurring task that persists this service's state.
     */
    public void start() {
        if (!started.compareAndSet(false, true)) {
            return; // already started
        }
        this.timer = new Timer("ScheduleMessageTimerThread", true);

        for (Map.Entry<Integer, Long> levelEntry : this.delayLevelTable.entrySet()) {
            Integer level = levelEntry.getKey();
            Long timeDelay = levelEntry.getValue();
            // Start from the stored offset of this level, or from 0 when none is recorded.
            Long offset = this.offsetTable.get(level);
            if (offset == null) {
                offset = 0L;
            }
            if (timeDelay == null) {
                continue;
            }
            // Kick off delayed-message delivery for this level.
            this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
        }

        // First run after 10000 ms, then repeated at the configured flush-delay-offset interval.
        final long flushInterval = this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval();
        TimerTask persistTask = new TimerTask() {
            @Override
            public void run() {
                try {
                    if (started.get()) {
                        ScheduleMessageService.this.persist();
                    }
                } catch (Throwable e) {
                    // Ignored so that the timer thread keeps running.
                }
            }
        };
        this.timer.scheduleAtFixedRate(persistTask, 10000, flushInterval);
    }
}
public abstract class ConfigManager {
    /**
     * Serializes this manager's state via {@code encode(true)} and writes it to the file
     * named by {@code configFilePath()}. Nothing is written when encoding yields {@code null}.
     */
    public synchronized void persist() {
        final String json = this.encode(true);
        if (json == null) {
            return;
        }
        final String targetFile = this.configFilePath();
        try {
            MixAll.string2File(json, targetFile);
        } catch (IOException e) {
            // Write failures are ignored in this excerpt.
        }
    }
}