欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

kafka seek方法

发布时间:2023/12/20 41 豆豆
生活随笔 收集整理的这篇文章主要介绍了 kafka seek方法 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

我们知道消息的拉取是根据poll()方法中的逻辑来处理的,这个poll()方法中的逻辑对于普通的开发人员而言是一个黑盒,无法精确地掌控其消费的起始位置。提供的auto.offset.reset 参数也只能在找不到消费位移或位移越界的情况下粗粒度地从开头或
末尾开始消费。有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而KafkaConsumer 中的seek()方法正好提供了这个功能,让我们得以追前消费或回溯消费。seek()方法的具体定义如下:

public void seek(TopicPartition partition , long offset)

seek()方法中的参数partition 表示分区,而offset 参数用来指定从分区的哪个位置开始消费。seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在po ll()方法的调用过程中实现的。也就是说,在执行seek()方法之前需要先执行一次p oll ()方法, 等到分配到分区之后才可以重置消费位置(如果用subscribe订阅的话就需要poll一次,如果用assign()手动订阅分区就不需要poll一次)。seek()方法的使用示例如代码清单3 - 5 所示(只列出关键代码〉。

如果对未分配到的分区执行seek()方法, 那么会报出IllegalStateException 的异常。类似在调用subscribe()方法之后直接调用seek()方法:

endOffsets()方法

用来获取指定分区的末尾的消息位置,endOffsets 的具体方法定义如下:

public Map<TopicPartition , Long> endOffsets( Collection<TopicPartition> partitions )public Map<TopicPartition , Long> endOffsets( Collection<TopicPartition> partitions , Duration timeout)

其中partitions 参数表示分区集合,而timeout 参数用来设置等待获取的超时时间。如果没有指定timeout 参数的值, 那么endOffets()方法的等待时间由客户端参数request.timeout.ms 来设置, 默认值为3 0000 。与endOffsets 对应的是

beginningOffsets()方法

一个分区的起始位置起初是0,但并不代表每时每刻都为0 , 因为日志清理的动作会清理旧的数据,所以分区的起始位置会自然而然地增加, beginningOffsets()方法的具体定义如下:

public Map<TopicPartition , Long> beginningOffsets( Collection<TopicPartition> partitions )public Map<TopicPartition , Long> beginningOffsets( Collection<TopicPartition> partitions , Duration timeout)

beginningOffsets()方法中的参数内容和含义都与endOffsets()方法中的一样,配合这两个方法我们就可以从分区的开头或末尾开始消费。其实KafkaConsumer 中直接提供了seekToB eginning()方法和seekToEnd()方法来实现这两个功能, 这两个方法的具体定义如下:

有时候我们并不知道特定的消费位置, 却知道一个相关的时间点, 比如我们想要消费昨天8 点之后的消息,这个需求更符合正常的思维逻辑。此时我们无法直接使用seek()方法来追溯到相应的位置。KafkaConsumer 同样考虑到了这种情况,它提供了一个offsetsForTimes()方法,通过timestamp 来查询与此对应的分区位置。

offsetsF orTimes()方法的参数timestamps T oSearch 是一个Map 类型, key 为待查询的分区,而value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,对应于OffsetAndTimestamp 中的off set 和time stamp 字段。下面的示例演示了offsetsForTimes()和seek()之间的使用方法, 首先通过offsetFor Times()方法获取一天之前的消息位置,然后使用seek() 方法追溯到相应位置开始消费,示例中的assignment 变量和代码清单3-7 中的一样,表示消费者分配到的分区集合。

 

总结

以上是生活随笔为你收集整理的kafka seek方法的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。