0%

Kafka自整理学习笔记

前言

  1. 本博客整理了一些Kafka的基本概念和使用

  2. 自己用来当笔记的,故不需要图帮助理解

  3. 只有Java端的使用,C++端的以后可能会补上

1. 事件流

事件流的三个特征:数据格式+持续性的流+分发。

  • 实时从事件源读取事件流
  • 数据持久化存储和恢复
  • 实时或者异步处理和响应流数据
  • 将数据流路由到不同的目的地

2. Kafka

Kafka是一个数据流平台,其具有三个能力:

  1. 发布和订阅事件流,可以持续地从其他系统导入和导出data
  2. 持久可靠地存储事件流
  3. 可以同步和异步化处理事件流

以上的功能Kafka都能以分布式、高度可拓展、高容错、高安全的方式提供。

3. Kafka如何工作

Server和Clients通过高性能的TCP网络来进行通讯。

3.1 Servers

​ 可以通过集群进行部署,一个或者多个server都可以运行,用于存储的服务器称为brokers。kafka集群具有高度的可拓展性和容错能力,如何任何服务器发生故障,其他服务器将接管其工作。

3.2 Clients

微服务或者分布式客户端并行地读取、写和处理事件流。

3.3 主要概念和术语

​ 事件(event)可以称之为消息。从kafka读取或者写入数据,数据要遵循一定的格式:概念上,一条消息要包含一个key,一个value、一个时间戳以及可选择的元数据信息。

​ 在kafka中,生产者和消费者之间是独立的。生产者不必等待消费者消费数据,但Kafka会保证同一个消费者不会重复消费同一条数据。

​ 消息会以Topic进行分组和永久存储。简单来说,Topic就像文件系统一个文件夹,消息就像是文件夹中的文件。一个Topic可以有多个生产者和多个消费者,消息被消费后不会删除,用户可以通过配置文件来决定就消息可以保存多久,但是磁盘上的消息数量不会影响kafka的性能。

​ 一个Topic下有多个Partition(分区)。简单解释就是,一个topic里面的消息被拆分了,而且分布在了不同的broker上。这种分布式的结构在一定程度上实现了生产者和消费者同时进行消息的写入与读取。当一条消息被发布到了Topic中,它实际上是被加到了一个partition中。相同key的消息会被写入到同一个partition中,kafka保证任意消费者会严格按照消息的写入顺序来消费消息。为了保证数据的容错率和高可用性,每个Topic都有存有副本(每个partition分布在不同的broker,每个broker会存有其他partition数据的副本)

4. 安装

这里注意一样,kafka3.0号称不依赖zk了,但是我看了一下官网样例,还是写的带zk的部署方式,不依赖zk的只是有范本,还未有稳定版本。

1
2
3
4
5
6
7
8
tar -zxf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0

# 至少需要java8
bin/zookeeper-server-start.sh config/zookeeper.properties

# 在另一台终端开启kafka broker
bin/kafka-server-start.sh config/server.properties

5. 使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 创建topic
# 一个partition,一个副本,topic名称test
./kafka-topics.sh --create --zookeeper ip:2182 --replication-factor 1 --partition 1 --topic test

# 查看当前kafka有多少topic
./kafka-topics.sh --list --zookeeper ip:2182

# 发送消息
# kafka自带了一个producer客户端,可与从本地文件或者命令行中传递消息(把内容以消息的形式发送到kafka集群)
# 发送两条消息
./kafka-console-producer.sh --broker-list ip:9092 --topic test
> message1
> message2

# 消费消息
# kafka携带了一个命令行客户端,会将获取到内容在命令行输出,默认消费最新消息
# 方式一:从最后一条消息的偏移量+1开始消费
./kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test
# 方式二:从头开始消费
./kafka-console-consumer.sh --bootstarp-server ip:9092 --from-beginning --topic test

5.1 消费者的几个注意点:

  • 消息是持久化存储的
  • 消费是顺序存储的,先进先出
  • 消息有偏移量
  • 消息可以指定偏移量进行消费

6. 顺序消费原理

kafka安装时的配置文件中指定的log.dir是kafka保存消息的路径。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# log.dir 内部
默认有一个主题:__consumer_offsets,这个主题有多个分区,里面存放着消费者维护的偏移量。
# 每个消费者消费到了某个partition的第几条数据(offset)会存储在这个主题里面,以便于恢复

test主题:
- 00000.index # offset position(段的偏移)
- 00000.log # 消息本体
- 00000.timestamp

# .log文件其实就是分段(segment),分段文件的大小可以在server.properties里面配置,默认1GB
# 查看分段文件
./kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000.log --print-data-log
# segment结构如下:
- offset: 相对于该分区的偏移值,可以理解成第几条消息
- position: 相对于当前segment的偏移值
- CreateTime: 记录创建的时间
- isValid: 表示key的长度
- valuesize: 表示value的长度
- magic: 本次发布的kafka服务协议版本号
- compressscodec: 压缩工具
- producedId: 生产者ID(用于幂等)
- sequence: 消息的序列号(用于幂等)
- payload: 表示具体的消息

原理:

  • 生产者将消息发送给broker,broker将消息保存在本地的日志文件中
  • 消息的保存是有时序的,通过offset偏移量来保证消息的有序性
  • 消费者消费消息时也是通过offsets来描述当前要消费的那条消息的位置(offset+1)

7. 单播消息和多播消息

在一个topic中,启动两个消费者,一个生产者,问:生产者发送消息,这条消息是否会被两个消费者消费?

7.1 单播消息

1
2
3
4
5
6
7
8
9
10
11
12
# 单播消息
# consumer-1
./kafka-console-consumer.sh --bootstrap-server ip:9092 --consumer-property group.id=testGroup --topic test
# consumer-2
./kafka-console-consumer.sh --bootstrap-server ip:9092 --consumer-property group.id=testGroup --topic test

> 生产者发送了消息
> message1
> message2

# 发现消费者group中只有一个消费者会受到订阅的test主题的消息,而且这个消费者还不会变(因为test主题只有一个partition)
# 可以理解成这个消费者订阅了这个partition

7.2 多播消息

1
2
3
4
5
6
7
8
9
10
11
# 多播消息
# consumer-1
./kafka-console-consumer.sh --bootstrap-server ip:9092 --consumer-property group.id=testGroup1 --topic test
# consumer-2
./kafka-console-consumer.sh --bootstrap-server ip:9092 --consumer-property group.id=testGroup2 --topic test

> 生产者发送了消息
> message1
> message2

# 两个消费者都收到了消息

7.3 解释

​ 概念就是消费者组,一个消费者组可以看成是一个消费者,一个消费者是不能重复消费消息的,消费者组是为了满足并发消费消息,从而实现高吞吐的设计。一个partition效果不明显,但是当partition多了,一个消费者组中不同的消费者可以通过某个topic的消息效率。

7.4 查看消费组及信息

1
2
3
4
5
6
7
8
9
# 查看当前节点下有多少消费者组
./kafka-consumer-groups.sh --bootstrap-server ip:9092 --list

# 查看消费者组中的具体信息:当前偏移量、最后一条消息的偏移量
./kafka-consumer-groups.sh --bootstrap-server ip:9092 --describe --group testGroup

# 可以看到以下信息
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG(还有多少未被消费的消息) CONSUMER-ID HOST CLIENT-ID
# 如果消费者挂了,以上信息依然会存在,生产者可以继续生产消息,但是CURRENT-OFFSET是不动的,因为消费者没有消费消息

重点关注:

  • CURRENT-OFFSET:最后被消费的消息的偏移量
  • LOG-END-OFFSET:最后一条消息的偏移量
  • LAG:当前消费组未消费的消息数量

8. 主题与分区

8.1 Topic

​ 主题就是Kafka消息的逻辑划分,一个主题下相当与一个类别。Kafka通过Topic将消息进行分类,不同的Topic会被订阅该Topic的消费者消费。

​ 如果一个Topic中的消息巨多无比(可能有几个T),而消息是会被保存到.log文件中的,用一个文件,压力太大。为了解决这个问题,kafka使用了partition来分布式存储这些消息。

8.2 Partition

  • 分区存储,可以解决统一存储文件过大的问题
  • 提高了读写的吞吐量
1
2
3
4
5
6
7
8
9
10
11
12
13
# 主题多分区
./kafka-topics.sh --create --zookeeper ip:2181 --replication-factor 1 --partitions 2 --topic test2

# 查看log文件,可以看见如果有多个partition,一个topic其实就会有两个文件夹存在磁盘上
test2-0
- 0000.index
- 0000.log
- 0000.timestamp

test2-1
- 0000.index
- 0000.log
- 0000.timestamp

小细节,大改变:

  • kafka内部默认有50个主题__consumer_offsets,这些主题用存储消费者消费消息的偏移量
  • 消费者会定期把自己消费分区的offset提交给kafka内部的主题__consumer_offsets,提交的时候,key是consumerGroupId+topic+分区号,value是当前分区的offset。(消费者)
  • __conusmer_offsets有多个分区(可以server.properties里面配置,默认50),消费者通过hash(consumerGroupId)%分区数来确定消息被存储到哪个分区
  • kafka会定期消费已经消费过的数据,默认是7天,旧消息会被删除
  • __consumer_offsets里面有多个分区是为了提高并发效率,很多消费者可以同时写入自己的offset

9. 集群与副本

9.1 搭建集群(伪),三个broker

准备3个server.properties,这里演示建在一台机器上的伪集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# server.properties
broker.id = 0
listeners=PLAINEXT://ip:9092
log.dir=/usr/local/data/kafka-logs-0

# server1.properties
broker.id = 1
listeners=PLAINEXT://ip:9093
log.dir=/usr/local/data/kafka-logs-1

# serve2r.properties
broker.id = 2
listeners=PLAINEXT://ip:9094
log.dir=/usr/local/data/kafka-logs-2

# 启动三个kafka服务器
bin/kafka-server-start.sh -deamon config/server.properties
bin/kafka-server-start.sh -deamon config/server1.properties
bin/kafka-server-start.sh -deamon config/server2.properties

# 可以登录zkClient来看一下节点有没有顺利注册(brokers/ids下有三个znode(0, 1, 2))

9.2 副本的概念

​ 副本是为主题中的分区创建多个备份,多个副本在Kafka集群中多个broker中,会有一个副本是Leader,其他是Follower,副本数量一般不超过Broker的数量,不然没有意义。

创建1个主题,2个分区,三个副本

1
2
3
4
./kafka-topics.sh --create --zookeeper ip:2181 --replication-factor 3 --partitions 2 --topic my-replica-topic

# 查看主题
./kafka-topics.sh --describe --zookeeper ip:2181 --topic my-replica-topic

结构:

Topic:my-replica-topic PartitionCount:2 ReplicationFactor:3 Configs:
- Topic:my-replica-topic - Partition: 0 - Leader:2 Replicas: 2,0,1 lsr:2,0,1
- Topic:my-replica-topic - Partition: 2 - Leader:0 Replicas: 0,1,2 lsr:0,1,2
  • Leader负责读写,Follower中的副本会同步Leader中的数据
  • 不同Partition所在的Broker是不一样的
  • 每个Partition的Leader可能都不一样,说明Leader不是Topic的概念。Leader所在Broker负责该Partition的读写,也负责Follower的数据同步
  • lsr:可以同步或者已经同步的节点存放在isr中,如果一个节点同步性能很差,该节点会被T掉
  • Leader挂掉,新Leader会在lsr集合中选举

10. Kafka集群的消息发送和消费

1
2
3
4
5
6
7
8
9
10
11
# 发送
./kafka-console-producer.sh --broker-list ip:9092.ip:9093,ip:9094 --topic my-replica-topic

# 单消费者
./kafka-console-consumer.sh --bootstrap-list ip:9092.ip:9093,ip:9094 --from-beginning --topic my-replica-topic

# 一个组两个消费者
# 消费者1
./kafka-console-consumer.sh --bootstrap-list ip:9092.ip:9093,ip:9094 --consumer-property group.id=testGroup1 --from-beginning --topic my-replica-topic
# 消费者2
./kafka-console-consumer.sh --bootstrap-list ip:9092.ip:9093,ip:9094 --consumer-property group.id=testGroup1 --from-beginning --topic my-replica-topic

注意:

  • 一个Partition最多被一个消费组里的一个消费者消费,因为要保证消息消费的顺序性(想想并发)
  • 一个消费者可以消费多个Partition
  • Kafka只能保证Partition中局部的消息顺序,不能保证Topic中的消息顺序消费
  • 消费组中的消费者的数量一般不会多于Partition的数量,因为会有消费者消费不到消息
  • 如果某个消费者挂了,就会触发rebalance机制,让其他消费者消费其对应的partition

10. Kafka Java生产者实现

10.1 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// 依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>与kafka版本一致</version>
</dependency>

public class MySimpleProducer {
private final static String TOPIC_NAME = "memo-kafka-test";

public static void main(String[] args) throws ExecutionException, InterruptedException {

// 1. 设置参数
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.249:9092,192.168.1.249:9093,192.168.1.249:9094");

// 把key和value从字符串序列转成字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 配置同步消息的ack
props.put(ProducerConfig.ACKS_CONFIG, "1");
/*
* 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但也会造成消息的重复发送,
* 如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理
*/
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 发送消息重试间隔时间,设为300
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

/*
* 设置消息发送的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区(默认32M),
* kafka生产者客户端会启一条本地线程拉16kb的数据发送给kafka,如果消息没达到16kb,该线程
* 10毫秒以后也会将数据发送
*/
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10240);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

// 2. 创建生产消息的客户端,传入参数
Producer<String, String> producer = new KafkaProducer<>(props);

// 3. 创建消息
// key的作用是决定往哪个分区上发,value是具体消息内容
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "mytestKey", "hello kafka2");

// 4. 发送消息,得到消息发送的元数据并输出
RecordMetadata recordMetadata = producer.send(producerRecord).get();
System.out.println("**********************************");
System.out.println("同步方式发送结果:" + "topic-" + recordMetadata.topic()
+ "|partition-" + recordMetadata.partition() + "|offset-" + recordMetadata.offset());
producer.close();
}
}

10.2 同步发送和异步发送

同步发送和异步发送都是针对生产者和Kafka服务端的,同步用的比较多,异步的性能提升不明显,反而会出现消息丢失的可能。

10.2.1 同步发送

如果生产者发送消息,没有收到kafka给服务端的ack,生产者会阻塞,阻塞到3S的时间(可以配置),如果还没有收到消息,则会进行重试,重试的次数默认也为3个,还不行,就会报错了。

10.2.2 异步发送

不会确认收到kafka的ack,会触发一个回调函数(不会阻塞)。生产者发送完消息可以做别的事情,broker收到消息后就会触发回调(callback)。

1
2
3
4
5
6
7
8
9
10
producer.send(producerRecord, (recordMetadata, e) -> {
if (e != null) {
System.err.println("发送消息失败:" + Arrays.toString(e.getStackTrace()));
}
if (recordMetadata != null) {
System.out.println("异步方式发送结果:" + "topic-" + recordMetadata.topic()
+ "|partition-" + recordMetadata.partition() + "|offset-" + recordMetadata.offset());
}
});
}

10.3 关于生产者ack配置

ack就是在同步发送中,生产者给kafka发送消息后的,kafka给的一个确认信号。

ack会有三个参数配置:

  • ack=0:kafka不需要任何broker收到消息,会立即返回ack给生产者(最容易丢消息,但是效率最高)。
  • ack=1:leader已经收到消息,并把消息写入到本地的log中,才会返回ack生产者。性能和安全性是最均衡的。
  • ack=-1/all:里面有默认配置min.insync.replicas=2(默认为1,推荐配置大于等于2),leader同步2个副本之后(此时集群中有2个broker已完成数据的接收),才会返回ack给生产者。最安全但性能最差。默认是1,那么和ack=1是一样的,因为leader本身就是1个副本。
1
2
3
4
5
6
7
8
9
// 配置同步消息的ack
props.put(ProducerConfig.ACKS_CONFIG, "1");
/*
* 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但也会造成消息的重复发送,
* 如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理
*/
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 发送消息重试间隔时间,设为300
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

10.4 关于消息发送的缓冲区

  • kafka默认会创建一个消息缓冲区,默认为32MB(可配置)
1
2
3
4
5
6
/*
* 设置消息发送的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区(默认32M),
* kafka生产者客户端会启一条本地线程拉16kb的数据发送给kafka,如果消息没达到16kb,该线程
* 10毫秒以后也会将数据发送
*/
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  • kafka客户端本地线程会去缓冲区一次拉16kb的数据,发送给broker
1
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  • 如果线程拉不到16kb的数据,间隔10ms也会发送给broker
1
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

11. Java客户端消费者的实现

11.1 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// 依赖也是kafka-clients

public class MySimpleConsumer {

private final static String TOPIC_NAME = "memo-kafka-test";
private final static String CONSUMER_GROUP_NAME = "memoGroup";

public static void main(String[] args) {
// 1. 设置参数
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.249:9092,192.168.1.249:9093,192.168.1.249:9094");
// 消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 开启offset自动提交,默认是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 设置自动提交的时间间隔,默认1000ms
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

// 一次poll最大拉取消息的条数,可以根据消费的速度来设置,默认500
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// 如果两次poll的时间如果超过了30s的时间间隔,kafka会人认为其消费能力过弱,会把其T出消费者组,并把分区分给其他消费者(rebalance)。
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

// consumer给broker发送心跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
// kafka如果10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);


// 2. 订阅主题列表
// consumer.subscribe(Collections.singletonList(TOPIC_NAME));
// 指定分区消费
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, 1)));
// 消息回溯消费
consumer.seekToBeginning(Collections.singleton(new TopicPartition(TOPIC_NAME, 1)));
// 从partition1的offset=5开始消费
//consumer.seek(new TopicPartition(TOPIC_NAME, 1), 5);

// 从指定时间点开始消费
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
long fetchDateTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo partition : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, partition.partition()), fetchDateTime);
}
/*
// 根据时间找偏移量
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
TopicPartition topicPartition = entry.getKey();
OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
if (topicPartition == null || offsetAndTimestamp == null) continue;
long offset = offsetAndTimestamp.offset();
System.out.println("partition-" + topicPartition.partition() + "|offset-" + offset);
// 指定offset开始消息
consumer.assign(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, offset);
}
*/

while (true) {
/*
* 3. poll() API是拉取消息的长轮询,如果poll在1000ms内没有拉满,就会在1000ms之内一直等待
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// 消费
System.out.printf("收到消息: partition=%d, offset=%d, key=%s, value=%s%n",
record.partition(), record.offset(), record.key(), record.value());
}
/*
// 消费结束后
if (!records.isEmpty()) {
// 同步提交,当前线程会阻塞到offset提交成功
// consumer.commitAsync();

// 异步提交
consumer.commitAsync((map, e) -> {
if (e != null) {
e.printStackTrace();
}
});
}
*/

}
}
}

11.2 消费者的自动提交和手动提交

消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群的__consuemr_offset主题里面。

详细点来:消费者会把offset提交到borker-0上的__consumer_offset主题上,这个broker-0是一个controller。消费者要根据offset进行消费,消费者是先把消息给poll下来,然后不管有没有消费,就会定时把当前偏移量给提交给offset主题(自动提交);手动提交就是把消息消费时或者消费完再手动提交offset。

11.2.1 自动提交

消费者poll到消息后默认情况下,会自动向broker的__consumer_offset主题提交当前主题-分区消费的偏移量。

1
2
3
4
// 开启offset自动提交,默认是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 设置自动提交的时间间隔,默认1000ms
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

注意:自动提交可能会丢消息,当消费者刚提交offset之后,没来得及消费就挂了,下一个消费者会从已经提交的offset的下一个位置开始消费消息。未被消费的消息就丢失掉了。

11.2.2 手动提交

  • 设置手动提交参数
1
2
// 开启offset手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  • 手动同步提交

会等待broker-0返回ack

1
2
3
4
5
// 消费结束后
if (!records.isEmpty()) {
// 同步提交,当前线程会阻塞到offset提交成功
consumer.commitAsync();
}
  • 手动异步提交
1
2
3
4
5
6
7
8
9
10
// 消费结束后
if (!records.isEmpty()) {
// 异步提交
consumer.commitAsync((map, e) -> {
if (e != null) {
// 异步提交失败处理逻辑
e.printStackTrace();
}
});
}

11.3 长轮询消息

  • 默认情况下,消费者一次会poll500条消息
1
2
// 一次poll最大拉取消息的条数,可以根据消费的速度来设置,默认500
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
  • 代码中设置了长轮询的时间是1000ms
1
2
3
4
/*
* 3. poll() API是拉取消息的长轮询,如果poll在1000ms内没有拉到数据,则返回空
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

意味着:

    • 如果一次poll到了数据,最大为500条,就会直接执行for循环
    • 如果1000ms内没poll到数据,则返回空
  • 如果两次poll的时间间隔太长(默认30s),消费者会被kafkaT出消费者组

1
2
// 如果两次poll的时间如果超过了30s的时间间隔,kafka会人认为其消费能力过弱,会把其T出消费者组,并把分区分给其他消费者(rebalance)。
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

12. 消费者的其他配置

12.1 健康状态检查

消费者每隔1s像kafka集群发送心跳,集群发现如果有10s没有续约的消费者,将被T出消费组,触发该消费组的rebalance。

1
2
3
4
// consumer给broker发送心跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
// kafka如果10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

12.2 指定分区消费

1
2
// 指定分区消费
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, 0)));

12.3 消息回溯消费

每次消费都从offset=0开始消费,需要先指定分区,再seek

1
2
3
4
// 指定分区消费
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, 1)));
// 消息回溯消费
onsumer.seekToBeginning(Collections.singleton(new TopicPartition(TOPIC_NAME, 1)));

12.4 指定offset消费

1
2
3
4
5
6
// 指定分区消费
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, 1)));
// 消息回溯消费
// consumer.seekToBeginning(Collections.singleton(new TopicPartition(TOPIC_NAME, 1)));
// 从partition1的offset=5开始消费
consumer.seek(new TopicPartition(TOPIC_NAME, 1), 5);

12.5 从指定时间点开始消费

1
2
3
4
5
6
7
8
9
10
11
12
// 根据时间找偏移量
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
TopicPartition topicPartition = entry.getKey();
OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
if (topicPartition == null || offsetAndTimestamp == null) continue;
long offset = offsetAndTimestamp.offset();
System.out.println("partition-" + topicPartition.partition() + "|offset-" + offset);
// 指定offset开始消息
consumer.assign(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, offset);
}

12.6 新消费者组的offset

之前命令行的时候搞过的就是这个--from-beginning还记得吗。

默认的消费是最后的offset+1。知道这个就可以理解这个配置了。

1
2
3
4
5
/*
* latest(默认):新消费者组从最后offset+1开始消费
* earliest:第一次从头,然后跟着offset,注意和seekBeginning区分
*/
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

13. SpringBoot集成Kafka

13.1 基本配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

// 配置
spring:
kafka:
bootstrap-servers: 192.168.1.249:9092,192.168.1.249:9093,192.168.1.249:9094
producer:
retries: 3
batch-size: 16384 # 16k
buffer-memory: 33554432 # 32M
acks: 1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500
listener:
# TIME: 当一批poll()的数据被消费者Listener处理后,距离上次提交时间大于TIME时提交offset
# COUNT: 当一批poll()的数据被消费者Listener处理后,处理record数量大于等于COUNT时提交offset
# COUNT_TIME: 满足TIME或者COUNT
# MANUAL: 当一批poll()的数据被消费者Listener处理后,手动调用acknowledge()提交offset
# BATCH:当一批poll()数据被消费者Listener处理后,自动提交offset
# record:当每一条记录被处理后,自动提交offset
ack-mode: MANUAL_IMMEDIATE # 手动调用acknowledge()后(处理一条消息后),提交offset

13.2 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RestController
@RequestMapping("/msg")
public class MyKafkaController {

private final static String TOPIC_NAME = "memo-kafka-test";

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@RequestMapping("/send")
public String sendMessage() {
kafkaTemplate.send(TOPIC_NAME, "KEY", "this kafka sb test msg!");

return "success";
}
}

13.3 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class MyConsumer {
/**
* 这里是一条条处理的,一次poll的消息有很多,这里一条条消费啦,thanks for springboot
* @param record:收到的消息
* @param ack: 针对手动提交很作用
*/
@KafkaListener(topics = "memo-kafka-test", groupId = "memoGroup1")
public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
// 手动提交offset
ack.acknowledge();
}
}

13.4 消费者的详细配置

13.4.1 设置多主题,指定分区,指定offset,同组下的消费者个数(并发消费数)

1
2
3
4
5
6
7
8
9
10
11
12
   // 1个消费者,消费"memo-kafka-test"两个分区,"test"一个分区(并从offset=1开始消费) 
@KafkaListener(groupId = "memoGroup2", topicPartitions = {
@TopicPartition(topic = "memo-kafka-test", partitions = {"0", "1"}),
@TopicPartition(topic = "test", partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "1"))
}, concurrency = "1")// concurrency 就是同组下的消费者个数,就是并发消费数,建议小于等于分区总数
public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
// 手动提交offset
ack.acknowledge();
}

13.4.2 一些有关Listener的配置

1
2
3
4
5
6
7
8
listener:
# TIME: 当一批poll()的数据被消费者Listener处理后,距离上次提交时间大于TIME时提交offset
# COUNT: 当一批poll()的数据被消费者Listener处理后,处理record数量大于等于COUNT时提交offset
# COUNT_TIME: 满足TIME或者COUNT
# MANUAL: 当一批poll()的数据被消费者Listener处理后,手动调用acknowledge()提交offset
# BATCH:当一批poll()数据被消费者Listener处理后,自动提交offset
# record:当每一条记录被处理后,自动提交offset
ack-mode: MANUAL_IMMEDIATE # 手动调用acknowledge()后(处理一条消息后),提交offset

14. Kafka集群Controller、Rebalance、HW和LEO

14.1 Controller

kafka启动的时候会先向zookeeper创建一个临时序号节点,获得的序号最小的那个broker作为集群的controller,负责管理整个集群中的所有分区和副本的状态:

  • 当某个分区的leader出现故障时,由controller负责为该分区选举新的leader副本
  • 当检测到某个分区的ISR集合发生变化时(broker新增或减少),由controller负责通知所有broker更新其元数据信息
  • 当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由controller通知其他broker

14.2 Rebalance

前提时:消费者没有指明分区消费。当消费组里的消费者和分区关系发生变化,那么就会触发rebalance机制。

这个机制会重新调整消费者消费哪个分区。

在触发rebalance机制之前,消费者消费哪个分区有三种策略:

  • range:通过公式来计算某个消费者消费哪个分区
  • 轮询:大家轮着消费
  • sticky:在触发了rebalance之后,在消费者消费的原分区不变的基础上进行调整