broker简介
broker是Apache Kafka最重要的组件,本质上它是一个功能载体(或服务载体),承载了绝大多数的Kaka服务。事实上,大多数的消息队列框架都有 broker或与之类似的角色。一个broker通常是以服务器的形式出现的,对用户而言, broker的主要功能就是持久化消息以及将消息队列中的消息从发送端传输到消费端。
消息设计
使用JAVA类来定义Kafka似乎很简单,但是这种实现的弊端是什么呢?
- 在jvm中,保存对象的开销非常大
- JMM要求对象必须按照8字节对齐,会产生不必要的padding。
- 堆上的数据越来越多,GC的性能下降很快
目前kafka消息格式有三个版本,V0、V1、V2。
V0版本(kafka 0.10.0.0之前)

- CRC校验码:4字节的CRC校验码,用于确保消息在传输过程中不会被恶意篡改。
- magic:单字节的版本号。V0版本 magic=0,V1版本 magic=1,V2版本 magIc=2。
- attribute:单字节属性字段,目前只使用低3位表示消息的压缩类型。
- key长度字段:4字节的消息key长度信息。若未指定key,则给该字段赋值为-1。
- key值:消息key,长度由上面的“key长度字段”值指定。如果“key长度字段”值是1,则无key,消息没有该字段。
- value长度字段:4字节的消息长度。若未指定 value,则给该字段赋值-1。
- value值:消息 value,长度由上面的“ value长度字段”值指定。如果“ value长度字段值是-1,则无 value,消息没有该字段。
V1(kafka 0.10.0.0)
由于v0版本存在两个弊端,无法满足使用场景,kafka在0.10.0.0版本中改进了消息格式。
弊端:
- 没有消息的时间信息。
- 很多流式处理框架都需要消息保存时间信息以便对消息执行时间窗口等聚合操作。

V1与V0区别:
- 加入8字节的时间戳
- V1版本attribute的第4位被用于指定时间戳类型,目前支持两种CREATE_TIME(消息创建时,由producer指定) 和 LOG_APPEND_TIME(消息发到broker,由broker指定)
V2(kafka 0.11.0.0)
V0,V1存在的缺陷
- 空间利用率不高:不管key,value长度,总是用固定4字节保存。
- 只保存最新消息位移: 如果获取第一条消息的位移,只能把日志都解压缩装入内存方向遍历
- 冗余消息及CRC校验:为每条消息执行CRC比较浪费CPU时间片,尤其是时间戳是LOG_APPEND_TIME类型,每条消息到broker的时候都要重新覆盖时间戳并重新计算crc。
- 未保存消息长度


v2主要优化点如下
- 引入batch概念,每个batch里面可以有很多条消息,crc校验的问题得到优化,crc从消息层移除 被放到batch层
- 增加batch层attribute,移除消息层attribute字段,节约空间
- 可变长度的key value
- PID、 producer epoch和序列号等信息都是0.11.0.0版本为了实现幂等性 producer和支持事务而引入的。PID表示一个幂等性 producer的ID值,producer epoch表示某个PID携带的当前版本号, broker使用PID和 epoch来确定当前合法的 producer实例,并以此阻止过期 producer向 broker生产消息。序列号的引入主要是为了实现消息生产的幂等性。 Kafka依靠它来辨别消息是否已成功提交,从而防止出现重复生产消息。
集群管理
kafka是依赖zk实现集群管理的。每当一个broker启动时,它会将自己注册到zk下的一个节点,路径为/brokers/ids/#{broker.id}

ZooKeeper临时节点的生命周期和客户端会话绑定。如果客户端会话失效,该临时节点就会自动被清除掉。Kaka正是利用 ZooKeeper临时节点来管理
broker生命周期的。broker启动时在ZooKeeper中创建对应的临时节点,同时还会创建一个监听器(listener)监听该临时节点的状态;一旦broker启动后,监听器会自动同步整个集群信息到该 broker上;而一旦该 broker崩溃,它与 ZooKeeper的会话就会失效,导致临时节点被删除,监听器被触发,然后处理 broker崩溃的后续事宜。这就是 Kafka管理集群及其成员的主要流程。
zk路径

- /brokers:里面保存了 Kafka集群的所有信息,包括每台 broker的注册信息,集群上所有topic的信息等。
- /controller:保存了 Kafka controller组件(controller负责集群的领导者选举)注册信息,同时也负责 controller的动态选举。
- /admin:保存管理脚本的输出结果,比如删除 topic,对分区进行重分配等操作。
- / isr_change_notification:保存ISR列表发生变化的分区列表。controller会注册一个监听器实时监控该节点下子节点的变更。
- /config:保存了Kaka集群下各种资源的定制化配置信息,比如每个topc可能有自己
专属的一组配置,那么就保存在/ config/topics/#{topic}下。 - /cluster:保存了Kaka集群的简要信息,包括集群的ID信息和集群版本号。
- controller epoch:保存了controller组件的版本号。Kaka使用该版本号来隔离无效的
controller请求。
副本与ISR设计(复制)
一个Kaka分区本质上就是一个备份日志,即利用多份相同的备份共同提供冗余机制来保持系统高可用性。
这些备份在 Kafka中被称为副本( replica)。
Kafka把分区的所有副本均匀地分配到所有 broker上,并从这些副本中挑选一个作为 leader副本对外提供服务,而其他副本被称为 follower副本,只能被动地向 leader副本请求数据,从而保持与 leader副本的同步。
ISR
ISR,就是 Kafka集群动态维护的一组同步副本集合(in- sync replicas)。
- 每个 topic分区都有自己的ISR列表,ISR中的所有副本都与 leader保持同步状态。
- leader副本总是包含在ISR中的,只有ISR中的副本才有资格被选举为 leader.。
- producer写入的一条条Kafka消息只有被ISR中的所有副本都接收到,才被视为“已提交”状态。(具体表现参考 kafka producer中的acks参数)
副本同步

- 起始位移( base offset):表示该副本当前所含第一条消息的offset
- 高水印值( high watermark,HW):副本高水印值。它保存了该副本最新一条已提交
消息的位移。 leader分区的HW值决定了副本中已提交消息的范围,也确定了
consumer能够获取的消息上限,超过HW值的所有消息都被视为“未提交成功的”,
因而 consumer是看不到的。 - 日志末端位移( log end offset,LEO):副本日志中下一条待写入消息的ofet所有
副本都需要维护自己的LEO信息。每当 leader副本接收到 producer端推送的消息,它
会更新自己的LEO(通常是加1)。同样, follower副本向 leader副本请求到数据后也
会增加自己的LEO。ISR中的所有副本都更新了对应的LEO之后, leader副本才会向右移动HW值表明消息写入成功。
ISR设计
0.9.0.0版本之前
090.0版本之前,Kaka提供了一个参数 replica.lag.max. messages,用于控制 follower副本落后 leader副本的消息数。一旦超过这个消息数,则视为该 follower为“不同步”状态,从而需要被Kaka“踢出”ISR。
存在问题:
- replica.lag.max. messages需要靠用户猜测,如果这个值太小了,会导致follower不断地被踢出ISR,然后又重新加入ISR这种情况发生,非常消耗性能。(例如设置为3,然后同一时间producer发了4条消息,follower会被认为和leader不同步。)
- replica.lag.max. messages参数影响所有topic,是一个全局参数
0.9.0.0版本之后
自0.90.0版本之后,Kaka去掉了之前的 replica.lag. max. messages参数,改用统一的参数
replica.lag.time.maxms(默认值是10秒)同时检测由于慢以及进程卡壳而导致的滞后( lagging)-即 follower副本落后 leader副本的时间间隔。
对于“请求速度追不上”的情况,检测机制也发生了变化—如果一个 follower副本落后 leader的时间持续性地超过了这个参数值,那么该 follower副本就是“不同步”的。这样即使出现刚刚提到的producer瞬时峰值流量,只要 follower不是持续性落后,它就不会反复地在ISR中移进、移出。
水印和 leader epoch(复制原理)
每个Kaka副本对象都持有两个重要的属性:日志末端位移( log end offset,下称LEO)
和高水印(HW)。
- LEO:日志末端位移,记录了该副本对象底层日志文件中下一条消息的位移值。
- HW:我们已经很熟悉的高水印值。任何一个副本对象的HW值一定不大于其LEO值,
而小于或等于HW值的所有消息被认为是“已提交的”或“已备份的”( replicated)
Kafka设计了两套 follower副本LEO属性:
- 一套LEO值保存在 follower副本所在 broker的缓存上
- 另一套LEO值保存在leader副本所在broker的缓存上 (leader副本所在机
器的缓存上保存了该分区下所有 follower副本的LEO属性值)
为什么要保存两套值呢?
因为Kaka需要利用前者帮助follower副本自身更新HW值,而同时还需要使用后者来确定 leader副本的HW值,即分区HW。
LEO更新机制
- follower副本端的follower副本的LEO,在 follower发送 FETCH请求后, leader将数据返回给 follower,此时 follower开始向底层log写数据,从而自动更新其LEO值。
- leader副本端的 follower副本LEO,一旦leader接收到 follower发送的 FETCH请求,它首先会从自己的log中读取相应的数据,但是在给 follower返回数据之前它先去更新 follower的LEO(即上面所说的第二套LEO值)。
- leader副本更新LEO, leader写log时就会自动更新它自己的LEO值。
HW更新机制
follower副本HW更新机制
一旦 follower向log写完数据,它就会尝试更新HW值。具体算法就是比较当前LEO值与 FETCH响应中 leader的HW值,取两者的小者作为新的HW值。
leader副本HW更新机制
比起 follower副本的HW属性,我们更关心 leader副本HW值的更新,因为它直接影响了分区数据对于 consumer的可见性。在以下4种情况下, leader会尝试更新分区HW值
切记是尝试,有可能因为不满足条件而不做任何更新。
- 副本成为leader副本时
- broker出现崩溃导致副本被踢出ISR
- producer 向leader副本写入消息时
- leader处理follower FETCH请求时
leader是如何更新它的HW值的呢?
前面说过, leader broker上保存了一套 follower副本的LEO以及它自己的LEO。当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(当然也包括 leader自己的LEO),并选择最小的LEO值作为HW值。
这里的满足条件主要是指副本要满足以下两个条件之一。
- 处于ISR中
- 副本LEO落后于 leader leo的时长不大于 replica lag time.max.ms参数值(默认值是
10秒)
分区HW实际上就是ISR中所有副本的LEO的最小值
基于水印备份机制的缺陷
Kafka使用HW值来决定副本备份的进度而HW值的更新通常需要另一轮 FETCH请求才能完成,故这种设计在本质上是存在缺陷的。可能引发的问题如下
备份数据丢失
使用HW值来确定备份进度时其值的更新是在下一轮RPC中完成的。设想下这样的场景:某 follower发送了第二轮的 FETCH请求给 leader,在接收到响应之后,它会首先写入本地日志假设没有数据可写,故 follower leo不会发生变化。之后 follower副本准备更新其HW值。此时故障发生了, leader副本发生崩溃,那么这个时刻就可能造成数据丢失。

备份数据不一致
除数据丢失风险之外,这种设计还有一个潜在的问题,即造成 leader端log和 follower端
log的数据不一致,即数据离散的问题。举一个例子,假设 leader端保存的消息序列是
rl,r2,r3,4,r5…,而 follower端保存的消息序列可能是rl,r3,r4,r5,r6…。这也是非法的场景,因为顾名思义, follower必须追随 leader,完整地备份 leader端的数据。
这种情况的初始状态与数据丢失场景有些许不同之处:A依然是 leader,A的log写入了2
条消息,但B的log只写入了1条消息。分区HW更新到2,但B的HW还是1,同时生产者端的min.insync.replices=1.
这次我们让A和B所在机器同时挂掉,然后假设B先重启回来,因此成为 leader,分区HW=1。假设此时 producer发送了第3条消息给B,于是B的log中 offset=1的消息变成了绿色框表示的消息,同时分区HW更新到2(A还没有回来,就B一个副本,故可以直接更新HW而不用理会A)之后A重启回来,需要执行日志截断,但发现此时分区HW=2,而A之前的HW值也是2,故不做任何调整。此后A和B将以这种状态继续正常工作。

leader epoch
所谓领导者 epoch( leader epoch),实际上是一对值( epoch,ofie) epoch表示 leader的版
本号,从0开始,当 leader变更过1次时, epoch就会加1,而oet则对应于该 epoch版本的
leader写入第一条消息的位移。
每个 leader broker中会保存这样一个缓存,并定期写入一个检查点文件中。当 leader写底层log时,它会尝试更新整个缓存如果这个 leader首次写消息,则会在缓存中增加一个条
目,否则就不做更新。而每次副本重新成为 leader时会查询这部分缓存,获取对应 leader版本的位移,这就不会发生数据不一致和丢失的情况
参考链接 官网有关这一功能的描述
虽然能解决部分数据丢失的问题,但是还是会有数据丢失的情况出现,比如链接中 m2的消息就被丢弃了。
日志存储设计
kafka的分区即kafka的日志,对于每个日志而言,kafka又进一步细分成日志段文件以及日志索引文件。
日志段文件,即后缀名是Log的文件保存着真实的Kafka记录。每个og文件都包含了一段位移范围的 Kafka记录。Kaka使用该文件第一条记录对应的offset来命名此log文件。因此,每个新创建的 topic分区一定有offset是0的log文件虽然在Kaka内部 offset是用64位来保存的,但目前对于日志段文件而言, Kafka只使用20位数字来标识offset。不过对于实际的线上环境而言,这通常是足够的。
kafka每个日志段文件上限大小,由broker端参数log.message.bytes控制,默认是1GB.
当日志段被填满后,kafka会自动创建一组新的日志段和索引文件,这个过程被称为日志切分.
kafka正在写入的日志段被称为当前激活日志段或当前日志段,当前日志段不受任何kafka后台任务影响,比如定期清理任务,日志压缩任务。
索引文件
Kaka分区日志还包含两个特殊的文件.index和.timeindex,它们都是索引文件,分别被称为位移索引文件和时间戳索引文件。前者可以帮助 broker更快地定位记录所在
的物理文件位置,而后者则是根据给定的时间戳査找对应的位移信息。
broker端参数 log.index.interval.bytes设置了这个间隔到底是多大,默认值是4KB,即 Kafka分区至少写入了4KB数据后才会在索引文件中增加一个索引项。
不论是位移索引文件还是时间戳索引文件,它们中的索引项都按照某种规律进行升序排列。对于位移索引文件而言,它是按照位移顺序保存的;而时间戳索引文件则严格按照时间戳顺序保存。由于有了这种升序规律,Kaka可以利用二分查找( binary search)算法来搜寻目标索引项,从而降低整体时间复杂度到O(lgN)。若没有索引文件,Kaka搜寻记录的方式只能是从每个日志段文件的头部顺序扫描,因此,这种方案的时间复杂度是O(n)。


日志存留
Kaka是会定期清除日志的,而且清除的单位是日志段文件,即删除符合清除策略的日志
段文件和对应的两个索引文件。当前留存策略有如下两种。
- 基于时间的留存策略: Kafka默认会清除7天前的日志段数据(包括索引文件)。Kaka提供了3个broker端参数,其中log.retention.{ hours minutes ms}用于配置清除日志的时间间隔,其中的ms优先级最高, minutes次之, hours优先级最低。
- 基于大小的留存策略:Kaka默认只会为每个log保存log.retention.bytes参数值大小的
字节数。默认值是-1,表示Kaka不会对log进行大小方面的限制。
日志压缩

在内部,Kaka会构造一个哈希表来保存key与最新位移的映射关系。当执行 compaction
时, Cleaner不断拷贝日志段中的数据,只不过它会无视那些key存在于哈希表中但具有较大位移值的消息。

典型使用场景:只关心最后一条消息的场景,例如kafka分区的offset的topic
Controller设计
在一个Kaka集群中,某个 broker会被选举出来承担特殊的角色,即controller。引入controller就是用来管理和协调Kafka集群的。具体来说,就是管理集群中所有分区的状态并执行相应的管理操作。
每个Kafka集群任意时刻都只能有一个controller,当集群启动时,所有 broker都会参与controller的竞选,但最终只能由一个broker胜出。一旦 controller在某个时刻崩溃,集群中剩余的broker会立刻得到通知,然后开启新一轮的 controller选举。新选举出来的 controller将承担起之前controller的所有工作。

controller管理状态
controller维护的状态分为两类:
- 每台 broker上的分区副本(副本状态)
- 每个分区的 leader副本信息(分区状态)
controller 故障转移原理
一个Kafka集群中发生 controller leader选举的场景共有如下4种:
- 关闭 controller所在 broker
- 当前 controller所在 broker宕机或崩溃。
- 手动删除 ZooKeeper的/controller节点。
- 手动向 ZooKeeper的/controller节点写入新的 broker id
这4种操作变更实际上都是/controller节点的内容
controller只需要做一件事情:创建一个监听该目录的监听器。/controller本质上是一个临时节点,节点保存了当前controller所在的 broker id。
集群首次启动时所有 broker都会抢着创建该节点,但ZooKeeper保证了最终只能有一个 broker胜出—胜出的那个 broker即成为 controller。
一旦成为 controller,它会增加 controller的版本号,即更新 /controller epoch节点的值,然
后履行上面所有的这些职责。对于那些没有成为 controller的 broker们而言,它们不会甘心失败,而是继续监听/controller节点的存活情况并随时准备竞选新的 controller
broker请求处理(Reactor模式)

