Kafka通信
Kafka中间所有的通信都是使用的TCP.
至于为什么没有使用http,主要是使用了TCP的一些高级功能,比如:多路复用.还有个原因是http 库的简陋.
生产者
Producer producer = new KafkaProducer(props) ;
producer.send(msg, callback)在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与 Broker 的连接.
KafkaProducer对象的创建会伴随着Tcp的连接,对所有bootstrap.servers 的机器进行连接.因此在生产中,不需要把所有集群的ip都配置上,因此通过1个就可以获取到集群中所有信息.
同时在send的时候,也会触发tcp的连接.tcp的关闭是默认connections.max.idle.ms = 9分钟.
消费者
TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的.再细粒度地说,在 poll 方法内部有 3 个时机可以创建 TCP 连接。
1.发起 FindCoordinator 请求。
默认向负载最小的broker发送请求,获取协调者,协调者存储在broker内存中,负责了消费者组的管理和位移的提交.
怎么找到协调者呢?
目前,Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。
第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。
2.连接协调者
找到协调者所在的broker后,创建连接,协调者将消费者加入消费者组.
3.消费数据
连接分区的leader副本进行数据消费.
消费者的tcp自动关闭是由消费者端参数 connection.max.idle.ms=9分钟控制的.
9分钟内如果没有数据传输,就会自动关闭.
请求的处理流程
采用事件驱动架构

在broker中有个socketServer组件用于dispatcher所有客户端的请求.
acceptor收到分发请求后,将采用轮询的方式,将请求分发到网络线程池中的一个网络线程.

在网络线程池收到请求后,会把所有的请求放到一个共享队列.
然后使用1个io线程池去执行真正的处理,比如:produce请求则将日志写到磁盘,fetch请求,则从磁盘或者pageCache获取数据.
Purgatory炼狱组件,用于缓存延时请求,对于一些不满足条件的请求,会被缓存在炼狱组件.比如:kafka的延时消息,或者ack=all时,需要等待ISR中所有的副本同步.
Last updated
Was this helpful?