RocketMQ基础

RocketMQ架构

Producer

消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟

Producer启动后会随机选择NameServer集群中其中一个节点建立长连接定期从NameServer获取Topic路由信息,并判断当前订阅Topic存在哪些Broker轮询从队列列表中选择一个队列,并向提供Topic服务的队列所在的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

Consumer

消息消费的角色,支持分布式集群方式部署。支持以PushPull拉两种模式对消息进行消费。同时支持集群方式广播方式消费,提供实时消息订阅机制

Consumer启动后会随机选择NameServer集群中其中一个节点建立长连接,定期从NameServer获取Topic路由信息,并判断当前订阅Topic存在哪些Broker,并向提供Topic服务的MasterSlave建立长连接,且定时向MasterSlave发送心跳。Consumer既可从Master订阅消息,也可从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量最大偏移量距离,判断是否读老消息产生读I/O,以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

NameServer

NameServer一个非常简单的Topic路由注册中心支持Broker动态注册与发现。通常也是集群方式部署各实例间不信息通讯Broker每台NameServer注册自己的路由信息,故每个NameServer实例上都保存一份完整的路由信息。若当某个NameServer因某种原因下线,Broker仍可向其它NameServer同步其路由信息,Producer和Consumer仍可动态感知Broker路由信息NameServer主要包括Broker管理路由信息管理两个功能:

  • Broker管理NameServer接受Broker集群的注册信息且保存作为路由信息基本数据提供心跳检测机制,检查Broker是否存活;
  • 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后ProducerConumser通过NameServer可知道整个Broker集群的路由信息,从而进行消息的投递和消费。

NameServer是一个几乎无状态节点,可集群部署节点之间无任何信息同步。NameServer启动后监听端口,等待Broker、Producer、Consumer连接,相当于一个路由控制中心

BrokerServer

Broker主要负责消息的存储投递查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。

  • Remoting Module:整个Broker的实体,负责处理来自clients端的请求
  • Client Manager:负责管理Producer和Consumer客户端和维护ConsumerTopic订阅信息
  • Store Service:提供方便简单的API接口处理消息存储到物理硬盘查询功能
  • HA Service:高可用服务,提供Master BrokerSlave Broker之间的数据同步功能
  • Index Service:根据特定Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询

Broker分为MasterSlave,一个Master可对应多个Slave,一个Slave只能对应一个Master,Master与Slave对应关系通过指定相同的BrokerName不同BrokerId来定义,BrokerId0表示Master0表示Slave。Master可部署多个。每个Broker与NameServer集群中的所有节点建立长连接定时注册Topic信息到所有NameServer。 虽然支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息读负载。Broker启动后跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker如IP、端口等信息,以及存储所有Topic信息。注册成功后NameServer集群中就有TopicBroker映射关系

RocketMQ使用

基本样例

生产者发送消息有同步发送异步发送单向发送三种方式,单向发送使用sendOneway方法来发送消息,该方法无返回值无回调

1
2
3
4
5
6
7
8
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 20; i++) {
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
}
producer.shutdown();

同步发送使用send方法同步传递消息,消息会发给集群中的一个Broker节点

1
2
3
4
5
6
7
8
9
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 20; i++) {
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();

由于是异步发送,这里引入了CountDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.setNamesrvAddr("localhost:9876");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();

消费者消费消息有两种模式:消费者主动去Broker上拉取消息的拉模式;消费者等待Broker把消息推送过来的推模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public class PullConsumer {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("localhost:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
if (pullResult.getMsgFoundList() != null) {
for (MessageExt messageExt : pullResult.getMsgFoundList()) {
System.out.println("messageExt:" + messageExt);
}
}
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null) {
return offset;
}
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
public class LitePullConsumerAssign {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
litePullConsumer.setNamesrvAddr("localhost:9876");
litePullConsumer.setAutoCommit(false);
litePullConsumer.start();
Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
List<MessageQueue> list = new ArrayList<>(mqSet);
List<MessageQueue> assignList = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
assignList.add(list.get(i));
}
litePullConsumer.assign(assignList);
litePullConsumer.seek(assignList.get(0), 10);
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s %n", messageExts);
litePullConsumer.commitSync();
}
} finally {
litePullConsumer.shutdown();
}
}
}
public class LitePullConsumerSubscribe {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setNamesrvAddr("localhost:9876");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
}
}

消费者推模式

1
2
3
4
5
6
7
8
9
10
11
12
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

顺序消息

发送者端默认情况下,消息发送者会采取Round Robin轮询方式把消息发送到不同MessageQueue分区队列,消费者消费时也从多个MessageQueue上拉取消息,该情况下消息是不能保证顺序的。仅当一组有序的消息发送到同一个MessageQueue时,才能利用MessageQueue先进先出的特性保证这一组消息有序。而Broker中一个队列内的消息是可以保证有序的。

消费者端消费者会从多个消息队列取消息。虽然每个消息队列消息是有序的,但多个队列之间消息仍是乱序的。消费者端要保证消息有序,就需要按队列一个一个来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给Consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。MessageListenerConcurrently消息监听器则不会锁队列,每次都是从多个Message中取一批数据,默认不超过32,因此也无法保证消息有序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
int orderId = i;
for (int j = 0; j <= 5; j++) {
Message msg = new Message("OrderTopicTest", "order_" + orderId, "KEY" + orderId, ("order_" + orderId + " step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
}
producer.shutdown();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("OrderTopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println("收到消息内容 " + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();

广播消息

集群状态MessageModel.CLUSTERING下,每条消息只会被同一个消费者组中的一个实例消费到。而广播模式则是把消息模式设置为MessageModel.BROADCASTING,将给所有订阅对应主题的消费者发送消息,而不管消费者是不是同一个消费者组

1
2
3
4
5
6
7
8
9
10
11
12
13
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING); // 将消息模式设置为BROADCASTING
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

延迟消息

延迟时间的设置是在Message消息对象上设置一个延迟级别setDelayTimeLevel(3),开源版RocketMQ中,对延迟消息并不支持任意时间的延迟设定,而是只支持18个固定的延迟级别,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。这18个延迟级别也支持自行定义,不过一般情况下最好不要自定义修改。

1
2
3
4
5
6
7
8
9
10
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 分组名称
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 2; i++) {
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3); // 延时队列
SendResult sendResult = producer.send(msg);
}
producer.shutdown();

批量消息

将多条消息合并成一个批量消息,一次发送出去,可减少网络IO提升吞吐量。批量消息的使用有一定限制,这些消息TopicwaitStoreMsgOK必须相同,且不能是延迟消息事务消息等。

1
2
3
4
5
6
7
8
9
10
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
producer.shutdown();

若批量消息大于1MB就不要用一个批次发送,而要拆分成多个批次消息发送。实际最大的限制是4194304字节约4MB

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>(100 * 1000);
for (int i = 0; i < 100 * 1000; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
producer.send(messages);
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List<Message> listItem = splitter.next();
producer.send(listItem);
}
producer.shutdown();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class ListSplitter implements Iterator<List<Message>> {
private final List<Message> messages;
private int sizeLimit = 1000 * 1000;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > sizeLimit) {
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
if (tmpSize + totalSize > sizeLimit) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}

过滤消息

可使用Message的Tag属性来简单快速的过滤信息,TAG是RocketMQ中特有的一个消息属性,一个应用可以就用一个Topic,而应用中的不同业务就用TAG来区分。

1
2
3
4
5
6
7
8
9
10
11
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TagFilterTest", "TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

一个消息只能有一个TAG,不能满足一些比较复杂的场景。 可使用SQL表达式来对消息进行过滤,但只有推模式的消费者可使用SQL过滤。拉模式是用不了的。RocketMQ只定义了一些基本语法来支持这个特性。也可很容易地扩展它。

  • 数值比较>>=<<=BETWEEN=
  • 字符比较=<>INIS NULLIS NOT NULL
  • 逻辑符号ANDORNOT
  • 数值123,3.1415字符'abc';必须用单引号包裹起来
  • 特殊常量NULL,布尔值TRUEFALSE
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 15; i++) {
Message msg = new Message("SqlFilterTest", tags[i % tags.length], ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("a", String.valueOf(i)); // 自定义字段
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
// 消费者示例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

事务消息

事务消息是在分布式系统中保证最终一致性两阶段提交的消息实现,可保证本地事务执行消息发送两个操作原子性,事务消息只涉及到消息发送者,对消息消费者来说没有什么特别,即只保证了分布式事务的一半。事务消息的关键是在TransactionMQProducer中指定了一个TransactionListener事务监听器,该事务监听器就是事务消息的关键控制器;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15"); // 回查次数
msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "10000"); // 回查时间
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
}
producer.shutdown();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String tags = msg.getTags();
if (StringUtils.contains(tags, "TagA")) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.contains(tags, "TagB")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String tags = msg.getTags();
if (StringUtils.contains(tags, "TagC")) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.contains(tags, "TagD")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
}

事务消息不支持延迟消息批量消息,为了避免单个消息被检查太多次而导致队列消息累积,回查次数由BrokerConfig.transactionCheckMax参数来配置默认15,可在broker.conf中覆盖,实际检查次数会在message中保存一个用户属性MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES。该属性值大于transactionCheckMax丢弃,默认情况下同时打印错误日志,可通过重写AbstractTransactionCheckListener类来修改该行为。 该用户属性值按回查次数递增,也可在Producer中自行覆盖该属性

回查时间间隔BrokerConfig.transactionTimeOut参数来配置,默认6,可在broker.conf中修改,也可给消息配置一个MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS属性来给消息指定一个特定的消息回查时间

事务性消息可能不止一次被检查或消费;提交给用户的目标主题消息可能会失败,目前以日志的记录而定。其高可用性通过RocketMQ本身的高可用性机制来保证,若希望确保事务消息不丢失、且事务完整性得到保证,建议使用同步双重写入机制

事务消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务消息允许反向查询,MQ服务器能通过事务消息的生产者ID查询到消费者

事务消息机制在发送消息时,会将消息转为一个half半消息,并存入RocketMQ内部的一个RMQ_SYS_TRANS_HALF_TOPIC,该Topic对消费者不可见,然后执行本地事务执行commit提交,则Broker会将投递到RMQ_SYS_TRANS_HALF_TOPIC中的消息投递到用户指定真正Topic,然后再投递一个表示删除的消息到RMQ_SYS_TRANS_OP_HALF_TOPIC中,表示当前事务已完成,若本地事务rollback 回滚,则没有投递到真实Topic的过程,只需要投递表示删除的消息到RMQ_SYS_TRANS_OP_HALF_TOPIC,若Commit提交或Rollback回滚失败,Broker默认每6s中回查调用checkLocalTransaction一次,在该回查方法中再次回滚会提交事务,默认最多15次。

ACL权限控制

ACL权限控制主要为RocketMQ提供Topic资源级别的用户访问控制,可在Client客户端通过RPCHook注入AccessKeySecretKey签名;将对应的权限控制属性,包括Topic访问权限IP白名单AccessKeySecretKey签名等,设置在$ROCKETMQ_HOME/conf/plain_acl.yml配置文件中。Broker端对AccessKey所拥有的权限进行校验,校验不过抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static final String ACL_ACCESS_KEY = "RocketMQ";
private static final String ACL_SECRET_KEY = "1234567";
public static void producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);

} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
public static void pushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5", getAclRPCHook(), new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY));
}

Broker具体配置信息可参见源码包下docs/cn/acl/user_guide.md,在broker.conf中通过aclEnable=true打开acl的标志。然后就可以用plain_acl.yml来进行权限配置了。且该配置文件是热加载的,修改后不用重启Broker服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
globalWhiteRemoteAddresses: # 全局白名单,不受ACL控制,通常需要将主从架构中所有节点加进来
- 10.10.103.*
- 192.168.0.*

accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY # 默认Topic访问策略是拒绝
defaultGroupPerm: SUB # 默认Group访问策略是只允许订阅
topicPerms:
- topicA=DENY # topicA拒绝
- topicB=PUB|SUB # topicB允许发布和订阅消息
- topicC=SUB # topicC只允许订阅
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
# 第二个账户,只要是来自192.168.1.*的IP,就可以访问所有资源
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
# if it is admin, it could access all resources
admin: true

SpringBoot集成

SpringBoot集成RocketMQ的starter依赖是由Spring社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,故需特别注意版本。通过内置的RocketMQTemplate来与RocketMQ交互。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
1
2
3
4
# NameServer地址
rocketmq.name-server=localhost:9876
# 默认的消息生产者组
rocketmq.producer.group=springBootGroup

SpringBoot集成RocketMQ,消费者部分核心功能都集成到@RocketMQMessageListener注解。消息过滤可以由里面的selectorType属性和selectorExpression来定制,由consumeMode来有序消费还是并发消费等;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class SpringProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic, String msg) {
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (String tag : tags) {
String destination = topic + ":" + tag;
this.rocketMQTemplate.convertAndSend(destination, msg);
}
}
}
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", messageModel = MessageModel.CLUSTERING, topic = "TestTopic", consumeMode = ConsumeMode.CONCURRENTLY, selectorExpression = "TagA")
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message1 : " + message);
}
}

对于事务消息需要添加一个事务消息监听器,对于消息TAG的使用,需要通过将TAG拼接到Topic后面。SpringBoot依赖中的Message对象和RocketMQ中的Message对象是两个不同的对象,SpringBoot中的Message中就没有TAG属性,Tag属性被移到了发送目标中,以Topic:Tag的方式指定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@ExtRocketMQTemplateConfiguration
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {
private ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);
String destination = arg.toString();
localTrans.put(transId, msg);
//这个msg的实现类是GenericMessage,里面实现了toString方法
//在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。
//而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀
System.out.println("executeLocalTransaction msg = " + msg);
//转成RocketMQ的Message对象
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "UTF-8", destination, msg);
String tags = message.getTags();
if (StringUtils.contains(tags, "TagA")) {
return RocketMQLocalTransactionState.COMMIT;
} else if (StringUtils.contains(tags, "TagB")) {
return RocketMQLocalTransactionState.ROLLBACK;
} else {
return RocketMQLocalTransactionState.UNKNOWN;
}
}
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID).toString();
Message originalMessage = localTrans.get(transId);
// 这里能够获取到自定义的transaction_id属性
System.out.println("checkLocalTransaction msg = " + originalMessage);
// 获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS).toString();
if (StringUtils.contains(tags, "TagC")) {
return RocketMQLocalTransactionState.COMMIT;
} else if (StringUtils.contains(tags, "TagD")) {
return RocketMQLocalTransactionState.ROLLBACK;
} else {
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
@Component
public class SpringProducer {
@Resource
private RocketMQTemplate extRocketMQTemplate;
public void sendMessageInTransaction(String topic, String msg) throws InterruptedException {
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
//尝试在Header中加入一些自定义的属性。
Message<String> message = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TRANSACTION_ID, "TransID_" + i)
//发到事务监听器里后,这个自己设定的TAGS属性会丢失。但是上面那个属性不会丢失。
.setHeader(RocketMQHeaders.TAGS, tags[i % tags.length])
//MyProp在事务监听器里也能拿到,为什么就单单这个RocketMQHeaders.TAGS拿不到?这只能去调源码了。
.setHeader("MyProp", "MyProp_" + i)
.build();
String destination = topic + ":" + tags[i % tags.length];
//这里发送事务消息时,还是会转换成RocketMQ的Message对象,再调用RocketMQ的API完成事务消息机制。
SendResult sendResult = extRocketMQTemplate.sendMessageInTransaction(destination, message, destination);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
}
}
}