kafka架构设计
Consumer Group:消费者组,消费者组内每个消费者负责消费不同分区的数据,提高消费能力。逻辑上的一个订阅者。
Topic:可以理解为一个队列,Topic将消息分类,生产者和消费者面向的同一个Topic,也就是生产者往Topic发,消费者从Topic拿。
Partition:为了实现扩展性,提高并发能力,一个Topic以多个Partition的方式分布到多个Broker上,每个Partition是一个有序的队列。一个Topic的每个Partition都有若干个副本(Replica),一个Leader和若干个Follower。生产者发送数据的对象,以及消费者消费数据的对象,都是Leader。Follower负责实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower还会成为新的Leader。
Offset:消费者消费的位置信息,监控数据消费到什么位置,用于数据恢复。
Zookeeper:Kafka集群能够正常工作,依赖于Zookeeper,Zookeeper帮助Kafka存储和管理集群信息,broker的注册(临时节点),注册Topic,重新选举,存储Offset,消费者组中的消费者注册,保证一条消息只被一个消费者组的其中一个消费者消费,消息消费的配置等。
zk的作用
/brokers/ids:临时节点,保存所有broker节点信息,存储broker的物理地址,版本信息,启动时间等,节点名称为brokerID,broker定时发送心跳到zk,如果断开则该brokerID会被删除
/brokers/topics:临时节点,节点保存broker节点下所有的topic信息,每一个topic节点下包含一个固定的partitions节点,patitions的子节点就是topic的分区,每个分区下保存一个state节点,保存着当前leader分区和ISR的brokerID,state节点由leader创建,若leader宕机该节点会被删除,直到有新的leader选举产生,重新生成state节点
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]:维护消费者和分区的注册关系
/consumers/[group_id]/offset/[topic]/[broker_id-partition_id]:分区信息的消费进度Offset
client通过topic找到topic树下的state节点获得leader的brokerID,到broker树中找到broker的物理地址,但是client不会直连zk,而是通过配置的broker获取zk中的消息
消息丢失的场景和解决方案
消息发送时丢失
ack=0,不重试,producer发送信息完,不管效果,如果发送失败就丢失。
ack=1,leader crash,producer发送消息完,只等待lead写入成功就返回,leader crash了,这时如果follower没来得及同步,消息丢失。
unclean.leader.election.enable 配置 true
允许选举ISR以外的副本为leader,会导致数据丢失,默认为false。producer发送异步消息完,只等待lead写入成功就返回了,leader crash了,这时ISR中没有follower,leader从OSR中选举,因为OSR中本来落后于leader造成消息丢失。
消息发送解决方案
配置 ack = all / -1(指的是ISR中的所有从节点),tries > 1,unclean.leader.election.enable :false(不允许选举ISR以外的副本为leader)
producer发送消息完,等待follower同步完再返回,如果异常则重试,但副本数量影响吞吐量
配置 min.insync.replicas > 1(同步副本数量指的是ISR中的从节点,只有ack=all时才会生效)
副本指定必须确认写操作成功的最小副本数量。如果不满足这个最小值,则生产者将引发一个异常(NotEnoughReplicas,NotEnoughReplicasAfterAppend)。min.insync.replicas 和 ack 持久性保证,确保如果大多数副本没有收到写操作,则生产者引发异常。
失败地offset单独记录
producer发送消息,会自动重试,遇到不可恢复异常会抛出,这时可以捕获异常记录到数据库或缓存单独处理。
消费
先commit再处理消息,如果出现异常,但是offset已经提交,消息对于消费者来说丢失了。
broker的刷盘
减小刷盘间隔(往pagecache中刷盘的时间)
Consumer是pull还是push
pull模式
- 根据consumer的消费能力进行数据拉取,可以控制速率
- 可以批量拉取或单条拉取
- 可以设置不同的提交方式,实现不同的传输语义
缺点:如果kafka没有数据,会导致consumer空循环,消耗资源
解决:通过参数设置,consumer拉取数据为空或没达到一定数量时阻塞
push
不会导致consumer循环等待
缺点:速率固定,忽略consumer的消费能力,可能导致拒绝服务或网络拥塞等
Kafka中高性能的原因
kafka不基于内存,而是硬盘存储,因此消息堆积能力更强
顺序写:利用磁盘的顺序访问速度可以接近内存,kafka的消息都是append操作,partition是有序的,节省了磁盘的寻道时间,同时通过 批量操作 节省写入次数,partition物理上分为多个segment存储,方便删除
传统和零拷贝:
buf = mmap(diskfd, len);
write(sockfd, buf, len);
应用程序调用mmap()
,磁盘上的数据会通过DMA
被拷贝的内核缓冲区,接着操作系统会把这段内核缓冲区与应用程序共享,这样就不需要把内核缓冲区的内容往用户空间拷贝。应用程序再调用write()
,操作系统直接将内核缓冲区的内容拷贝到socket
缓冲区中,这一切都发生在内核态,最后,socket
缓冲区再把数据发到网卡去。
kafka不太依赖jvm(存储数据不依赖堆内存),靠OS的pageCache,如果生产者和消费者速率相当,则直接用pageCache交换数据,不需要经过磁盘IO
Rebalance机制
consumer group中的消费者与topic下的partition重新匹配的过程
何时会产生rebalance:
- consumer group中的成员个数发生变化
- consumer消费超时
- group订阅的topic个数发生变化
- group订阅的topic的partition数变化
coordinator:通常是partition的leader节点所在的broker,负责监控group中consumer的存活,consumer维持到coordinator的心跳来判断consumer的消费超时
- coordinator通过心跳返回通知consumer进行rebalance
- consumer请求coordinator加入组,coordinator选举产生leader consumer
- leader consumer从coordinator获取获取所有的consumer,发送syncGroup(分配信息)给到coordinator
- coordinator通过心跳机制将syncGroup下发给consumer,完成rebalance
topic或者patition数量发生变化coordinator是不知道的,此时leader consumer监控topic的变化,通知coordinator触发rebalance
rebalance问题
如果C1消费者消息超时,触发rebalance,重新分配,该消息被其他消费者消费,此时C1消费完成提交offset导致错误
解决:coordinator每次rebalance,会标记一个Generation给到consumer,每次rebalance该Generation会+1,consumer提交offset时,coordinator会比对Generation,不一致则拒绝
安装配置
待补充
如何保证分布式事务的最终一致性
分布式事务:业务相关的多个操作,保证同时commit或rollback
- 生产者要保证100%的消息投递(事务消息机制,服务质量Quality of Service)
- 消费者这端保证幂等性(唯一ID+业务自己实现幂等)