kafka分区一致性的思考

在学习kafka的过程中,相信都听过一个概念“kafka不能保证消息整体的一致性,只能保证分区内的一致性“。其原因很简单当时看书的时候看到这里就觉得嗯嗯,是这么个道理,然后就接着往下看了。

后来在发送消息的API中,我以为发个消息,参数里就传topic和value就足够了,往指定topic里发value嘛,但是在代码里,发现除了这两个参数外,还有key这个参数,研讨一番后知道如果key不为null,计算得到的分区号会是所有分区中的任意一个(对key进行hash运算,相同hash的入同一分区),如果key为null,那么计算得到的分区号仅为可用分区中的任意一个。kafka的producer分区策略可以阅读DefaultPartitioner#partition方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) { // 未指定key
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); // 得到所有可用分区
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

以及nextValue方法

1
2
3
4
5
6
7
8
9
10
11
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}

这段代码大概意思也不难读懂,了解之后得出一个结论,key这个参数就是在发送消息的时候用来指定消息发往哪个分区的

至此,得出了两个结论,但是当时并没有发散去思考问题,其实仔细想想,这两个概念还是有一定关联关系的。下面通过一个demo来分析一下。

后来的某一天,我们接了这样一个消息,这个消息的的内容是mysql的binlog,就是通过canal来解析mysql的binlog并将解析出来的DML和DDL写入kafka。

我当时通过bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic TOPIC --describe命令看了一下这个topic的详情,发现它的分区数量PartitionCount = 1,当时很不理解为什么要把分区数量设为1,这样岂不是对于一个消费者组而言只有一个消费者在消费数据吗?不过由于开发这段功能的“前辈”已经离职了,所以也就不了了之了,一个分区就一个分区呗,我咸吃萝卜淡操着那个心干嘛。

后来,当我重读《深入理解Kafka:核心设计与实践原理》这本书的时候,再看到“kafka不能保证消息整体的一致性,只能保证分区内的一致性”这个概念的时候,突然想到了这个示例,突然间意识到原来当时那个topic的partitionCount = 1 是有其业务意义的,因为消息的是表的DML语句,其对于顺序的要求是很高的,先Insert再Update和先Update再Insert这就有可能产生两种数据,如果这里建立了多个分区,一致性就得不到保障。当想通这个道理的时候不禁觉得前辈到底是前辈,这里确实只能建一个分区。

再后来,当我看到producer的key这个参数的时候,再回想这个示例不禁发现,真的只能建一个分区吗?kafka既然是一个成熟的中间件了,对于这种需要保证消息一致性的解决方案就是只能是建一个分区吗?显然,kafka留下了key这个参数用来解决这个问题,在该业务上,完全可以将同一张表的DML/DDL用一个key来映射,不同的表使用不同的key,这样就完美解决了该问题,其分区也就可以根据情况来创建多个了。类比到其他业务线上比如订单系统中相同订单号的消息使用同一topic来send。想到这里,emmm,“前辈”确实只是个“前辈”。

至此,通过一个样例,将kafka的两个概念串联了起来,不禁觉得这些看起来很easy的概念,如果面试的时候面试官单问任何一个都可以侃侃而谈的概念,如果多思考,多问为什么,还是可以得到很多不一样的心得与体会的。学习的乐趣大体在于此吧。


kafka分区一致性的思考
http://yuyangblog.cn/2019/09/09/kafka分区一致性的思考/
Aŭtoro
于洋
Postigita
September 9, 2019
Lizenta