RabbitMQ基础

MQ的优势

系统的耦合性越高,容错性就越低,可维护性就越低,可使用MQ对应用解耦提升容错性和可维护性

异步提速,提速用户体验和系统吞吐量

削峰填谷,可以提高系统稳定性

MQ劣势

系统引入的外部依赖越多,系统的稳定性就越差,一旦MQ宕机,就会对业务造成影响,如何保证MQ高可用至关重要;系统复杂度提高。

基本概念

RabbitMQ是基于AMQP网络协议即Advanced Message Queuing Protocol高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息。

采用Erlang语言开发,其基础架构图如下:

Broker接收和分发消息的应用,RabbitMQ Server就是Message Broker

Virtual host:出于多租户安全因素设计,把AMQP基本组件划分到一个虚拟的分组中,当多个不同用户使用同一个RabbitMQ server提供的服务时,可划分出多个vhost,每个用户在自己的vhost创建exchange、queue等

Connectionpublisherconsumerbroker之间的TCP连接

Channel:在Connection内部建立的逻辑连接,若应用程序支持多线程,通常每个Thread创建单独的Channel进行通讯,AMQP Method包含了Channel Id帮助客户端和Message Broker识别Channel,故Channel之间是完全隔离的。若每一次访问RabbitMQ都建立一个Connection,在消息量大时建立TCP Connection开销将巨大,效率也较低。Channel作为轻量级的 Connection极大减少了操作系统建立TCP Connection开销。

Exchange:Message到达Broker的第一站,接收消息然后根据分发规则,匹配查询表中的Routing Key,分发消息到具体的Queue中,只具备消息转发不具备消息存储能力,若没有任何队列与Exchange绑定没有符合路由规则的队列消息将丢失。Exchange有常见以下3种类型:

  • Fanout广播:将消息交给所有绑定到交换机的队列
  • Direct定向:把消息交给符合指定Routing Key的队列
  • Topic通配符:把消息交给符合Routing Pattern路由模式的队列

Queue:消息最终被送到这里等待Consumer消费

BindingExchangeQueue之间的虚拟连接,Binding中可包含Routing Key。Binding信息被保存到Exchange中的查询表中,作为Message分发依据。

RabbitMQ工作模式

RabbitMQ提供了简单模式Work Queues工作队列Publish/Subscribe发布订阅模式Routing路由模式Topics主题模式RPC远程调用模式6种工作模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class RabbitUtils {
private static ConnectionFactory connectionFactory = new ConnectionFactory();
static {
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("eleven");
connectionFactory.setPassword("eleven");
connectionFactory.setVirtualHost("eleven");
}

public static Connection getConnection() {
try {
return connectionFactory.newConnection();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
简单模式

一个生产者、一个消费者,不需要设置交换机,使用默认的交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection(); // 获取TCP长连接
Channel channel = connection.createChannel();// 创建通信“通道”,相当于TCP中的虚拟连接
// 声明并创建一个队列,如果队列已存在,则使用这个队列
// 五个参数分别为:队列名称ID、是否持久化false对应不持久化数据,MQ停掉数据就会丢失、是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
// 是否自动删除false代表连接停掉后不自动删除掉这个队列、其他额外的参数这里设置为null
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
String message = "test message";
// 四个参数:exchange交换机,暂时用不到这里设置为空字符串、队列名称、额外的设置属性、最后一个参数是要传递的消息字节数组
channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null, message.getBytes());
channel.close();
}
}
public class Consumer {
public static void main(String[] args) throws IOException {
Connection conn = RabbitUtils.getConnection(); // 获取TCP长连接
Channel channel = conn.createChannel();// 创建通信“通道”,相当于TCP中的虚拟连接
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
// 从MQ服务器中获取数据,创建一个消息消费者
//三个参数:队列名、是否自动确认收到消息,false代表手动编程来确认消息MQ的推荐做法、 传入DefaultConsumer的实现类
channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Reciver(channel));
}
}
public class Reciver extends DefaultConsumer {
private Channel channel;
public Reciver(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者接收到的消息:" + message + ",消息的TagId:" + envelope.getDeliveryTag());
// false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
Work Queues

工作队列模式相对于简单模式,可多个消费端共同消费同一个队列消息,且对于同一个消息消费者之间是竞争关系,对于任务过重或任务较多情况使用工作队列可提高任务处理速度,比如短信通知服务、邮件通知服务等。不用定义交换机,生产者是面向队列发送消息,底层使用默认交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class OrderSystem {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
for (int i = 1; i <= 100; i++) {
SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
String jsonSMS = new Gson().toJson(sms);
channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, jsonSMS.getBytes());
}
System.out.println("发送数据成功");
channel.close();
connection.close();
}
}
public class SMSSender1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
// 若不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
// basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),再从队列中获取一个新的
channel.basicQos(1); // 处理完一个取一个,处理速度快的机器将多处理消息,若不设置则会平均分配
channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println("SMSSender1-短信发送成功:" + jsonSMS);
channel.basicAck(envelope.getDeliveryTag(), false); // false只确认签收当前的消息
}
});
}
}
public class SMSSender2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
// 若不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
// basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),再从队列中获取一个新的
channel.basicQos(1); // 处理完一个取一个,处理速度快的机器将多处理消息,若不设置则会平均分配
channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println("SMSSender2-短信发送成功:" + jsonSMS);
channel.basicAck(envelope.getDeliveryTag(), false); // false只确认签收当前的消息
}
});
}
}
发布订阅模式

相对于前两个模式增加了交换机,且须将交换机设置为fanout类型,且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列,一个消息可被多个消费者都收到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class WeatherBureau { // 这里使用到了交换机,需要手动创建交换机否则会报错,且指定交换机类型为fanout
public static void main(String[] args) throws Exception {
Connection connection = RabbitUtils.getConnection();
String input = new Scanner(System.in).next();
Channel channel = connection.createChannel();
//第一个参数交换机名字 其他参数和之前的一样
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes());
channel.close();
connection.close();
}
}
public class BiaDu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection(); // 获取TCP长连接
final Channel channel = connection.createChannel(); //获取虚拟连接
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null); // 声明队列信息
// queueBind用于将队列与交换机绑定,参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection(); //获取TCP长连接
final Channel channel = connection.createChannel(); //获取虚拟连接
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null); //声明队列信息
// queueBind用于将队列与交换机绑定,参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
路由模式

须将交换机设置为direct类型队列与交换机的绑定不能是任意绑定需要指定一个Routing Key即路由key,消息发送方在向Exchange发消息时,也必须指定消息Routing Key;Exchange不再把消息交给每一个绑定的队列,而是根据消息Routing Key进行判断,只有队列Routing key与消息Routing Key完全一致,才会接收到消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class WeatherBureau { // 要手动创建交换机否则会报错,且指定交换机类型为direct,否则路由将失效
public static void main(String[] args) throws Exception {
Map<String, String> area = new LinkedHashMap<String, String>();
area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");

area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, String> me = itr.next();
//第一个参数交换机名字,第二个参数作为 消息的routing key
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, me.getKey(), null, me.getValue().getBytes());
}
channel.close();
connection.close();
}
}
public class BiaDu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
// queueBind用于将队列与交换机绑定,参数1:队列名 参数2:交互机名 参数三:路由key
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20201127");
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
// 指定队列与交换机以及routing key之间的关系
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201128");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20201012");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
主题模式

须将交换机设置为topic类型,Topic类型与Direct相比,都可根据Routing Key把消息路由到不同队列。Topic类型Exchange可让队列在绑定Routing key时使用通配符;Routing Key一般都是一个或多个单词组成,多个单词之间以.分割,如:item.insert主题模式可实现发布订阅模式路由模式的功能,只是主题模式在配置Routing Key时刻使用通配符,显得更加灵活;

通配符规则#匹配一个或多个词*匹配一个任意词,如:item.#能够匹配item.insert.abc或者item.insertitem.*只能匹配item.insertitem.update

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class WeatherBureau { // 要手动创建交换机否则会报错,且指定交换机类型为topic,否则路由将失效
public static void main(String[] args) throws Exception {
Map<String, String> area = new LinkedHashMap<String, String>();
area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
for (Map.Entry<String, String> entry : area.entrySet()) {
//第一个参数交换机名字,第二个参数作为 消息的routing key
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, entry.getKey(), null, entry.getValue().getBytes());
}
channel.close();
connection.close();
}
}
public class BiaDu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
// queueBind用于将队列与交换机绑定,参数1:队列名 参数2:交互机名 参数三:路由key
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127");
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.hebei.shijiazhuang.20201128");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection(); // 获取TCP长连接
final Channel channel = connection.createChannel(); //获取虚拟连接
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null); //声明队列信息
// 指定队列与交换机以及routing key之间的关系
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

消息确认机制

RabbitMQ提供了监听器来接收消息投递状态,消息确认涉及ConfirmReturn两种状态,RabbitMQ在传递消息的过程中充当代理人的角色,生产者可以通过监听器来知道消息是否被正确投递到Broker。Confirm代表生产者将消息送到Broker时产生的状态,后续会出现ack和nack两种情况:

  • ack:代表Broker已经将数据接收
  • nack:代表Broker拒收消息,可能是队列已满限流IO异常

Return代表消息被Broker正常接收ack后,但Broker没有对应的队列进行投递时产生的状态,消息被退回给生产者;ConfirmReturn两种状态只代表生成者与Broker之间消息投递情况与消费者是否接收、确认消息无关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
Map<String, String> area = new LinkedHashMap<String, String>();
area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//开启confirm监听模式
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
//第二个参数代表接收的数据是否为批量接收,一般用不到。
System.out.println("消息已被Broker接收,Tag:" + l);
}
@Override
public void handleNack(long l, boolean b) throws IOException {
System.out.println("消息已被Broker拒收,Tag:" + l);
}
});
channel.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return r) {
System.err.println("===========================");
System.err.println("Return编码:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());
System.err.println("交换机:" + r.getExchange() + "-路由key:" + r.getRoutingKey());
System.err.println("Return主题:" + new String(r.getBody()));
System.err.println("===========================");
}
});
for (Map.Entry<String, String> entry : area.entrySet()) {
// 第三个参数为:mandatory true代表如果消息无法正常投递则return回生产者,如果false,则直接将消息放弃。
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, entry.getKey(), true, null, entry.getValue().getBytes());
}
//如果关闭则无法进行监听,因此此处不需要关闭
// channel.close();
// connection.close();
}
}
public class Baidu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
//queueBind用于将队列与交换机绑定,参数1:队列名 参数2:交互机名 参数三:路由key
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("腾讯天气收到气象信息:" + new String(body));
// channel.basicAck(envelope.getDeliveryTag() , false);
// 第二个参数是否应用于多消息,第三个参数是否重新放回队列
channel.basicNack(envelope.getDeliveryTag(), false, false);
}
});
}
}