Kafka使用场景

分区

为什么要引用分区

为什么所有的消息要使用分区来存储呢?

其最主要的原因是提供负载均衡的能力.不同的分区分布在不同的机器上,这样每台机器都可以独立的执行各自分区的读写能力.

分区策略

决定消息分配到哪个分区的算法.kafka支持轮询分配,随机分配,自定义分配策略,通过指定Key分配.

默认情况下,如果指定了key,就会发送到指定的分区,否则使用轮询算法来尽量避免数据的倾斜.

通常可以指定key来实现数据的顺序消费.

kafka的数据不丢失配置

对于已提交的数据,kafka保证数据的不丢失.

案例 1:生产者程序丢失数据

在生产者发送消息的时候,使用producer.send(msg, callback),而不是producer.send(msg).

这样一旦发送失败,我们可以做出响应的处理.

案例 2:消费者程序丢失数据

关闭消费者的自动提交,先执行业务再去手动提交offset.

案例3:Kafka的数据丢失

使用ack=all,告诉kafka对已提交的定义.设置成all表示所有的follow副本都接收到了消息,表示已提交.

最佳实践

  1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback).记住,一定要使用带有回调通知的 send 方法.

  2. 设置 acks = all.acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义.如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”.这是最高等级的“已提交”定义.

  3. 设置 retries 为一个较大的值.这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试,当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失.

  4. 设置 unclean.leader.election.enable = false.这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader.如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失.故一般都要将该参数设置成 false,即不允许这种情况的发生.

  5. 设置 replication.factor >= 3.这也是 Broker 端的参数.其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余.

  6. 设置 min.insync.replicas > 1.这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”.设置成大于 1 可以提升消息持久性.在实际环境中千万不要使用默认值 1.

  7. 确保 replication.factor > min.insync.replicas.如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了.我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成.推荐设置成 replication.factor = min.insync.replicas + 1.

  8. 确保消息消费完成再提交.Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式,就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的.

Kafka数据幂等和事务

kafka支持的数据可靠性维度:

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。

  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。

  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

默认情况下,如果生产者发送消息,由于网络抖动,拿不到回调,因此kafka使用重试,因此kafka是支持至少一次.

如果想设置最多一次,只需要关闭重试机制.

对于精确一次,kafka提供了幂等和事务来支持.

幂等性 Producer

默认情况下,producer不是幂等的,使用enable.idempotence 被设置成 true.开启幂等.其实现原理是对处理过的数据,在broker多保存一份,当收到了重复消息后,自动丢弃.但是其局限性是只保证单个分区的幂等.

事务型 Producer

事务型 Producer 能够保证将消息原子性地写入到多个分区中.这批消息要么全部写入成功,要么全部失败.另外,事务型 Producer 也不惧进程的重启.Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理.

设置事务型 Producer 的方法也很简单,满足两个要求即可:

  • 和幂等性 Producer 一样,开启 enable.idempotence = true.

  • 设置 Producer 端参数 transctional. id.最好为其设置一个有意义的名字.

producer.initTransactions();
try {
            producer.beginTransaction();
            producer.send(record1);
            producer.send(record2);
            producer.commitTransaction();
} catch (KafkaException e) {
            producer.abortTransaction();
}

这样可以保证record1和record2要么同时成功,要么同时失败.

但是即使是写入失败的数据,kafka也会保存到底层日志中,consumer也是可以收到,因此需要在consumer端设置:isolation.level = read_committed(只读取事务提交的数据和非事务的数据,排除了事物失败的数据).

事务的加入支持数据的不重复性.

Kafka拦截器

生产者拦截器

继承 org.apache.kafka.clients.producer.ProducerInterceptor 接口.该接口是 Kafka 提供的,里面有两个核心的方法.

  1. onSend:该方法会在消息发送之前被调用.如果你想在发送之前对消息“美美容”,这个方法是你唯一的机会.

  2. onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用.还记得我在上一期中提到的发送回调通知 callback 吗?onAcknowledgement 的调用要早于 callback 的调用.值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦.还有一点很重要,这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降.

消费者拦截器

实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,这里面也有两个核心方法.

  1. onConsume:该方法在消息返回给 Consumer 程序之前调用.也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你.

  2. onCommit:Consumer 在提交位移之后调用该方法.通常你可以在该方法中做一些记账类的动作,比如打日志等.

消费者位移的提交

自动提交

kafka默认每隔5秒自动进行提交.

相关参数:

enable.auto.commit =true

auto.commit.interval.ms = 5s

手动提交

1.同步提交

同步提交会发生堵塞,但是失败后自动重试.

2.异步提交

不会堵塞,但是没有失败重试.

最佳实践

 try {
            while (true) {
                        ConsumerRecords<String, String> records = 
                                    consumer.poll(Duration.ofSeconds(1));
                        process(records); // 处理消息
                        commitAysnc(); // 使用异步提交规避阻塞
            }
} catch (Exception e) {
            handle(e); // 处理异常
} finally {
            try {
                        consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
	} finally {
	     consumer.close();
}
}

当然,kafka也支持分段提交.

CommitFailedException异常怎么处理?

场景1:

消息的处理时间超过了max.poll.interval.ms 参数值.

场景2:

应用中同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序.

顺序消费

在发送消息的时候,producter指定分区。

Last updated

Was this helpful?