Kafka

kafka架构设计

Consumer Group:消费者组,消费者组内每个消费者负责消费不同分区的数据,提高消费能力。逻辑上的一个订阅者

Topic:可以理解为一个队列,Topic将消息分类,生产者和消费者面向的同一个Topic,也就是生产者往Topic发,消费者从Topic拿。

Partition:为了实现扩展性,提高并发能力,一个Topic以多个Partition的方式分布到多个Broker上,每个Partition是一个有序的队列。一个Topic的每个Partition都有若干个副本(Replica),一个Leader和若干个Follower。生产者发送数据的对象,以及消费者消费数据的对象,都是Leader。Follower负责实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower还会成为新的Leader。

Offset:消费者消费的位置信息,监控数据消费到什么位置,用于数据恢复

Zookeeper:Kafka集群能够正常工作,依赖于Zookeeper,Zookeeper帮助Kafka存储和管理集群信息,broker的注册(临时节点),注册Topic,重新选举,存储Offset,消费者组中的消费者注册,保证一条消息只被一个消费者组的其中一个消费者消费,消息消费的配置等。

zk的作用

/brokers/ids:临时节点,保存所有broker节点信息,存储broker的物理地址,版本信息,启动时间等,节点名称为brokerID,broker定时发送心跳到zk,如果断开则该brokerID会被删除

/brokers/topics:临时节点,节点保存broker节点下所有的topic信息,每一个topic节点下包含一个固定的partitions节点,patitions的子节点就是topic的分区,每个分区下保存一个state节点,保存着当前leader分区和ISR的brokerIDstate节点由leader创建,若leader宕机该节点会被删除,直到有新的leader选举产生,重新生成state节点

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]:维护消费者和分区的注册关系

/consumers/[group_id]/offset/[topic]/[broker_id-partition_id]:分区信息的消费进度Offset

client通过topic找到topic树下的state节点获得leader的brokerID,到broker树中找到broker的物理地址,但是client不会直连zk,而是通过配置的broker获取zk中的消息

消息丢失的场景和解决方案

消息发送时丢失

  1. ack=0,不重试,producer发送信息完,不管效果,如果发送失败就丢失。

  2. ack=1,leader crash,producer发送消息完,只等待lead写入成功就返回,leader crash了,这时如果follower没来得及同步,消息丢失。

  3. unclean.leader.election.enable 配置 true

    允许选举ISR以外的副本为leader,会导致数据丢失,默认为false。producer发送异步消息完,只等待lead写入成功就返回了,leader crash了,这时ISR中没有follower,leader从OSR中选举,因为OSR中本来落后于leader造成消息丢失。

消息发送解决方案

  1. 配置 ack = all / -1(指的是ISR中的所有从节点),tries > 1,unclean.leader.election.enable :false(不允许选举ISR以外的副本为leader)

    producer发送消息完,等待follower同步完再返回,如果异常则重试,但副本数量影响吞吐量

  2. 配置 min.insync.replicas > 1(同步副本数量指的是ISR中的从节点,只有ack=all时才会生效

    副本指定必须确认写操作成功的最小副本数量。如果不满足这个最小值,则生产者将引发一个异常(NotEnoughReplicas,NotEnoughReplicasAfterAppend)。min.insync.replicas 和 ack 持久性保证,确保如果大多数副本没有收到写操作,则生产者引发异常。

  3. 失败地offset单独记录

    producer发送消息,会自动重试,遇到不可恢复异常会抛出,这时可以捕获异常记录到数据库或缓存单独处理。

消费

先commit再处理消息,如果出现异常,但是offset已经提交,消息对于消费者来说丢失了。

broker的刷盘

减小刷盘间隔(往pagecache中刷盘的时间)

Consumer是pull还是push

pull模式

  1. 根据consumer的消费能力进行数据拉取,可以控制速率
  2. 可以批量拉取或单条拉取
  3. 可以设置不同的提交方式,实现不同的传输语义

缺点:如果kafka没有数据,会导致consumer空循环,消耗资源

解决:通过参数设置,consumer拉取数据为空或没达到一定数量时阻塞

push

不会导致consumer循环等待

缺点:速率固定,忽略consumer的消费能力,可能导致拒绝服务或网络拥塞等

Kafka中高性能的原因

kafka不基于内存,而是硬盘存储,因此消息堆积能力更强

顺序写:利用磁盘的顺序访问速度可以接近内存,kafka的消息都是append操作,partition是有序的,节省了磁盘的寻道时间,同时通过 批量操作 节省写入次数,partition物理上分为多个segment存储,方便删除

传统和零拷贝

buf = mmap(diskfd, len);
write(sockfd, buf, len);

应用程序调用mmap(),磁盘上的数据会通过DMA被拷贝的内核缓冲区,接着操作系统会把这段内核缓冲区与应用程序共享,这样就不需要把内核缓冲区的内容往用户空间拷贝。应用程序再调用write(),操作系统直接将内核缓冲区的内容拷贝到socket缓冲区中,这一切都发生在内核态,最后,socket缓冲区再把数据发到网卡去。

kafka不太依赖jvm(存储数据不依赖堆内存),靠OS的pageCache,如果生产者和消费者速率相当,则直接用pageCache交换数据,不需要经过磁盘IO

Rebalance机制

consumer group中的消费者与topic下的partition重新匹配的过程

何时会产生rebalance:

  1. consumer group中的成员个数发生变化
  2. consumer消费超时
  3. group订阅的topic个数发生变化
  4. group订阅的topic的partition数变化

coordinator:通常是partition的leader节点所在的broker,负责监控group中consumer的存活,consumer维持到coordinator的心跳来判断consumer的消费超时

  1. coordinator通过心跳返回通知consumer进行rebalance
  2. consumer请求coordinator加入组,coordinator选举产生leader consumer
  3. leader consumer从coordinator获取获取所有的consumer,发送syncGroup(分配信息)给到coordinator
  4. coordinator通过心跳机制将syncGroup下发给consumer,完成rebalance

topic或者patition数量发生变化coordinator是不知道的,此时leader consumer监控topic的变化,通知coordinator触发rebalance

rebalance问题

如果C1消费者消息超时,触发rebalance,重新分配,该消息被其他消费者消费,此时C1消费完成提交offset导致错误

解决:coordinator每次rebalance,会标记一个Generation给到consumer,每次rebalance该Generation会+1,consumer提交offset时,coordinator会比对Generation,不一致则拒绝

安装配置

待补充

如何保证分布式事务的最终一致性

分布式事务:业务相关的多个操作,保证同时commit或rollback

  1. 生产者要保证100%的消息投递(事务消息机制,服务质量Quality of Service)
  2. 消费者这端保证幂等性(唯一ID+业务自己实现幂等)

   转载规则


《Kafka》 锦泉 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录