前言
本博客整理了一些Kafka的基本概念和使用
自己用来当笔记的,故不需要图帮助理解
- 只有Java端的使用,C++端的以后可能会补上
1. 事件流
事件流的三个特征:数据格式+持续性的流+分发。
- 实时从事件源读取事件流
- 数据持久化存储和恢复
- 实时或者异步处理和响应流数据
- 将数据流路由到不同的目的地
2. Kafka
Kafka是一个数据流平台,其具有三个能力:
- 发布和订阅事件流,可以持续地从其他系统导入和导出data
- 持久可靠地存储事件流
- 可以同步和异步化处理事件流
以上的功能Kafka都能以分布式、高度可拓展、高容错、高安全的方式提供。
3. Kafka如何工作
Server和Clients通过高性能的TCP网络来进行通讯。
3.1 Servers
可以通过集群进行部署,一个或者多个server都可以运行,用于存储的服务器称为brokers。kafka集群具有高度的可拓展性和容错能力,如何任何服务器发生故障,其他服务器将接管其工作。
3.2 Clients
微服务或者分布式客户端并行地读取、写和处理事件流。
3.3 主要概念和术语
事件(event)可以称之为消息。从kafka读取或者写入数据,数据要遵循一定的格式:概念上,一条消息要包含一个key,一个value、一个时间戳以及可选择的元数据信息。
在kafka中,生产者和消费者之间是独立的。生产者不必等待消费者消费数据,但Kafka会保证同一个消费者不会重复消费同一条数据。
消息会以Topic进行分组和永久存储。简单来说,Topic就像文件系统一个文件夹,消息就像是文件夹中的文件。一个Topic可以有多个生产者和多个消费者,消息被消费后不会删除,用户可以通过配置文件来决定就消息可以保存多久,但是磁盘上的消息数量不会影响kafka的性能。
一个Topic下有多个Partition(分区)。简单解释就是,一个topic里面的消息被拆分了,而且分布在了不同的broker上。这种分布式的结构在一定程度上实现了生产者和消费者同时进行消息的写入与读取。当一条消息被发布到了Topic中,它实际上是被加到了一个partition中。相同key的消息会被写入到同一个partition中,kafka保证任意消费者会严格按照消息的写入顺序来消费消息。为了保证数据的容错率和高可用性,每个Topic都有存有副本(每个partition分布在不同的broker,每个broker会存有其他partition数据的副本)
4. 安装
这里注意一样,kafka3.0号称不依赖zk了,但是我看了一下官网样例,还是写的带zk的部署方式,不依赖zk的只是有范本,还未有稳定版本。
1 2 3 4 5 6 7 8
| tar -zxf kafka_2.13-3.0.0.tgz cd kafka_2.13-3.0.0
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
|
5. 使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
./kafka-topics.sh --create --zookeeper ip:2182 --replication-factor 1 --partition 1 --topic test
./kafka-topics.sh --list --zookeeper ip:2182
./kafka-console-producer.sh --broker-list ip:9092 --topic test > message1 > message2
./kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test
./kafka-console-consumer.sh --bootstarp-server ip:9092 --from-beginning --topic test
|
5.1 消费者的几个注意点:
- 消息是持久化存储的
- 消费是顺序存储的,先进先出
- 消息有偏移量
- 消息可以指定偏移量进行消费
6. 顺序消费原理
kafka安装时的配置文件中指定的log.dir
是kafka保存消息的路径。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| 默认有一个主题:__consumer_offsets,这个主题有多个分区,里面存放着消费者维护的偏移量。
test主题: - 00000.index - 00000.log - 00000.timestamp
./kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000.log --print-data-log
- offset: 相对于该分区的偏移值,可以理解成第几条消息 - position: 相对于当前segment的偏移值 - CreateTime: 记录创建的时间 - isValid: 表示key的长度 - valuesize: 表示value的长度 - magic: 本次发布的kafka服务协议版本号 - compressscodec: 压缩工具 - producedId: 生产者ID(用于幂等) - sequence: 消息的序列号(用于幂等) - payload: 表示具体的消息
|
原理:
- 生产者将消息发送给broker,broker将消息保存在本地的日志文件中
- 消息的保存是有时序的,通过offset偏移量来保证消息的有序性
- 消费者消费消息时也是通过offsets来描述当前要消费的那条消息的位置(offset+1)
7. 单播消息和多播消息
在一个topic中,启动两个消费者,一个生产者,问:生产者发送消息,这条消息是否会被两个消费者消费?
7.1 单播消息
1 2 3 4 5 6 7 8 9 10 11 12
|
./kafka-console-consumer.sh --bootstrap-server ip:9092 --consumer-property group.id=testGroup --topic test
./kafka-console-consumer.sh --bootstrap-server ip:9092 --consumer-property group.id=testGroup --topic test
> 生产者发送了消息 > message1 > message2
|
7.2 多播消息
1 2 3 4 5 6 7 8 9 10 11
|
./kafka-console-consumer.sh --bootstrap-server ip:9092 --consumer-property group.id=testGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server ip:9092 --consumer-property group.id=testGroup2 --topic test
> 生产者发送了消息 > message1 > message2
|
7.3 解释
概念就是消费者组,一个消费者组可以看成是一个消费者,一个消费者是不能重复消费消息的,消费者组是为了满足并发消费消息,从而实现高吞吐的设计。一个partition效果不明显,但是当partition多了,一个消费者组中不同的消费者可以通过某个topic的消息效率。
7.4 查看消费组及信息
1 2 3 4 5 6 7 8 9
| ./kafka-consumer-groups.sh --bootstrap-server ip:9092 --list
./kafka-consumer-groups.sh --bootstrap-server ip:9092 --describe --group testGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG(还有多少未被消费的消息) CONSUMER-ID HOST CLIENT-ID
|
重点关注:
- CURRENT-OFFSET:最后被消费的消息的偏移量
- LOG-END-OFFSET:最后一条消息的偏移量
- LAG:当前消费组未消费的消息数量
8. 主题与分区
8.1 Topic
主题就是Kafka消息的逻辑划分,一个主题下相当与一个类别。Kafka通过Topic将消息进行分类,不同的Topic会被订阅该Topic的消费者消费。
如果一个Topic中的消息巨多无比(可能有几个T),而消息是会被保存到.log
文件中的,用一个文件,压力太大。为了解决这个问题,kafka使用了partition来分布式存储这些消息。
8.2 Partition
- 分区存储,可以解决统一存储文件过大的问题
- 提高了读写的吞吐量
1 2 3 4 5 6 7 8 9 10 11 12 13
| ./kafka-topics.sh --create --zookeeper ip:2181 --replication-factor 1 --partitions 2 --topic test2
test2-0 - 0000.index - 0000.log - 0000.timestamp test2-1 - 0000.index - 0000.log - 0000.timestamp
|
小细节,大改变:
- kafka内部默认有50个主题__consumer_offsets,这些主题用存储消费者消费消息的偏移量
- 消费者会定期把自己消费分区的offset提交给kafka内部的主题__consumer_offsets,提交的时候,key是
consumerGroupId+topic+分区号
,value是当前分区的offset。(消费者)
- __conusmer_offsets有多个分区(可以server.properties里面配置,默认50),消费者通过
hash(consumerGroupId)%分区数
来确定消息被存储到哪个分区
- kafka会定期消费已经消费过的数据,默认是7天,旧消息会被删除
- __consumer_offsets里面有多个分区是为了提高并发效率,很多消费者可以同时写入自己的offset
9. 集群与副本
9.1 搭建集群(伪),三个broker
准备3个server.properties,这里演示建在一台机器上的伪集群
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| broker.id = 0 listeners=PLAINEXT://ip:9092 log.dir=/usr/local/data/kafka-logs-0
broker.id = 1 listeners=PLAINEXT://ip:9093 log.dir=/usr/local/data/kafka-logs-1
broker.id = 2 listeners=PLAINEXT://ip:9094 log.dir=/usr/local/data/kafka-logs-2
bin/kafka-server-start.sh -deamon config/server.properties bin/kafka-server-start.sh -deamon config/server1.properties bin/kafka-server-start.sh -deamon config/server2.properties
|
9.2 副本的概念
副本是为主题中的分区创建多个备份,多个副本在Kafka集群中多个broker中,会有一个副本是Leader,其他是Follower,副本数量一般不超过Broker的数量,不然没有意义。
创建1个主题,2个分区,三个副本
1 2 3 4
| ./kafka-topics.sh --create --zookeeper ip:2181 --replication-factor 3 --partitions 2 --topic my-replica-topic
./kafka-topics.sh --describe --zookeeper ip:2181 --topic my-replica-topic
|
结构:
Topic:my-replica-topic |
PartitionCount:2 |
ReplicationFactor:3 |
Configs: |
|
- Topic:my-replica-topic |
- Partition: 0 |
- Leader:2 |
Replicas: 2,0,1 |
lsr:2,0,1 |
- Topic:my-replica-topic |
- Partition: 2 |
- Leader:0 |
Replicas: 0,1,2 |
lsr:0,1,2 |
- Leader负责读写,Follower中的副本会同步Leader中的数据
- 不同Partition所在的Broker是不一样的
- 每个Partition的Leader可能都不一样,说明Leader不是Topic的概念。Leader所在Broker负责该Partition的读写,也负责Follower的数据同步
- lsr:可以同步或者已经同步的节点存放在isr中,如果一个节点同步性能很差,该节点会被T掉
- Leader挂掉,新Leader会在lsr集合中选举
10. Kafka集群的消息发送和消费
1 2 3 4 5 6 7 8 9 10 11
| ./kafka-console-producer.sh --broker-list ip:9092.ip:9093,ip:9094 --topic my-replica-topic
./kafka-console-consumer.sh --bootstrap-list ip:9092.ip:9093,ip:9094 --from-beginning --topic my-replica-topic
./kafka-console-consumer.sh --bootstrap-list ip:9092.ip:9093,ip:9094 --consumer-property group.id=testGroup1 --from-beginning --topic my-replica-topic
./kafka-console-consumer.sh --bootstrap-list ip:9092.ip:9093,ip:9094 --consumer-property group.id=testGroup1 --from-beginning --topic my-replica-topic
|
注意:
- 一个Partition最多被一个消费组里的一个消费者消费,因为要保证消息消费的顺序性(想想并发)
- 一个消费者可以消费多个Partition
- Kafka只能保证Partition中局部的消息顺序,不能保证Topic中的消息顺序消费
- 消费组中的消费者的数量一般不会多于Partition的数量,因为会有消费者消费不到消息
- 如果某个消费者挂了,就会触发rebalance机制,让其他消费者消费其对应的partition
10. Kafka Java生产者实现
10.1 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>与kafka版本一致</version> </dependency> public class MySimpleProducer { private final static String TOPIC_NAME = "memo-kafka-test";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.249:9092,192.168.1.249:9093,192.168.1.249:9094");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.RETRIES_CONFIG, 3); props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10240); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "mytestKey", "hello kafka2");
RecordMetadata recordMetadata = producer.send(producerRecord).get(); System.out.println("**********************************"); System.out.println("同步方式发送结果:" + "topic-" + recordMetadata.topic() + "|partition-" + recordMetadata.partition() + "|offset-" + recordMetadata.offset()); producer.close(); } }
|
10.2 同步发送和异步发送
同步发送和异步发送都是针对生产者和Kafka服务端的,同步用的比较多,异步的性能提升不明显,反而会出现消息丢失的可能。
10.2.1 同步发送
如果生产者发送消息,没有收到kafka给服务端的ack
,生产者会阻塞,阻塞到3S的时间(可以配置),如果还没有收到消息,则会进行重试,重试的次数默认也为3个,还不行,就会报错了。
10.2.2 异步发送
不会确认收到kafka的ack
,会触发一个回调函数(不会阻塞)。生产者发送完消息可以做别的事情,broker收到消息后就会触发回调(callback)。
1 2 3 4 5 6 7 8 9 10
| producer.send(producerRecord, (recordMetadata, e) -> { if (e != null) { System.err.println("发送消息失败:" + Arrays.toString(e.getStackTrace())); } if (recordMetadata != null) { System.out.println("异步方式发送结果:" + "topic-" + recordMetadata.topic() + "|partition-" + recordMetadata.partition() + "|offset-" + recordMetadata.offset()); } }); }
|
10.3 关于生产者ack配置
ack
就是在同步发送中,生产者给kafka发送消息后的,kafka给的一个确认信号。
ack
会有三个参数配置:
- ack=0:kafka不需要任何broker收到消息,会立即返回ack给生产者(最容易丢消息,但是效率最高)。
- ack=1:leader已经收到消息,并把消息写入到本地的log中,才会返回ack生产者。性能和安全性是最均衡的。
- ack=-1/all:里面有默认配置
min.insync.replicas=2(默认为1,推荐配置大于等于2)
,leader同步2个副本之后(此时集群中有2个broker已完成数据的接收),才会返回ack给生产者。最安全但性能最差。默认是1,那么和ack=1是一样的,因为leader本身就是1个副本。
1 2 3 4 5 6 7 8 9
| props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.RETRIES_CONFIG, 3); props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
|
10.4 关于消息发送的缓冲区
- kafka默认会创建一个消息缓冲区,默认为32MB(可配置)
1 2 3 4 5 6
|
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
|
- kafka客户端本地线程会去缓冲区一次拉16kb的数据,发送给broker
1
| props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
|
- 如果线程拉不到16kb的数据,间隔10ms也会发送给broker
1
| props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
|
11. Java客户端消费者的实现
11.1 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
|
public class MySimpleConsumer {
private final static String TOPIC_NAME = "memo-kafka-test"; private final static String CONSUMER_GROUP_NAME = "memoGroup";
public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.249:9092,192.168.1.249:9093,192.168.1.249:9094"); props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, 1))); consumer.seekToBeginning(Collections.singleton(new TopicPartition(TOPIC_NAME, 1)));
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME); long fetchDateTime = new Date().getTime() - 1000 * 60 * 60; Map<TopicPartition, Long> map = new HashMap<>(); for (PartitionInfo partition : topicPartitions) { map.put(new TopicPartition(TOPIC_NAME, partition.partition()), fetchDateTime); }
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息: partition=%d, offset=%d, key=%s, value=%s%n", record.partition(), record.offset(), record.key(), record.value()); }
} } }
|
11.2 消费者的自动提交和手动提交
消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群的__consuemr_offset主题里面。
详细点来:消费者会把offset提交到borker-0上的__consumer_offset主题上,这个broker-0是一个controller。消费者要根据offset进行消费,消费者是先把消息给poll下来,然后不管有没有消费,就会定时把当前偏移量给提交给offset主题(自动提交);手动提交就是把消息消费时或者消费完再手动提交offset。
11.2.1 自动提交
消费者poll到消息后默认情况下,会自动向broker的__consumer_offset主题提交当前主题-分区消费的偏移量。
1 2 3 4
| props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
|
注意:自动提交可能会丢消息,当消费者刚提交offset之后,没来得及消费就挂了,下一个消费者会从已经提交的offset的下一个位置开始消费消息。未被消费的消息就丢失掉了。
11.2.2 手动提交
1 2
| props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
|
会等待broker-0返回ack
1 2 3 4 5
| if (!records.isEmpty()) { consumer.commitAsync(); }
|
1 2 3 4 5 6 7 8 9 10
| if (!records.isEmpty()) { consumer.commitAsync((map, e) -> { if (e != null) { e.printStackTrace(); } }); }
|
11.3 长轮询消息
1 2
| props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
|
1 2 3 4
|
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
|
意味着:
1 2
| props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
|
12. 消费者的其他配置
12.1 健康状态检查
消费者每隔1s像kafka集群发送心跳,集群发现如果有10s没有续约的消费者,将被T出消费组,触发该消费组的rebalance。
1 2 3 4
| props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
|
12.2 指定分区消费
1 2
| consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, 0)));
|
12.3 消息回溯消费
每次消费都从offset=0开始消费,需要先指定分区,再seek
1 2 3 4
| consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, 1)));
onsumer.seekToBeginning(Collections.singleton(new TopicPartition(TOPIC_NAME, 1)));
|
12.4 指定offset消费
1 2 3 4 5 6
| consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, 1)));
consumer.seek(new TopicPartition(TOPIC_NAME, 1), 5);
|
12.5 从指定时间点开始消费
1 2 3 4 5 6 7 8 9 10 11 12
| Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map); for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) { TopicPartition topicPartition = entry.getKey(); OffsetAndTimestamp offsetAndTimestamp = entry.getValue(); if (topicPartition == null || offsetAndTimestamp == null) continue; long offset = offsetAndTimestamp.offset(); System.out.println("partition-" + topicPartition.partition() + "|offset-" + offset); consumer.assign(Collections.singletonList(topicPartition)); consumer.seek(topicPartition, offset); }
|
12.6 新消费者组的offset
之前命令行的时候搞过的就是这个--from-beginning
还记得吗。
默认的消费是最后的offset+1
。知道这个就可以理解这个配置了。
1 2 3 4 5
|
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
13. SpringBoot集成Kafka
13.1 基本配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| // 依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> // 配置 spring: kafka: bootstrap-servers: 192.168.1.249:9092,192.168.1.249:9093,192.168.1.249:9094 producer: retries: 3 batch-size: 16384 buffer-memory: 33554432 acks: 1 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 500 listener: ack-mode: MANUAL_IMMEDIATE
|
13.2 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @RestController @RequestMapping("/msg") public class MyKafkaController {
private final static String TOPIC_NAME = "memo-kafka-test";
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send") public String sendMessage() { kafkaTemplate.send(TOPIC_NAME, "KEY", "this kafka sb test msg!");
return "success"; } }
|
13.3 消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Component public class MyConsumer {
@KafkaListener(topics = "memo-kafka-test", groupId = "memoGroup1") public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); ack.acknowledge(); } }
|
13.4 消费者的详细配置
13.4.1 设置多主题,指定分区,指定offset,同组下的消费者个数(并发消费数)
1 2 3 4 5 6 7 8 9 10 11 12
| @KafkaListener(groupId = "memoGroup2", topicPartitions = { @TopicPartition(topic = "memo-kafka-test", partitions = {"0", "1"}), @TopicPartition(topic = "test", partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "1")) }, concurrency = "1") public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); ack.acknowledge(); }
|
13.4.2 一些有关Listener的配置
1 2 3 4 5 6 7 8
| listener: ack-mode: MANUAL_IMMEDIATE
|
14. Kafka集群Controller、Rebalance、HW和LEO
14.1 Controller
kafka启动的时候会先向zookeeper创建一个临时序号节点,获得的序号最小的那个broker作为集群的controller,负责管理整个集群中的所有分区和副本的状态:
- 当某个分区的leader出现故障时,由controller负责为该分区选举新的leader副本
- 当检测到某个分区的ISR集合发生变化时(broker新增或减少),由controller负责通知所有broker更新其元数据信息
- 当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由controller通知其他broker
14.2 Rebalance
前提时:消费者没有指明分区消费。当消费组里的消费者和分区关系发生变化,那么就会触发rebalance机制。
这个机制会重新调整消费者消费哪个分区。
在触发rebalance机制之前,消费者消费哪个分区有三种策略:
- range:通过公式来计算某个消费者消费哪个分区
- 轮询:大家轮着消费
- sticky:在触发了rebalance之后,在消费者消费的原分区不变的基础上进行调整