Consumer启动源码

消费者以消费者组的模式开展。消费者组之间有集群模式和广播模式两种消费模式。消费模式有推模式拉模式推模式是由拉模式封装组成

Push模式

1
2
3
4
5
6
7
8
9
10
11
12
13
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING); // 将消息模式设置为BROADCASTING
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

很明显默认是使用AllocateMessageQueueAveragely平均分配策略,将所有MessageQueue平均分给每一个消费者,通过subscribe方法设置订阅的Topic和Tag表达式,FilterAPI.buildSubscriptionData方法将解析Tag表达式封装成SubscriptionData,后缓存到负载均衡器RebalanceImplsubscriptionInner中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
public DefaultMQPushConsumer(final String consumerGroup) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
}
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {}
}
}
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
}
}
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private long pullTimeDelayMillsWhenException = 3000;
private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;
private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumer defaultMQPushConsumer;
private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private final long consumerStartTimestamp = System.currentTimeMillis();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final RPCHook rpcHook;
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private PullAPIWrapper pullAPIWrapper;
private volatile boolean pause = false;
private boolean consumeOrderly = false;
private MessageListener messageListenerInner;
private OffsetStore offsetStore;
//K2 客户端进行实际消息消费的service组件。
private ConsumeMessageService consumeMessageService;
private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0;

public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException(); // 默认1s
}
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) { // 此时mQClientFactory是为空的
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
}

首先通过checkConfig方法检查消费者DefaultMQPushConsumer的各种配置是否合法,然后通过copySubscription将设置到DefaultMQPushConsumer中的订阅Topic以及Tag表达式信息解析拷贝到RebalanceImpl中,且若是集群模式会创建一个重试Topic并添加到RebalanceImpl中;

然后创建MQClientInstance核心对象,该类是生产者和消费者复用的,若生产者和消费者位于同一服务则该类只会被实例化一次,在该类构造方法中主要完成以下任务:

  • 实例化NettyClientConfig并设置一些参数
  • 实例化用于处理请求分发的NettyRequestProcessor的具体实现类ClientRemotingProcessor,通过processRequest方法完成请求的分发
  • 然后通过构造方法实例化MQClientAPIImpl
    • 在该构造方法中首先实例化NettyRemotingClientNettyRemotingClient复用Broker用来管理BrokerNameServerNetty客户端的类,用于管理ProducerBrokerNameServerNetty客户端,且同样在构造方法中并不会直接创建连接,客户端和服务端的连接是在调用invokeSyncinvokeAsyncinvokeOneway等方法时通过NettyRemotingClientcreateChannel方法来完成的
    • 通过registerProcessorRequestCodeClientRemotingProcessor绑定
  • NameServer地址设置到NettyRemotingClient
  • 实例化用于执行创建Topic等命令的MQAdminImpl
  • 实例化拉模式消费者服务线程PullMessageService
  • 实例化客户端负载均衡的RebalanceService,默认是20s执行一次,Broker监听到Consumer数量变动,也通知Consumer进行Rebalance
  • 实例化用于统计消费者消费Topic的RTTPS等消费指标的ConsumerStatsManager

然后将消费者组消费者模式消费者负载均衡策略等设置到RebalanceImpl中,然后实例化PullAPIWrapper,然后根据消费模式实例化OffsetStoreoffset存储策略广播模式通过LocalFileOffsetStore本地存储offset集群模式通过RemoteBrokerOffsetStore将offset存储到Broker,然后加载offset,LocalFileOffsetStore则直接从本地磁盘加载offset,RemoteBrokerOffsetStoreload方法是空实现;

然后根据是否为顺序消息实例化ConsumeMessageOrderlyServiceConsumeMessageConcurrentlyService,然后启动该ConsumeMessageService,然后将DefaultMQPushConsumerImpl实例注册到MQConsumerInnerconsumerTable中,然后启动MQClientInstance,注意若生产者或其他消费者已启动过则不会重复启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public synchronized void start() throws MQClientException {
switch (this.serviceState) { // 该状态初始时默认为CREATE_JUST
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig(); // 检查配置,ConsumerGroup是否合法、订阅是否为null、线程数是否合法等
this.copySubscription(); // 将设置到DefaultMQPushConsumer中的订阅信息解析拷贝到RebalanceImpl中
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID(); // 修改实例id为当前进程PID
}
//K2 客户端创建工厂,这个是核心对象,与Product复用的同一个类
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); // 设置消费者组,顺序消息关键
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); // 设置消费者模式:默认CLUSTERING模式
// 集群模式下消费者策略,默认AllocateMessageQueueAveragely平均分配策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) { // 默认为空
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING: // 广播模式通过LocalFileOffsetStore本地存储offset
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING: // 集群模式通过RemoteBrokerOffsetStore在Broker存储offset
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// LocalFileOffsetStore则直接从本地磁盘加载offset,RemoteBrokerOffsetStore这里是空实现
this.offsetStore.load();
// 根据客户端配置实例化不同的consumeMessageService
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { // 顺序消息
this.consumeOrderly = true; // 在doRebalance方法中用到
this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { // 非顺序消息
this.consumeOrderly = false;
this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
// 注册本地的消费者组缓存,即注册到MQConsumerInner的consumerTable中
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) { // 若注册失败抛出异常
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
mQClientFactory.start();
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
// 遍历当前消费在订阅的Topic,通过MQClientInstance的updateTopicRouteInfoFromNameServer(topic, false, null)方法从NameServer中同步最新的Topic和Broker信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
}
public class MQClientManager {
private static MQClientManager instance = new MQClientManager();
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>();
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId(); // 构建clientId:ClientIP@InstanceName@unitName
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
}
}
return instance;
}
}
public class MQClientInstance {
//同一个消费者可以属于多个消费者组,所以在启动客户端实例时,会用一个Map来保存当前实例的消费者组。这个map的value就是当前实例。
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
//Topic的路由信息
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable = new ConcurrentHashMap<String, HashMap<String, Integer>>();
private final ClientRemotingProcessor clientRemotingProcessor;
private final PullMessageService pullMessageService;
private final RebalanceService rebalanceService;
private final DefaultMQProducer defaultMQProducer;
private final ConsumerStatsManager consumerStatsManager;
private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
private ServiceState serviceState = ServiceState.CREATE_JUST;
private Random random = new Random();
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
}
this.clientId = clientId;
this.mQAdminImpl = new MQAdminImpl(this);
this.pullMessageService = new PullMessageService(this);
this.rebalanceService = new RebalanceService(this);
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
}
}

通过updateTopicSubscribeInfoWhenSubscriptionChanged方法遍历当前消费在订阅的Topic,然后通过MQClientInstanceupdateTopicRouteInfoFromNameServer(topic, false, null)方法从NameServer中同步最新的TopicBroker信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
}
}
}
}
public class MQClientInstance {
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
return updateTopicRouteInfoFromNameServer(topic, false, null);
}
}

checkClientInBroker主要任务是到Broker上解析ExpressionTypeSQL92的SQL表达式为SqlFilter,如果是Tag类型则直接跳过,主要是判断Broker是否支持该SQL表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class MQClientInstance {
public void checkClientInBroker() throws MQClientException {
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) { // 变量当前实例的消费者列表
Entry<String, MQConsumerInner> entry = it.next();
Set<SubscriptionData> subscriptionInner = entry.getValue().subscriptions();
if (subscriptionInner == null || subscriptionInner.isEmpty()) {
return; // 若订阅者信息为空这退出
}
for (SubscriptionData subscriptionData : subscriptionInner) {
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
continue; // 若ExpressionType为TAG
}
// 从该Topic对应的Broker列表中随机选择一个Broker地址
String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
if (addr != null) {
try { // 调用Broker接口
this.getMQClientAPIImpl().checkClientInBroker(addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000);
} catch (Exception e) {
if (e instanceof MQClientException) {
throw (MQClientException) e;
} else {
throw new MQClientException(e);
}
}
}
}
}
}
public String findBrokerAddrByTopic(final String topic) {
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null) {
List<BrokerData> brokers = topicRouteData.getBrokerDatas();
if (!brokers.isEmpty()) {
int index = random.nextInt(brokers.size());
BrokerData bd = brokers.get(index % brokers.size());
return bd.selectBrokerAddr();
}
}
return null;
}
}
public class MQClientAPIImpl {
public void checkClientInBroker(final String brokerAddr, final String consumerGroup, final String clientId, final SubscriptionData subscriptionData, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_CLIENT_CONFIG, null);
CheckClientRequestBody requestBody = new CheckClientRequestBody();
requestBody.setClientId(clientId);
requestBody.setGroup(consumerGroup);
requestBody.setSubscriptionData(subscriptionData);
request.setBody(requestBody.encode());
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
assert response != null;
if (ResponseCode.SUCCESS != response.getCode()) {
throw new MQClientException(response.getCode(), response.getRemark());
}
}
}
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
public RemotingCommand checkClientConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
CheckClientRequestBody requestBody = CheckClientRequestBody.decode(request.getBody(), CheckClientRequestBody.class);
if (requestBody != null && requestBody.getSubscriptionData() != null) {
SubscriptionData subscriptionData = requestBody.getSubscriptionData();
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response; // 若类型为Tag则直接返回成功
}
if (!this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
return response;
}
try { // 若是SqlFilter,则解析SQL过滤表达式
FilterFactory.INSTANCE.get(subscriptionData.getExpressionType()).compile(subscriptionData.getSubString());
} catch (Exception e) {
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark(e.getMessage());
return response;
}
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
}

通过调用MQClientInstancesendHeartbeatToAllBrokerWithLock方法向该实例注册的所有Broker发送心跳信息,且在MQClientInstance的start方法中启动了一个周期任务每30s会执行执行一次该心跳任务;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class MQClientInstance {
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllBroker(); // 向所有Broker发送心跳
this.uploadFilterClassSource();
} catch (final Exception e) {} finally {
this.lockHeartbeat.unlock();
}
}
}
private void sendHeartbeatToAllBroker() {
// ClientID以及当前Client的上所有的消费者和生产者列表信息,组装成心跳信息
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
return; // 若当前节点既没有生产者也没有消费者,则不发送心跳
}
if (!this.brokerAddrTable.isEmpty()) { // 若Broker列表为空也不发送心跳
long times = this.sendHeartbeatTimesTotal.getAndIncrement(); // 发送心跳总次数
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) { // 只向Master节点发送心跳,跳过从节点
if (id != MixAll.MASTER_ID) continue;
}
try {
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version); // 更新Broker版本信息
} catch (Exception e) {}
}
}
}
}
}
}
}

通过MQClientInstancerebalanceImmediately方法完成客户端负载均衡,不仅仅是在服务启动时会执行客户端负载均衡,在MQClientInstancestart方法中会启动PullMessageServiceRebalanceService线程,RebalanceService会每20s执行一次该负载均衡方法,且在客户端向Broker发送心跳信息时,Broker会检测Consumer是否发生变化,若发生变化也会主动通知调用每个Consumer的ClientRemotingProcessornotifyConsumerIdsChanged方法唤醒负载均衡线程重置客户端负载均衡;

针对当前消费者所属的每一个消费者组,最终调用PULL模式和PUSH模式的具体实现,但最终都是RebalanceImpldoRebalance方法,只是PULL模式isOrder传入的为falsePUSH模式isOrder传入的是真实的是否是顺序消息;真正进行负载都是根据Topic来进行的,故会遍历当前消费者订阅的Topic执行负载均衡重置

对于广播模式每个消费者都要消费,只需要更新负载信息,不需要进行负载,故在调用updateProcessQueueTableInRebalance方法时直接传入获取的当前Topic的所有队列,但集群模式会随机选择当前Topic所对应的Broker列表中的一个,获取当前消费者组对应的消费者ClientID列表,为了保证消费者负载策略相对稳定会对消息队列ClientID列表排序,然后再对排序好的数据执行对应的负载均衡策略完成负载均衡的重置,获取到新的订阅的消息队列列表,然后调用updateProcessQueueTableInRebalance方法;

通过updateProcessQueueTableInRebalance方法遍历MessageQueueProcessQueue快照映射列表,若映射表中的数据不包含在最新的订阅的消息队列中了,这将ProcessQueue的dropped状态置为true,通过removeUnnecessaryMessageQueue方法保存当前队列消费的offset进度,该方法若是广播模式实际什么都没有做直接返回true集群模式会将offset上报给Broker并移除本地该消费队列的内存offset,若是顺序消息还会解锁队列,若成功则将processQueueTable映射表中的该队列移除;若包含在重置后的订阅列表中,但最后拉取时间大于120s,且若是拉取模式同样会执行removeUnnecessaryMessageQueue的逻辑。

然后若不在processQueueTable映射表中,需要创建映射关系,在ProcessQueue中保存的是拉取到本地且还未被消费的消息数据,还会创建一个PullRequest,且若是PUSH模式,将PullRequest列表添加到PullMessageService的阻塞队列中;通过该服务的run方法中调用DefaultMQPushConsumerImplpullMessage拉取消息;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
public class MQClientInstance {
private final RebalanceService rebalanceService;
public void rebalanceImmediately() {
this.rebalanceService.wakeup(); // 唤醒RebalanceService线程
}
public void doRebalance() { // 客户端负载均衡 针对当前消费者所属的每一个消费者组
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try { // 分为PULL模式和PUSH模式实现,但最终都是RebalanceImpl的doRebalance方法
impl.doRebalance();
} catch (Throwable e) {}
}
}
}
}
public class RebalanceService extends ServiceThread {
public void run() {
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance(); // 调用MQClientInstance的doRebalance方法
}
}
}
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void doRebalance() {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
}
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public void doRebalance() {
if (this.rebalanceImpl != null) {
this.rebalanceImpl.doRebalance(false);
}
}
}
public abstract class RebalanceImpl {
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) { // 遍历当前消费者订阅的Topic
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {//K2 客户端负载:真正进行负载都是根据Topic主题来进行的。
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {}
}
}
this.truncateMessageQueueNotMyTopic();
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: { // 广播模式,不需要进行负载,每个消费者都要消费,只需要更新负载信息
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
// 遍历MessageQueue对应的ProcessQueue消息快照,对于新增的消息队列做新增快照,对于移除的消息队列保存当前消费的offset
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); // 关键代码
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
}
}
break;
}
case CLUSTERING: { // 客户端负载:集群模式负载方法
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); // 获取订阅的主题全部的队列
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); // 从broker获取全部的消费者客户端id
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll); // 排序后才能保证消费者负载策略相对稳定
Collections.sort(cidAll); // 排序后才能保证消费者负载策略相对稳定
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; // MessageQueue的负载策略,有五种实现类
List<MessageQueue> allocateResult = null;
try { // 按负载策略进行分配,返回当前消费者实际订阅的MessageQueue集合,默认AllocateMessageQueueAveragely
allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
} catch (Throwable e) {
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 同步更新处理队列
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); // 关键代码
if (changed) {
// 如果处理队列被更新了调用此方法,交给子类去处理。
// 在推模型中,此步骤是动态调整拉取队列个数和size的阀值并通过心跳通知broker自己触发了下一个版本
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) { // 重置后当前mq不包含在重置后的订阅列表中
pq.setDropped(true); // 将dropped状态置为true
// 移除之前的消费队列做收尾工作,如上报offset,平时上报offset是定时的,push模式下顺序消息默认返回false,其他情况返回true
if (this.removeUnnecessaryMessageQueue(mq, pq)) { // 广播模式,直接返回true,实际什么都没有做
it.remove(); // 集群模式下,PULL模式下会将offset上报到Broker,PUSH模式
changed = true; // 说明顺序消息不会进入该逻辑
}
} else if (pq.isPullExpired()) { // 如果最后拉取时间大于120s
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break; // 因为拉模型的是手动处理的,对于单队列拉取超时的不用做任何处理
case CONSUME_PASSIVELY: // 对于单队列拉取超时,那么只需要处理推模型的即可。因为拉模型的是手动处理的
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
}
break;
default:
break;
}
}
}
}
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) { // 遍历负载均衡后的消息队列列表
if (!this.processQueueTable.containsKey(mq)) { // 若不包含在队列消费快照列表中,说明未拉取过消息,若已经包含则不用管
if (isOrder && !this.lock(mq)) {
continue; // 若是顺序消息,且lock失败,则跳过
}
this.removeDirtyOffset(mq); // 广播模式是空实现,集群模式则将mq从offsetTable中移除
ProcessQueue pq = new ProcessQueue(); // 创建一个队列消费快照
long nextOffset = this.computePullFromWhere(mq); // 获取最新消费进度
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) { // 说明之前存在
} else { // 若之前不存在,则给当前消息队列创建一个PullRequest,并添加到pullRequestList中
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
}
}
}
this.dispatchPullRequest(pullRequestList); // 只有PUSH模式有实现,将PullRequest列表添加到PullMessageService的阻塞队列中
return changed;
}
}
public class RebalanceLitePullImpl extends RebalanceImpl {
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
this.litePullConsumerImpl.getOffsetStore().persist(mq);
this.litePullConsumerImpl.getOffsetStore().removeOffset(mq);
return true;
}
}
public class RebalancePushImpl extends RebalanceImpl {
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
// 若广播模式实现为空,集群模式RemoteBrokerOffsetStore调用updateConsumeOffsetToBroker方法将当前mq的消费最新进度offset同步更新到Broker
this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
// 若广播模式实现为空,集群模式从RemoteBrokerOffsetStore的offsetTable中移除该mq,在本地把这个消费队列的内存offset维护移除
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
if (this.defaultMQPushConsumerImpl.isConsumeOrderly() && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
try { // 若是顺序消息且是集群模式
if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
try {
return this.unlockDelay(mq, pq);
} finally {
pq.getLockConsume().unlock();
}
} else {
pq.incTryUnlockTimes();
}
} catch (Exception e) {}
return false;
}
return true;
}
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
}
}
}
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void executePullRequestImmediately(final PullRequest pullRequest) {
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
}
public class PullMessageService extends ServiceThread {
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {}
}
public void run() {
while (!this.isStopped()) {
try {//拉取消息的请求队列
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest); //处理请求
} catch (InterruptedException ignored) {
} catch (Exception e) {}
}
}
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
//K2 推模式的消费者最终还是会使用拉消息的方式
impl.pullMessage(pullRequest);
}
}
}

若负载均衡后队列发生改变,若是PUSH式,若设置了Topic级别的拉取阈值,会重置一些消息拉取阈值,且发送心跳通知Broker;若是PULL模式,会更新AssignedMessageQueue中订阅的MessageQueue,然后对新加入的队列开始拉取消息任务PullTaskImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
public class RebalancePushImpl extends RebalanceImpl {
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
SubscriptionData subscriptionData = this.subscriptionInner.get(topic);
long newVersion = System.currentTimeMillis();
subscriptionData.setSubVersion(newVersion); // 创建一个新版本
int currentQueueCount = this.processQueueTable.size(); // 当前MessageQueue的数量
if (currentQueueCount != 0) { // 若MessageQueue的数量不为0,重新计算限制
// 获取Topic级别的拉取阈值,默认-1表示没有限制
int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
if (pullThresholdForTopic != -1) { // 若设置了Topic级别的拉取阈值
int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount); // 重新计算拉取阈值
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal); // 设置拉取阈值
}
// 获取Topic级别的消息大小阈值,和拉取阈值时一个意思
int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
if (pullThresholdSizeForTopic != -1) {
int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount);
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
}
}
// 发送心跳通知broker
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
}
}
public class RebalanceLitePullImpl extends RebalanceImpl {
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
if (messageQueueListener != null) {
try {
messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
} catch (Throwable e) {}
}
}
}
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
class MessageQueueListenerImpl implements MessageQueueListener {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
switch (messageModel) {
case BROADCASTING: // 广播模式
// 更新AssignedMessageQueue中订阅的MessageQueue,这里传入的是全部的MessageQueue
updateAssignedMessageQueue(topic, mqAll);
updatePullTask(topic, mqAll); // 开始拉取消息任务
break;
case CLUSTERING:
// 更新AssignedMessageQueue中订阅的MessageQueue,这里传入的是订阅的MessageQueue
updateAssignedMessageQueue(topic, mqDivided);
updatePullTask(topic, mqDivided); // 开始拉取消息任务
break;
default:
break;
}
}
}
private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
this.assignedMessageQueue.updateAssignedMessageQueue(topic, assignedMessageQueue);
}
private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
while (it.hasNext()) { // 移除没有被分配MessageQueue的pullTaskImpl
Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
if (!mqNewSet.contains(next.getKey())) {
// 这里很关键,对变更的消息队列,这只设置后PullTaskImpl任务中就不会再拉取该队列的消息了
next.getValue().setCancelled(true);
it.remove();
}
}
}
startPullTask(mqNewSet); // 对新加入的队列,开始拉取任务
}
private void startPullTask(Collection<MessageQueue> mqSet) {
for (MessageQueue messageQueue : mqSet) {
if (!this.taskTable.containsKey(messageQueue)) { // 若不存在消息拉取任务,则创建一个消息拉取任务,并执行
PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
this.taskTable.put(messageQueue, pullTask);
this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
}
}
}
}
public class AssignedMessageQueue {
private final ConcurrentHashMap<MessageQueue, MessageQueueState> assignedMessageQueueState;
private RebalanceImpl rebalanceImpl;
public void updateAssignedMessageQueue(String topic, Collection<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueState>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueState> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
if (!assigned.contains(next.getKey())) {
next.getValue().getProcessQueue().setDropped(true);
it.remove(); // 移除没有被分配到的processQueue
}
}
}
addAssignedMessageQueue(assigned); // 保存新的messageQueue
}
}
private void addAssignedMessageQueue(Collection<MessageQueue> assigned) {
for (MessageQueue messageQueue : assigned) {
if (!this.assignedMessageQueueState.containsKey(messageQueue)) { // 如果是新增的messageQueue
MessageQueueState messageQueueState;
// 新建或者使用以前的ProcessQueue,新建一个MessageQueueState然后保存
if (rebalanceImpl != null && rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) {
messageQueueState = new MessageQueueState(messageQueue, rebalanceImpl.getProcessQueueTable().get(messageQueue));
} else {
ProcessQueue processQueue = new ProcessQueue();
messageQueueState = new MessageQueueState(messageQueue, processQueue);
}
this.assignedMessageQueueState.put(messageQueue, messageQueueState);
}
}
}
}

PULL模式

通过DefaultLitePullConsumerImplsubscribe方法订阅Topic和设置Tag表达式,该方法中会将subscriptionType设置为SubscriptionType.SUBSCRIBE,然后设置监听器MessageQueueListenerImpl,该监听器中只有一个很重要的方法messageQueueChanged,然后通过start方法启动该消费者,与PUSH模式大部分地方相似,不同的是这里不需要设置MessageListener,更没有MessageListenerOrderlyMessageListenerConcurrently,以及对应的ConsumeMessageOrderlyServiceConsumeMessageConcurrentlyService服务;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public DefaultLitePullConsumer(final String consumerGroup) {
this(null, consumerGroup, null);
}
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
this.defaultLitePullConsumer.getPullThreadNums(),
new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
);
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MonitorMessageQueueChangeThread");
}
});
this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}
public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try {
if (topic == null || topic.equals("")) {
throw new IllegalArgumentException("Topic can not be null or empty.");
}
setSubscriptionType(SubscriptionType.SUBSCRIBE);
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(), topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
if (serviceState == ServiceState.RUNNING) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
updateTopicSubscribeInfoWhenSubscriptionChanged();
}
} catch (Exception e) {
throw new MQClientException("subscribe exception", e);
}
}
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultLitePullConsumer.changeInstanceNameToPID();
}
initMQClientFactory();
initRebalanceImpl();
initPullAPIWrapper();
initOffsetStore();
mQClientFactory.start();
startScheduleTask();
this.serviceState = ServiceState.RUNNING;
operateAfterRunning();
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PullConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
}
private void initMQClientFactory() throws MQClientException {
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
}
private void initRebalanceImpl() {
this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
}
private void initPullAPIWrapper() {
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
}
private void initOffsetStore() throws MQClientException {
if (this.defaultLitePullConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultLitePullConsumer.getOffsetStore();
} else {
switch (this.defaultLitePullConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultLitePullConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
}
private void startScheduleTask() {
scheduledExecutorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
try {
fetchTopicMessageQueuesAndCompare();
} catch (Exception e) {}
}
}, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS);
}
private void operateAfterRunning() throws MQClientException { // subscriptionType默认为SubscriptionType.SUBSCRIBE
// If subscribe function invoke before start function, then update topic subscribe info after initialization.
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
updateTopicSubscribeInfoWhenSubscriptionChanged();
}
// If assign function invoke before start function, then update pull task after initialization.
if (subscriptionType == SubscriptionType.ASSIGN) {
updateAssignPullTask(assignedMessageQueue.messageQueues());
}
for (String topic : topicMessageQueueChangeListenerMap.keySet()) {
Set<MessageQueue> messageQueues = fetchMessageQueues(topic);
messageQueuesForTopic.put(topic, messageQueues);
}
this.mQClientFactory.checkClientInBroker();
}
}