Kafka基础

Kafka基础

Kafkascala语言编写的支持partition分区replica多副本基于Zookeeper协调分布式消息系统,可实时处理大量数据以满足各种需求场景,如基于hadoop批处理系统、低延迟实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等。

名称 解释
Broker 消息中间件处理节点,一个Kafka节点就是一个Broker,一个或者多个Broker组成Kafka集群
Topic Kafka根据Topic对消息进行归类,发布到Kafka集群的每条消息都需指定一个Topic
Producer 消息生产者,向Broker发送消息的客户端,通过TCP协议来完成通信
Consumer 消息消费者,从Broker读取消息的客户端,通过TCP协议来完成通信
Consumer Group 每个Consumer属于一个特定Consumer Group,一条消息可被多个不同Consumer Group消费
一个Consumer Group中只能有一个Consumer能消费该消息
Partition 物理上的概念,一个Topic可分为多个Partition,每个Partition内部消息是有序的

使用场景

  • 日志收集:可用Kafka收集各种服务日志,通过kafka以统一接口服务方式开放给各种Consumer,如Hhadoop、Hbase、Solr等。
  • 消息系统:解耦生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录Web用户或App用户的各种活动,如浏览网页、搜索、点击等活动,被各个服务器发布到kafka的Topic中,然后订阅者通过订阅这些Topic来做实时监控分析,或装载到Hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用数据,生产各种操作的集中反馈,如报警和报告。

Kafka核心配置

Kafka核心配置在config/server.properties配置文件中。

1
2
3
4
broker.id=0	# broker.id属性在kafka集群中必须要是唯一
listeners=PLAINTEXT://localhost:9092 # kafka部署的机器ip和提供服务的端口号
log.dir=/usr/local/data/kafka-logs # kafka的消息存储文件
zookeeper.connect=192.168.65.60:2181 # kafka连接zookeeper的地址,若是集群则用逗号分割
Property Default Description
broker.id 0 每个Broker都可用一个唯一非负整数id进行标识
log.dirs /tmp/kafka-logs 存放数据的路径,该路径并不唯一,可设置多个路径之间
逗号分隔,创建新partition时选择包含最少partitions
路径下创建
listeners PLAINTEXT://192.168.65.60:9092 Server接受客户端连接的端口,ip配置kafka本机ip即可
zookeeper.connect localhost:2181 Kafka连接Zookeeper的地址,若是集群逗号分隔
log.retention.hours 168 每个日志文件的保存时间。默认数据保存时间对所有topic
都一样
num.partitions 1 创建Topic默认分区数
default.replication.factor 1 自动创建Topic默认副本数量,建议设置为大于等于2
min.insync.replicas 1 写数据到repica数量达到设定值才表示Producer发送消息
成功
,若Producer设置acks-1,则每个repica写数据
都必须成功
delete.topic.enable false 是否允许删除主题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
bin/kafka-server-start.sh config/server.properties	# 启动kafka
bin/kafka-server-stop.sh # 停止kafka
# 创建名字为test的Topic,该topic只有一个partition,且备份因子为1
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic test
# 查看kafka中目前存在的topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
# 删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
# 发送消息到kafka,若是集群--broker-list参数用逗号隔开
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 消费kafka集群最新消息,若是集群--bootstrap-server参数用逗号隔开
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
# 多主题消费kafka集群消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "test|test-2"
# 通过--from-beginning从开始读取kafka集群消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
# 单播消费,一条消息只能被某一个消费者消费,让所有消费者在同一个消费组里即可
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test
# 多播消费,同一条消息只能被同一个消费组下的某一个消费者消费的特性
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup-2 --topic test
# 查看消费组名
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 查看消费组的消费偏移量,current-offset当前消费组的已消费偏移量,log-end-offset主题对应分区消息结束偏移量,lag当前消费组未消费消息数
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
# 查看下topic情况
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
# 增加topic的分区数量,目前kafka不支持减少分区
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 3 --topic test

同类消息发送到同一个Topic下面,每个Topic下面可有多个分区Partition日志文件Partition是一个有序的message序列,这些message按顺序添加到commit log文件中。每个Partition中消息都有一个唯一编号offset,用来唯一标识某个分区中的message。 每个Partition都对应一个commit log文件同一个Partition中的messageoffset都是唯一的,但不同Partitionmessageoffset可能相同。

每个Partition分区中都有一个Leader副本节点一个或多个Replicas副本以及一个Isr集合,PartitionLeader副本节点负责给定Partition的所有读写请求,Replicas表示某个Partition在哪几个Broker上存在备份,不管该节点是不是Leader,甚至该节点挂了也会列出Isr集合是Replicas的一个子集,只列出存活的备份节点,且已同步备份了该Partition的节点。

Kafka一般不会删除消息不管是否被消费。只会根据配置的日志保留时间log.retention.hours确认消息多久被删除,默认保留最近一周的消息。Kafka性能与保留消息数据量大小没有关系

每个Consumer是基于commit log中消费进度即offset来进行工作的,消费offset由Consumer来维护,一般按照顺序逐条消费commit log中的消息可通过指定offset来重复消费某些消息跳过某些消息。意味Consumer对集群影响非常小,添加或减少Consumer对于集群或其他Consumer没有影响,因为每个Consumer维护各自的消费offset

一个Topic代表逻辑上的一个业务数据集,对于大型网站来说,后端数据都是海量的,消息可能非常巨量,若把这么多数据都放在一台机器上可能会有容量限制问题,可在Topic内部划分多个Partition分片存储数据不同Partition可位于不同机器上,每台机器上都运行一个Kafka的Broker进程。

分片存储的好处提高并行度,且commit log文件会受到所在机器的文件系统大小的限制,分区后可将不同分区放在不同机器上,相当于对数据做分布式存储,理论上一个Topic可处理任意数量数据。

Kafka集群

Kafka将很多集群关键信息记录在Zookeeper中,保证自己的无状态,从而在水平扩容时非常方便commit logPartitions分布在Kafka集群中不同Broker上,每个Broker上Partition分区的副本可请求备份其他Broker上Partition上副本的数据,Kafka集群支持配置一个Partition备份数量。每个Partition都有一个Broker上的副本起到Leader的作用0多个其他的Broker副本作为Follwers作用。作为Leader的副本处理所有针对该Partition的读写请求,作为Followers的副本被动复制作为Leader的副本的结果,不提供读写,主要是为了保证多副本数据与消费的一致性。若一个Partition分区Leader副本失效其中一个Follower副本将自动变成新的Leader副本

生产者将消息发送到Topic中去,同时负责选择将message发送到Topic的哪个Partition。通过round-robin做简单的负载均衡。也可根据消息中某个关键字来进行区分,通常第二种方式使用更多

对于消费者,传统的消息传递模式队列模式发布订阅模式,且基于这2种模式提供了一种Consumer的抽象概念Consumer Group

  • Queue模式:多个Consumer从服务器中读取数据,消息只会到达一个Consumer所有Consumer都位于同一Consumer Group
  • Publish-Subscribe模式:消息会被广播给所有Consumer所有Consumer都有唯一的Consumer Group

通常一个Topic会有几个Consumer Group,每个Consumer Group都是一个逻辑上的订阅者,每个Consumer Group由多个Consumer 实例组成,从而达到可扩展和容灾的功能。

一个Partition同一时刻在一个Consumer Group中只能有一个Consumer在消费Partition分区类似于RocketMQ中的队列,从而保证消费顺序Consumer Group中的Consumer数不能比一个Topic中Partition的数量多,否则多出来的Consumer消费不到消息。Kafka只在Partition范围内保证消息消费的局部顺序性,不能在同一个Topic中多个Partition中保证总的消费顺序性。

若有在总体上保证消费顺序的需求,则可通过TopicPartition数量设置为1将Consumer Group中的Consumer数量也设置为1,但会影响性能,故Kafka顺序消费很少用。

客户端调用

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>

生产者包含一些关键的参数,包括发送消息持久化机制参数ProducerConfig.ACKS_CONFIG,发送失败会重试次数ProducerConfig.RETRIES_CONFIG,重试时间间隔ProducerConfig.RETRY_BACKOFF_MS_CONFIG,以及发送时可指定Partition分区,同步发送异步发送等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
/*
发出消息持久化机制参数
(1)acks=0:表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
(2)acks=1:至少等待leader成功将数据写入本地log,但不用等待所有followe都成功写入。就可继续发送下一条消息。该情况下若follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
(3)acks=-1或all:需等待min.insync.replicas,默认为1,推荐配置大于等于2,该参数配置的副本个数都成功写入日志,这种策略会保证
只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
*/
props.put(ProducerConfig.ACKS_CONFIG, "1");
// 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但也可能造成消息重复发送,如网络抖动,故需在接收者处做好消息接收的幂等性处理
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 重试时间间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
// 设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// kafka本地线程会从缓冲区取数据,批量发送到broker,设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// 默认值是0,消息必须立即被发送,但这样会影响性能,一般设置10毫秒左右,该消息发送完后会进入本地的一个batch,
// 若10毫秒内该batch满了16kb就随batch一起被发送出去,若10毫秒内batch没满,则也必须把消息发送出去,不能让消息的发送延迟时间太长
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
// 把发送的key从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<String, String>(props);
int msgNum = 5;
final CountDownLatch countDownLatch = new CountDownLatch(msgNum);
for (int i = 1; i <= msgNum; i++) {
// 指定发送分区
// ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME , 0, order.getOrderId().toString(), JSON.toJSONString(order));
// 未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, String.valueOf(i), "order " + i);
// 等待消息发送成功的同步阻塞方法
// RecordMetadata metadata = producer.send(producerRecord).get();
// System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
//异步回调方式发送消息
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("发送消息失败:" + exception.getStackTrace());
}
if (metadata != null) {
System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
}
countDownLatch.countDown();
}
});
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.close();

消费者同样可指定消费的分区,指定消费者组名称,是否自动提交,自动提交时间间隔,心跳时间间隔等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
String TOPIC_NAME = "my-replicated-topic";
String CONSUMER_GROUP_NAME = "testGroup";
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
// 是否自动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
/*
当消费主题的是一个新的消费组,或指定offset的消费方式,offset不存在,则可通过以下两种方式消费消息
- latest(默认) :只消费自己启动之后发送到主题的消息
- earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)
*/
//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// consumer给broker发送心跳的间隔时间,broker接收到心跳若此时有rebalance发生会通过心跳响应将rebalance方案下发给consumer,该时间可以稍微短一点
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
// 服务端broker多久感知不到一个consumer心跳就认为他故障了,会将其踢出消费组,对应的Partition也会被重新分配给其他consumer,默认是10秒
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
// 一次poll最大拉取消息的条数,若消费者处理速度很快,可以设置大点,若处理速度一般,可以设置小点
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// 若两次poll操作间隔超过该时间,则broker认为该consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
// 消费指定分区
//consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
// 消息回溯消费
//consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
// 指定offset消费
//consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
// 从指定时间点开始消费
/*
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
//从1小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) continue;
Long offset = value.offset();
System.out.println("partition-" + key.partition() + "|offset-" + offset);
System.out.println();
//根据消费里的timestamp确定offset
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}*/
while (true) {
// poll() API 是拉取消息的长轮询
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
/*if (records.count() > 0) {
// 手动同步提交offset,当前线程会阻塞直到offset提交成功一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
consumer.commitSync();
// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
}
}
});

}*/
}

Spring整合

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
spring:
kafka:
bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
producer: # 生产者
retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
ack_mode: manual_immediate
1
2
3
4
5
6
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send() {
kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @KafkaListener(groupId = "testGroup", topicPartitions = {
* @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
* @TopicPartition(topic = "topic2", partitions = "0",
* partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
* }, concurrency = "6")
* concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
*/
@KafkaListener(topics = "my-replicated-topic", groupId = "testGroup")
public void listenTestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//手动提交offset
//ack.acknowledge();
}
// 配置多个消费组
@KafkaListener(topics = "my-replicated-topic", groupId = "elevenGroup")
public void listenElevenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
ack.acknowledge();
}

设计原理

总控制器Controller

Kafka集群中会有一个或多个Broker,其中有一个Broker会被选举为Kafka Controller控制器,其负责管理整个集群中所有分区和副本的状态

  • 当某个分区的Leader副本出现故障时,由控制器负责为该分区选举新的Leader副本
  • 当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有Broker更新其元数据信息
  • 当使用kafka-topics.sh脚本为某个Topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到

Kafka集群启动时自动选举一台Broker作为Controller来管理整个集群,选举过程是集群中每个Broker都会尝试在Zookeeper上创建一个 /controller临时节点,Zookeeper会保证有且仅有一个Broker能创建成功,创建成功的Broker则成为集群的总控器Controller。

Controller选举机制

Controller角色的Broker宕机时Zookeeper临时节点会消失,集群里其他Broker会一直监听/controller临时节点,发现临时节点消失则竞争再次创建/controller临时节点,这就是Controller的选举机制。具备控制器身份的Broker需要比其他普通Broker多一份职责:

  1. 监听Broker相关的变化,为Zookeeper中的/brokers/ids节点添加BrokerChangeListener,用来处理Broker增减变化
  2. 监听Topic相关的变化,为Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理Topic增减的变化;为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除Topic动作
  3. 从Zookeeper中读取当前所有与TopicPartition以及Broker有关信息并进行相应的管理。对所有Topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听Topic中分区分配变化
  4. 更新集群元数据信息,同步到其他普通Broker节点中。

Partition副本选举Leader机制

Controller会监听/brokers/ids节点可感知Broker是否存活,当Controller感知到分区Leader所在Broker挂了,参数unclean.leader.election.enable=false的前提下,Controller会从ISR列表里挑第一个Broker作为Leader,因为第一个Broker最先放进ISR列表可能是同步数据最多的副本,若参数unclean.leader.election.enable=true代表在ISR列表里所有副本都挂了时可在ISR列表以外的副本中选Leader,该设置可提高可用性,但选出的新Leader可能数据少很多。

副本进入ISR列表有两个条件:副本节点不能产生分区,必须能与Zookeeper保持会话以及跟Leader副本网络连通;副本能复制Leader上的所有写操作,且不能落后太多,超过replica.lag.time.max.ms时间都没跟Leader同步过一次的副本会被移出ISR列表;

offset记录机制

每个Consumer会定期将自己消费分区的offset提交给Kafka内部名称为__consumer_offsets的Topic,提交过去时key为consumerGroupId+topic+分区号,value就是当前offset的值,Kafka会定期清理该Topic里的消息保留最新的那条数据,因为__consumer_offsets可能会接收高并发的请求,Kafka默认给其分配50个分区,可通过offsets.topic.num.partitions设置,这样可通过加机器的方式抗大并发。

通过公式hash(consumerGroupId) % __consumer_offsets主题的分区数可选出Consumer消费的offset要提交到__consumer_offsets的哪个分区

消费者Rebalance机制

Rebalance机制:若消费组消费者数量变化消费分区数变化,Kafka会重新分配消费者消费分区的关系。如Consumer Group中某个消费者挂了,此时会自动把分配给它的分区交给其它消费者,若其恢复则又会把一些分区重新交还给它。

Rebalance只针对Subscribe这种不指定分区消费的情况,若通过assign消费方式指定了分区Kafka不会进行Rebanlance。当消费组中Consumer增加或减少动态给Topic增加分区消费组订阅了更多的Topic等情况可能触发消费者Rebalance

Rebalance过程中消费者无法从Kafka消费消息,对Kafka的TPS会有影响,若Kafka集群内节点较多Rebalance重平衡可能会耗时极多,应尽量避免在系统高峰期Rebalance重平衡

消费者Rebalance分区分配策略

消费者Rebalance分区分配策略主要有rangeround-robinsticky三种Rebalance策略默认range分配策略。Kafka提供消费者客户端参数partition.assignment.strategy来设置消费者订阅主题之间的分区分配策略

  • range策略:按分区序号排序,假设n=分区数消费者数量m=分区数%消费者数量,则m个消费者每个分配n+1个分区消费者数量 - m个消费者每个分配n个分区
  • round-robin策略轮询分配
  • sticky策略:初始时分配策略与round-robin类似,但在Rebalance时需要保证分区分配尽可能均匀分区分配尽可能与上次分配保持相同两个原则。当两者发生冲突时,第一个目标优先于第二个目标 。这样可以最大程度维持原来的分区分配的策略。

Rebalance过程

Rebalance过程

有消费者加入消费组时消费者消费组组协调器之间会依次经历选择组协调器加入消费组SYNC GROUP三个阶段;

  • 选择组协调器:每个Consumer Group都会选择一个Broker作为自己的组协调器GroupCoordinator,负责监控该消费组中所有消费者心跳,以及判断是否宕机,然后开启消费者Rebalance。Consumer Group中每个Consumer启动时会向Kafka集群中某个节点发送 FindCoordinatorRequest请求来查找对应的组协调器GroupCoordinator,并跟其建立网络连接。Consumer消费的offset要提交到__consumer_offsets的哪个分区,该分区Leader对应的Broker就是该Consumer Group的GroupCoordinator。
  • 加入消费组:消费者会向GroupCoordinator发送JoinGroupRequest请求并处理响应。然后GroupCoordinator从一个Consumer Group中选择第一个加入Group的Consumer作为Leader消费组协调器,把Consumer Group情况发送给该Leader,接着该Consumer Leader会负责制定分区方案
  • SYNC GROUPConsumer Leader通过GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator把分区方案下发给各个Consumer,具体的Consumer根据指定分区的Leader Broker进行网络连接以及消息消费

Producer发布消息机制

Producer采用push模式将消息发布到Broker,每条消息都被append到顺序写磁盘Patition中,Producer发送消息到Broker时,会根据分区算法选择将其存储到哪一个Partition,路由机制为:

  • 指定了Patition,则直接使用
  • 未指定Patition但指定了key,通过对key的value进行hash选出一个Patition
  • Patition和key都未指定,使用轮询选出一个Patition。

Producer先从Zookeeper的/brokers/.../state节点找到该Partition的Leader,然后将消息发送给该Leader,Leader将消息写入本地commit log,Followers从Leader Pull消息,写入本地commit log后向Leader发送ACK,Leader收到所有ISR中的Replica的ACK后,增加最后 commit 的offset即high watermark高水位简称HW并向Producer发送ACK。

HW和LEO

High Watermark俗称高水位,取一个Partition对应的ISR中最小的log-end-offsetLEO作为HWConsumer最多只能消费到HW所在的位置。每个Replica都有HWLeaderFollower各自负责更新自己的HW状态。对于Leader新写入的消息Consumer不能立刻消费,Leader会等待该消息被所有ISR中的Replicas同步后更新HW,此时消息才能被Consumer消费。这样保证了若Leader所在Broker失效,该消息仍然可从新选举的Leader中获取。对于来自内部Broker的读取请求没有HW的限制。ISR以及HW和LEO的流转过程:

Kafka复制机制既不是完全的同步复制也不是单纯的异步复制。同步复制要求所有能工作的Follower都复制完,这条消息才会被commit极大影响了吞吐率。异步复制方式下Follower异步从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下若Follower还未复制完落后于Leader时,若Leader宕机则会丢失数据。Kafka使用ISR方式则很好均衡了确保数据不丢失以及吞吐率acks=1的情况:

日志分段存储

Kafka一个分区的消息数据对应存储在一个文件夹下以Topic名称+分区号命名,消息在分区内是分段存储,每个段的消息都存储在不一样的log文件里,这种特性方便过期分段文件快速被删除,Kafka规定一个段位log文件最大1G

Kafka每次往分区发4K消息就会记录一条当前消息的offsetindex文件以及记录一条当前消息的发送时间戳与对应的offsettimeindex文件,若要定位消息的offset先在index文件里快速定位,若需要按照时间来定位消息的offset,会先在timeindex文件里查找,再去log文件里找具体消息,相当于一个稀疏索引

1
2
3
4
5
6
# 部分消息的offset索引文件,Kafka每次往分区发4K(可配置)消息就会记录一条当前消息的offset到index文件,若要定位消息的offset会先在该文件里快速定位,再去log文件里找具体消息,相当于一个稀疏索引
00000000000000000000.index
# 消息存储文件,主要存offset和消息体
00000000000000000000.log
# 消息的发送时间索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的发送时间戳与对应的offset到timeindex文件,若需要按照时间来定位消息的offset,会先在这个文件里查找
00000000000000000000.timeindex

一个日志段文件满了,会自动开一个新的日志段文件来写入,避免单个文件过大,影响文件的读写性能,该过程叫做log rolling,正在被写入的日志段文件叫做active log segment

实际问题

消息丢失

消息发送端
  • acks=0: 表示Producer不需要等待任何Broker确认收到消息的回复,就可继续发送下一条消息性能最高,但是最容易丢消息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可用这种。
  • acks=1至少要等待Leader已经成功将数据写入本地log,但不需要等待所有Follower是否成功写入。就可继续发送下一条消息。该情况下若Follower没有成功备份数据,而此时Leader挂掉则消息会丢失。
  • acks=-1all: Leader需等待所有备份即min.insync.replicas配置的备份个数都成功写入日志,该策略会保证只要有一个备份存活就不会丢失数据。
消息消费端

若消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但此时Consumer直接宕机了,未处理完的数据丢失了,下次也消费不到了。

消息重复消费

消息发送端:发送消息若配置了重试机制,如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息

消息消费端:若消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理,一般消费端都是要做消费幂等处理。

消息乱序

发送端配置了重试机制,Kafka不会等之前那条消息完全发送成功才去发送下一条消息,可能会出现发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了,是否一定要配置重试要根据业务情况而定。也可用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息发送的有序。

kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但性能比较低,可在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列,一个内存队列开启一个线程顺序处理消息。

消息积压

线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致Broker积压大量未消费消息。若积压了上百万未消费消息需要紧急处理,可修改消费端程序,让其将收到的消息快速转发到其他Topic,可设置很多分区,然后再启动多个消费者同时消费新主题的不同分区。

由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。此种情况可将这些消费不成功的消息转发到其它队列里去,类似死信队列,后面再慢慢分析死信队列里的消息处理问题。

延时队列

延时队列存储的对象是延时消息。指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取该消息进行消费,但Kafka不支持延时队列。

实现思路:发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中如topic_1s,topic_5s,topic_10s,…topic_2h,一般不能支持任意时间段的延时,然后通过定时器进行轮训消费这些Topic,查看消息是否到期,若到期则把该消息发送到具体业务处理的Topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对应时间了,若到了就转发,如果还没到这一次定时任务就可以提前结束了。

消息回溯

若某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消费的消息重新消费,可以指定从多久之前的消息回溯消费,这种可以用Consumer的offsetsForTimes、seek等方法指定从某个offset偏移的消息开始消费。

消息传递保障

kafka生产者的幂等性,因发送端重试导致的消息重复发送问题,Kafka幂等性可保证重复发送的消息只接收一次,只需在生产者加上参数props.put("enable.idempotence", true)即可,默认是false不开启,Kafka每次发送消息会生成PIDSequence Number并将这两个属性一起发送给Broker,Broker将PID和Sequence Number跟消息绑定一起存起来,若生产者重发相同消息,Broker会检查PID和Sequence Number,若相同不会再接收。

每个新的Producer在初始化时会被分配一个唯一的PID,PID对用户完全是透明的,生产者若重启则会生成新的PID,每个PID该Producer发送到每个Partition的数据都有对应的序列号即Sequence Number,这些序列号是从0开始单调递增的。

Kafka的事务

Kafka事务不同于Rocketmq,Rocketmq是保障本地事务与MQ消息发送的事务一致性,Kafka的事务主要是保障一次发送多条消息的事务一致性,一般在Kafka流式计算场景用得多一点,如kafka需要对一个Topic中的消息做不同的流式计算处理,处理完分别发到不同的Topic里,这些Topic分别被不同的下游系统消费如hbase,redis,es等,这种肯定希望系统发送到多个topic的数据保持事务一致性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions(); //初始化事务
try {

producer.beginTransaction(); //开启事务
for (int i = 0; i < 100; i++) {//发到不同的主题的不同分区
producer.send(new ProducerRecord<>("hdfs-topic", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("es-topic", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("redis-topic", Integer.toString(i), Integer.toString(i)));
}

producer.commitTransaction(); //提交事务
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
producer.close();
} catch (KafkaException e) {

producer.abortTransaction(); //回滚事务
}
producer.close();