读书笔记|《深入理解kafka:核心设计与实现》
本篇记录了《深入理解kafka:核心设计与实现》的小概念,算是读书笔记吧,还待不断补充ing
基本知识
- 主题是一个逻辑上的概念,可以细分为多个分区
- 同一主题下的不同分区包含的消息是不同的
- kafka中的分区可以分布在不同的服务器上
- 每条消息被发送到broker前,会根据分区规则选择存储到哪个具体的分区
- 同一分区的不同副本保存的是相同的消息,其中leader副本负责处理读写请求,follower副本只负责与leader副本进行消息同步
- 分区中的所有副本统称为AR,所有与leader副本一致的(包括leader)称为ISR,与leader副本之后过多的成为OSR,所以AR = ISR + OSR
- ISR与HW和LEO有关系 (LEO标识当前日志文件中下一条待写入消息的offset,分区ISR集合的每个副本都有一个ISR,ISR中最小的为HW)
- 消费者只能拉到HW之前的消息
生产者
- 发送消息主要有三种方式:发后即忘(fire-and-forget)、同步(sync)、异步(async)
- kafkaProducer一般发生两种异常:
- 可重试异常:NetworkException、LeaderNotAvaliableException、UnknowTopicOrPartitionException、NotEnoughReplicasException等
- 对于可重试异常,如果配置了retries参数,那么只要在规定的重试次数内自行恢复了就不会抛出异常
- 不可重试异常
- 可重试异常:NetworkException、LeaderNotAvaliableException、UnknowTopicOrPartitionException、NotEnoughReplicasException等
- kafkaProducer —> send() —> intgerceptor() —> serializer() —> partitioner() —> broker
- 如果key不为null,计算得到的分区号会是所有分区中的任意一个(对key进行hash运算,相同hash的入同一分区),如果key为null,那么计算得到的分区号仅为可用分区中的任意一个
- kafkaProducer会在消息被应答之前或消息发送失败时调用拦截器,优先于用户设定的callback之前执行
- 生产者架构图
- 整个生产者由两个线程(主线程和sender线程)执行
- RecordAccumulator 主要用来缓存消息 以便 Sender 线程可以批量发送
- 主线程中发送过来的消息都会被迫加到 RecordAccumulator 的某个双端队列( Deque)中,
在 RecordAccumulator 的内部为每个分区都维护了 一 个双端队列,队列中的内容就是
ProducerBatch,即 Deque - Sender 从 RecordAccumulator 中 获取缓存的消息之后,会进 一 步将原本<分区, Deque<
ProducerBatch>>的保存形式转变成<Node, List< ProducerBatch>的形式,Sender 还 会进一步封装成<Node,Request>的形式 - 请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,它的主要作用是缓存了已经发出去但还没有收到响应的请求,通过配置参数还可 以限制每个连接最多缓存的请求数
消费者
- 消费者可以通过集合/正则的方式订阅多个主题 subscribe()
- 如果通过正则订阅后,有人创建新的符合该正则的主题后,消费者也可以消费到这个新加入的主题
- 消费者可以直接订阅某些主题的特定分区 assign()
- kafka中的消费是基于拉模式。
- poll()涉及到消费位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容
- 位移提交
- kafka默认的消费位移的提交方式是自动提交,这里的自动提交是定期提交
- 每次真正向服务端发起拉取请求之前都会检查是否可以进行提交,如果可以就会提交上次轮训的位移
- 手动提交可以细分为同步和异步提交,对应kafkaConsumer的commitSync()和commintAsync()
- 指定位移消费seek() 方法只能重置消费者分配到的分区的消费位置
- 再均衡是指分区的所属权从一个消费者转移到另一个消费者的行为,为消费组具备高可用性和伸缩性提供保障,不过在再均衡发生期间,消费组内的消费者是无法读取消息的
- 通过再均衡监听器来监听事件,进行位移提交等操作
- kafkaProducer是线程安全的,然后consumer是非线程安全的,在consumer中定义了acquire()方法,通过CAS来判断当前是否只有一个线程在操作
- 拉消息包含两种场景,follower副本做同步以及consumer的接收
分区和副本
- 创建topic时可以通过replica-assignment 来手动指定分区副本的分配方案
- 分区副本分配
- 生产者:为每条消息指定所要发往的分区
- 消费者:为消费者指定可以消费消息的分区
- 集群:在哪个broker上创建哪些分区的副本
- 创建topic
- kafka-topics.sh
- kafkaAdminClient
- 直接zk创建子节点
- 目前kafka只支持增加分区数而不支持减少分区数
- 删除topic
- 通过指定删除
- 直接在zk的删除路径下创建同名节点
- 删除zk节点及log
- 优先副本是指在AR集合列表的第一个副本,理想情况下优先副本就是leader副本
- 分区平衡 通过一定的方式促使优先副本选举为leader副本
- kafka不会将失效的分区副本自动的前一到集群中剩余的可用broker节点上
- 分区重分配一般发生在集群扩容、broker节点失效的场景下对分区进行迁移
- 分区重分配的本质在于数据复制
- 一味的增加分区可能会导致kafka崩溃,原因在于增加一个分区时会对应增加一个文件描述符,文件描述符可能会不够
日志存储
- 一个副本对应一个日志文件夹,文件夹下包含多个LogSegment包括.log、.index、 .timeindex等其他文件
- 向log中增加消息是顺序写入的,只有最后一个LogSegment才能知晓写入操作
- 每个LogSegment中的日志文件(.log结尾)都有对应的两个索引文件(.index和.timeindex),每个LogSegement的命名都是根据基准偏移量(baseOffset,表示当前LogSegment的第一条消息的offset)命名的
- Kafka实现的压缩方式是将多条消息一起进行压缩
- Kafka中的索引文件以稀疏索引的方式构建消息的索引,通过MappedByteBuffer将索引文件映射阿斗内存中,以加快索引的查询速度
- 日志分段文件切分条件
- 大小超过配置的值,默认>1GB
- 最大时间戳与当前时间查超过配置的值,默认 > 7天
- 两个索引文件的大小超过配置的值,默认 > 10MB
- 追加的消息偏移量超过配置的值,默认 > Integer.MAX_VALUE
- 日志清理
- 日志删除(默认)
- 基于时间的保留策略 默认7天
- 基于日志大小的保留策略 默认1GB
- 基于日志起始偏移量的保留策略
- 日志压缩
- 会保留一个key的最新value值
- 如果一条消息的key部位null但value为null,此消息称为墓碑消息,日志清理线程会进行清理并保留一段时间
- 日志删除(默认)
- kafka高可用
- 顺序写入
- 页缓存
- 零拷贝 依赖底层的sendfile()
深入服务端
- kafka自定义了一组基于TCP的二进制协议
- kafka基于时间轮的概念自定义实现了用于延时功能的定时器
- 时间轮是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个 定时任务列表,该列表是一个环形的双向链表,链表的每一项是定时任务项,封装了真正的定时任务
- 当任务的到期时间超过了当前时间轮所表示的时间范畴时,会尝试添加到上层时间轮中
- ack = -1 的情况下,leader副本默认等待30s时间接受follower副本的ack
- 延时操作创建之后会加入延时操作管理器来管理,每个延时操作管理器会配备一个定时器来做超时管理。延时操作需要支持外部事件的处罚,所以需要配备一个监听池来监听每个分区的外部事件
- kafka集群会有多个broker,其中一个broker会被选举为控制器,负责管理整个集群的梭鱼哦分区和副本的状态
- 控制器选举依赖zk,成功竞选控制器的broker会在zk中创建/controller这个临时节点
- 每个broker都会在内存中保存当前控制器的brokerid值,被标示为activeControllerId
- zk中还会有一个持久的节点用来保存控制器发生变更的次数,每个和控制器交互的请求都会携带该值,如果请求的值小于内存中的值则认为请求时失效的。kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性
- 优雅关闭
- 使用脚本关闭可能不奏效,原因是ps命令会限制输出字符数不得超过页大小PAGE_SIZE = 4096
- 所以jps查看kafka的pid,然后使用kill -s TERM PID / kill -15 PID
- kafka服务入口程序中有一个名为kafka-shutdown-hock的关闭钩子
深入客户端
- 消费者分区分配策略
- RangeAssignor 按照消费者总数和分区总数进行整除得到跨度,然后将分区按照跨度进行平均分配
- RoundRobinAssignor 轮询方式分配,如果同一组内消费者订阅的信息不同,分配的会不均匀
- StickyAssignor
- 自定义
- 旧版消费者客户端使用zk上配置监听器来监听消费者组和kafka集群的状态,此时触发再均衡操作时会导致羊群效应和脑裂效应
- 新版使用消费者协调器和组协调器来解决该问题
- 消息中间件的消息传输保障有3个层级
- at most once
- at least once
- Exactly once
- 由于kafka多副本机制,及对于网络等异常的重试机制,因此这里kafka选用的是at least once
- 而对于消费者而言,at most once/at least once 取决于拉取和提交的相对顺序
- 为了实现生产者的幂等性,kafka引入来PID和序列号,每个新的生产者实例在初始化的时候会被分配一个PID,对于每个PID,消息发送到的每一个分区都有对应的序列号
- broker端会在内存中为每一对<PID,分区>维护一个序列号,对于收到的消息只有当消息的序列号 = broker.序列号 + 1时才接受
- 因为序列号是分区上的概念,所以幂等性并不跨越多个分区运作。而事务可以弥补这个缺陷
- 对于典型的流式应用:消费Atopic——do something ——生产Btopic,通过transactionalId实现事务
- 从生产者角度来说,通过事务kafka可以保证跨生产者会话的消息幂等发送(对有相同transactionalId的新生产者实例被创建且工作时,旧的相同transcationalId将不再工作)及跨生产者会话的事务恢复
- 从消费者角度分析,出于以下原因,事务语义较弱
- 采用日志压缩后,由于相同key的消息会覆盖导致某些消息被清理
- 消息被分布在同一分区的多个日志分段,老的分段又被删除
- 消费者可以通过seek()访问任意offset的消息
可靠性探究、
- 当follower副本将leader副本LEO之前的日志全部同步时,认为follower副本追上leader副本
- follower副本更新自己HW的算法是比较当前的LEO与leader副本传送过来的HW的值,取最小
- leader副本会根据follower副本传来的LEO的最小值作为自己的HW
- 以前版本kafka使用的是基于HW的同步机制,但这样会有数据不一致的问题,原因是HW同步有间隙即follower在更新自己的LEO后需要再一轮的请求才会更新自己的HW,follow副本的HW不能比leader副本的HW高
- 新版本引入leader epoch在需要截断数据的时候使用该值作为参考依据而不是以前的HW
- 生产者写入与消费者读取消息都是与leader副本交互的,从而实现一种主写主独的模型
- kafka分区及副本的设计导致每个broker上的读写负载是一样的
- 读写分离的弊端
- 数据不一致
- 延时
- 常规日志同步机制(少数服从多数)的弊端就是如果容忍1个follower的失败需要保证至少3个副本
- kafka采用的是ISR机制
读书笔记|《深入理解kafka:核心设计与实现》
http://yuyangblog.cn/2019/09/15/读书笔记|《深入理解kafka-核心设计与实现》/