上一篇文章我们介绍了Kafka的基本原理,这一篇文章我们继续介绍kafka中如何保存消费端的消费位置。
前面在讲解partition的时候,提到过offset, 每个topic可以划分多个分区(每个Topic至少有一个分 区),同一topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset保证消息在分区内的顺 序,offset的顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的;对于应用层的消费来 说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个offset。那么offset保存在哪里?
在kafka中,提供了一个consumer_offsets_* 的一个topic,把offset信息写入到这个topic中。
consumer_offsets——保存了每个consumer group某一时刻提交的offset信息。
__consumer_offsets 默认有50个分区。
根据前面我们演示的案例,我们设置了一个KafkaConsumerDemo的groupid。首先我们需要找到这个 consumer_group保存在哪个分区中/
计算公式
Math/abs(“groupid”/hashCode())%groupMetadataTopicPartitionCount / 由于默认情况下 groupMetadataTopicPartitionCount有50个分区,计算得到的结果为/35/ 意味着当前的 consumer_group的位移信息保存在__consumer_offsets的第35个分区
执行如下命令,可以查看当前consumer_goup中的offset位移提交的信息
从输出结果中,我们就可以看到test这个topic的offset的位移日志
我们已经知道Kafka的每个topic都可以分为多个Partition,并且多个partition会均匀分布在集群的各个节点下。虽然这种方式能够有效的对数据进行分片,但是对于每个partition来说,都是单点的,当其中一个partition不可用的时候,那么这部分消息就没办法消费。所以kafka为了提高partition的可靠性而提供了副本的概念(Replica)/通过副本机制来实现冗余备份。
每个分区可以有多个副本,并且在副本***中会存在一个leader的副本,所有的读写请求都是由leader 副本来进行处理。剩余的其他副本都做为follower副本,follower副本会从leader副本同步消息日志。这个有点类似zookeeper中leader和follower的概念,但是具体的实现方式还是有比较大的差异。所以我们可以认为,副本***存在一主多从的关系。
一般情况下,同一个分区的多个副本会被均匀分配到集群中的不同broker上,当leader副本所在的broker出现故障后,可以重新选举新的leader副本继续对外提供服务。通过这样的副本机制来提高kafka集群的可用性。
通过下面的命令去创建带2个副本的topic:
然后我们可以在/tmp/kafka-log路径下看到对应topic的副本信息了。我们通过一个图形的方式来表达。针对secondTopic这个topic的3个分区对应的3个副本
在zookeeper服务器上,通过如下命令去获取对应分区的信息/ 比如下面这个是获取secondTopic第1个 分区的状态信息。
或通过这个命令 sh kafka-topics/sh --zookeeper 192/168/13/106/2181 --describe --topic test_partition
leader表示当前分区的leader是那个broker-id。下图中。绿色线条的表示该分区中的leader节点。其他 节点就为follower
需要注意的是,kafka集群中的一个broker中最多只能有一个副本,leader副本所在的broker节点的 分区叫leader节点,follower副本所在的broker节点的分区叫follower节点。
Kafka提供了数据复制算法保证,如果leader副本所在的broker节点宕机或者出现故障,或者分区的 leader节点发生故障,这个时候怎么处理呢?
那么,kafka必须要保证从follower副本中选择一个新的leader副本。那么kafka是如何实现选举的呢?要了解leader选举,我们需要了解几个概念
Kafka分区下有可能有很多个副本(replica)用于实现冗余,从而进一步实现高可用。副本根据角色的不同 可分为3类:
leader副本:响应clients端读写请求的副本
follower副本:被动地备份leader副本中的数据,不能响应clients端读写请求。
ISR副本:包含了leader副本和所有与leader副本保持同步的follower副本——如何判定是否与leader同 步后面会提到每个Kafka副本对象都有两个重要的属性:LEO和HW。注意是所有的副本,而不只是 leader副本。
LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下 一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0/ 9]。另外, leader LEO和follower LEO的更新是有区别的。我们后面会详细说
HW:即上面提到的水位值。对于同一个副本对象而言,其HW值不会大于LEO值。小于等于HW值的所有消息都被认为是“已备份”的(replicated)。同理,leader副本和follower副本的HW更新是有区别的。
从生产者发出的一条消息首先会被写入分区的leader 副本,不过还需要等待ISR***中的所有follower副本都同步完之后才能被认为已经提交,之后才会更新分区的HW/ 进而消费者可以消费到这条消息。
刚刚提到了,消息的读写操作都只会由leader节点来接收和处理。follower副本只负责同步数据以及当leader副本所在的broker挂了以后,会从follower副本中选取新的leader。
写请求首先由Leader副本处理,之后follower副本会从leader上拉取写入的消息,这个过程会有一定的延迟,导致follower副本中保存的消息略少于leader副本,但是只要没有超出阈值都可以容忍。但是如果一个follower副本出现异常,比如宕机、网络断开等原因长时间没有同步到消息,那这个时候, leader就会把它踢出去。kafka通过ISR***来维护一个分区副本信息
一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为leader;leader负责维护和***ISR(in-Sync replicas , 副本同步队列)中所有follower滞后的状态。当producer发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。
ISR表示目前“可用且消息量与leader相差不多的副本***,这是整个副本***的一个子集”。怎么去理解 可用和相差不多这两个词呢?具体来说,ISR***中的副本必须满足两个条件/
副本所在节点必须维持着与zookeeper的连接
副本最后一条消息的offset与leader副本的最后一条消息的offset之间的差值不能超过指定的阈值 (replica/lag/time/max/ms) replica/lag/time/max/ms:如果该follower在此时间间隔内一直没有追 上过leader的所有消息,则该follower就会被剔除isr列表
ISR数据保存在Zookeeper的 /brokers/topics//partitions//state 节点中
follower副本把leader副本LEO之前的日志全部同步完成时,则认为follower副本已经追赶上了leader副本,这个时候会更新这个副本的lastCaughtUpTimeMs标识,kafk副本管理器会启动一个副本过期检 查的定时任务,这个任务会定期检查当前时间与副本的lastCaughtUpTimeMs的差值是否大于参数 replica/lag/time/max/ms 的值,如果大于,则会把这个副本踢出ISR***
免责声明:
1. 《消息队列kafka - 如何保存消费端的消费位置》内容来源于互联网,版权归原著者或相关公司所有。
2. 若《83404919文库网》收录的文本内容侵犯了您的权益或隐私,请立即通知我们删除。