0%

kafka02_producer

基本数据结构

ProducerRecord(0.10.2.1)

一个 Producer record封装了一条待发送的消息(或称为记录)。我们以0.10.2.1版本的消息格式进行 producer的说明。
ProducerRecord由5个字段构成,它们分别如下:

  • topic:该消息所属的 topic
  • partition:该消息所属的分区。
  • key:消息key
  • value:消息体
  • timestamp:消息时间戳。

Record Metadata

Kafka服务器端返回给客户端的消息的元数据信息,包含如下内容:

  • offset:消息在分区日志中的位移信息。
  • timestamp:消息时间戳
  • topic/partition:所属 topic的分区。
  • checksum:消息CRC32码。
  • serializedKeysize:序列化后的消息key字节数。
  • serialized valueSize:序列化后的消息 value字节数。

工作流程

kafka的producer是通过异步的方式去发送消息的。

producer主要参数

acks

acks参数用于控制 producer生产消息的持久性。

acks=0: 设置成0表示 producer完全不理睬 leader broker端的处理结果。

acks=all或者-1:表示当发送消息时, leader broker不仅会将消息写入本地日志,同时还会等待ISR中所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给producer

acks=1:是0和all折中的方案,也是默认的参数值。 producer发送消息后 leader broker仅将该消息写入本地日志,然后便发送响应结果给 producer,而无须等待ISR中其他副本写入该消息。(leader broker未同步给ISR副本时崩溃,会导致消息丢失)

retries

默认0,不进行重试。Kafka broker在处理写入请求时可能因为瞬时的故障(比如瞬时的 leader选举或者网络抖动)导致消息发送失败。这种故障通常都是可以自行恢复的,可以通过这个参数让producer自动重试发送。

消息重试发送会引入新的问题

重试可能造成消息的重复发送

比如由于瞬时的网络抖动使得 broker端已成功写入消息但没有成功发送响应给 producer,因此 producer会认为消息发送失败,从而开启重试机制。

我们需要在 consumer端保证同一个消息消费的幂等性

0.11.0.0版本开始支持“精确一次”处理语义,从设计避免了类似的问题。(事务消息可以保证producer发送消息的幂等性, 消息只发送一次,要么发送成功,要么发送失败,但是不保证我们消费的幂等性)

重试可能造成消息的乱序
当前 producer会将多个消息发送请求(默认是5个)缓存在内存中,如果由于某种原因发生了消息发送的重试,就可能造成消息流的乱序。

为了避免乱序发生,Java版本 producer提供了max.in.flight.requets per.connection参数,用户将此参数设置成1, producer将确保某一时刻只能发送一个请求。

消息分区机制

Kafka producer发送过程中一个很重要的步骤就是要确定将消息发送到指定的topic的哪个分区中。 具体实现可以看producer的源码。

  • 默认分区策略
    • key不为null,是根据hash算法murmur2就算出key的hash值,然后和分区数进行取模运算。
    • key为null,partitioner会选择轮询的方式来确保消息在 topic的所有分区上均匀分配。
  • 自定义分区策略,producer提供了分区策略以及对应的分区器(partitioner)供用户使用。

解决消息丢失问题

producer端

  • acks=all
  • retries = MAX_INT
  • max.in.flight.requets per.connection=1 防止消息乱序,只能一个一个发
  • 使用带有回调机制的send 会把发送结果回调到callback里面
  • callback中立即显示关闭producer,否则默认情况下producer会将未发送的消息发送出去才关闭,可能会导致消息乱序。

broker端

  • unclean.leader.election.enable = false 关闭unclean leader选举,即不允许非ISR中的副本被选举为 leader,从而避免broker端因日志水位截断而造成的消息丢失。
  • replication.factor >=3 副本因子>=3
  • min.insync.replicas > 1 最少ISR副本确认数>1
  • 确保 replication.factor > min.insync.replicas

幂等性producer

**0.11.0.0版本引入的幂等性producer表示它的发送操作是幂等的。**瞬时的发送错误可能导致producer端出现重试,同一条消息被 producer发送多次,但在 broker端这条消息只会被写入日志一次。对于单个 topic分区而言,这种 producer提供的幂等性消除了各种错误导致的重复消息。

实现原理

发到broker上的每批消息都会被赋予一个序列号(sequence number),用于消息去重。这个序列号会被保存到kafka日志里面,持久化。

kafka会为每一个producer分配一个PID,pid的分配是对用户透明的。

消息被发送到每一个分区,都有对应的序列号值,这个值总是严格单调递增的。

PID、分区号、消息序列号的关系可以视为kv的关系,PID+分区号为key,value为消息序列号。若发送的消息,小于或等于broker端保存的序列号,broker就会拒绝这条消息的写入。

这种设计确保了即使出现重试操作,每条消息也只会被保存在日志中一次。不过,由于每个新的 producer实例都会被分配不同的PID,当前设计只能保证单个 producer实例的EOS语义,而无法实现多个 producer实例一起提供EOS语义。这一点要特别注意。

事务消息

基于消息中间件的事务消息来完成分布式事务。

事务消息可以确保本地执行事务与消息发送是原子的:先发送一条消息到消息中间件,然后执行本地事务,当本地事务成功后再发送提交确认到消息中间件,然后这条消息才能被其他业务消费者所能感知,从而确保原子性。

**Kafka为实现事务要求应用程序必须提供一个唯一的id来标识事务。**这个id被称为事务id,或 TransactionalId,它必须在应用程序所有的会话上是唯一的。值得注意的是, TransactionalId与上面所说的PID是不同的,前者是由用户显式提供的,而后者是 prodcuer自行分配的。

Kafka会保证,只有producer提交了事务以后,事务消息才会对consumer可见

流程

查找事务协调者

生产者会首先发起一个查找事务协调者的请求(FindCoordinatorRequest)。协调者会负责分配一个PID给生产者。类似于消费组的协调者。

Transaction Coordinator 和 Consumer Coordinator一样,运行在集群中的某个broker上面。

获取produce ID

在知道事务协调者后,生产者需要往协调者发送初始化pid请求(initPidRequest)。这个请求分两种情况:

  • 不带transactionID 这种情况下直接生成一个新的produce ID即可,返回给客户端

  • 带transactionID 这种情况下,kafka根据transactionalId获取对应的PID,这个对应关系是保存在事务日志中。这样可以确保相同的TransactionId返回相同的PID,用于恢复或者终止之前未完成的事务。

启动事务

生产者通过调用beginTransaction接口启动事务,此时只是内部的状态记录为事务开始,但是事务协调者认为事务开始只有当生产者开始发送第一条消息才开始。

提交或回滚事务

用户通过调用commitTransaction或abortTranssaction方法提交或回滚事务。

事务消息对consumer的意义

如果以 consumer的角度而言,如前所述,事务的支持要弱一些,原因如下:

  1. 对于 compacted的 topic而言,事务中的消息可能已经被删除了
  2. 事务可能跨多个日志段( log segment),因此若老的日志段被删除,用户将丢失事务中的部分消息。
  3. consumer程序可能使用seek方法定位事务中的任意位置,也可能造成部分消息的丢失。
  4. consumer可能选择不消费事务中的所有消息,即无法保证读取事务的全部消息。

参考书籍

Apache kafka实战
Kafka权威指南

https://zhuanlan.zhihu.com/p/42046847