第一个kafka程序,详谈生产者消费者,顺序消费重复消费问题( 七 )


在使用自动提交时,每次调用轮询方法都会把上一次调用返回的最大偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(enable.auto.comnit 被设为 true 时,在调用 close()方法之前也会进行自动提交) 。一般情况下不会有什么
问题,不过在处理异常或提前退出轮询时要格外小心 。
自动提交虽然方便,但是很明显是一种基于时间提交的方式,不过并没有为我们留有余地来避免重复处理消息 。
3.7.2.手动提交(同步)我们通过控制偏移量提交时间来消除丢失消息的可能性,并在发生再均衡时减少重复消息的数量 。消费者 API 提供了另一种提交偏移量的方式,开发者可以在必要的时候提交当前偏移量,而不是基于时间间隔 。
把 auto.commit. offset 设为 false,自行决定何时提交偏移量 。使用 commitsync()提交偏移量最简单也最可靠 。这个方法会提交由 poll()方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常 。
注意:commitsync()将会提交由 poll()返回的最新偏移量,所以在处理完所有记录后要确保调用了 commitsync(),否则还是会有丢失消息的风险 。如果发生了再均衡,从最近批消息到发生再均衡之间的所有消息都将被重复处理 。
只要没有发生不可恢复的错误,commitSync()方法会阻塞,会一直尝试直至提交成功,如果失败,也只能记录异常日志 。
3.7.3.异步提交手动提交时,在 broker 对提交请求作出回应之前,应用程序会一直阻塞 。这时我们可以使用异步提交 API,我们只管发送提交请求,无需等待 broker的响应 。
在成功提交或碰到无法恢复的错误之前, commitsync()会一直重试,但是 commitAsync 不会 。它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功 。
假设我们发出一个请求用于提交偏移量 2000,,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应 。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000 。如果 commitAsync()重新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交成功 。这个时候如果发生再均衡,
就会出现重复消息 。
commitAsync()也支持回调,在 broker 作出响应时会执行回调 。回调经常被用于记录提交错误或生成度量指标 。
3.7.4.同步和异步组合因为同步提交一定会成功、异步可能会失败,所以一般的场景是同步和异步一起来做 。
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的 。但如果这是发生在关闭消费者或 再均衡前的最后一次提交,就要确保能够提交成功 。
因此,在消费者关闭前一般会组合使用 commitAsync()和 commitsync() 。具体使用,参见模块 kafka-no-spring 下包 commit 包中代码 SyncAndAsync 。
3.7.5.特定提交在我们前面的提交中,提交偏移量的频率与处理消息批次的频率是一样的 。但如果想要更频繁地提交该怎么办?
如果 poll()方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,想要在批次中间提交偏移量该怎么办?这种情况无法通过调用commitSync()或 commitAsync()来实现,因为它们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完 。
消费者 API 允许在调用 commitsync()和 commitAsync()方法时传进去希望提交的分区和偏移量的 map 。假设我们处理了半个批次的消息,最后一个来自主题“customers”,分区 3 的消息的偏移量是 5000,你可以调用 commitsync()方法来提交它 。不过,因为消费者可能不只读取一个分区,因为我们需要跟踪所有分区的偏移量,所以在这个层面上控制偏移量的提交会让代码变复杂 。
3.8.分区再均衡

第一个kafka程序,详谈生产者消费者,顺序消费重复消费问题

文章插图
 
3.8.1.再均衡监听器在提交偏移量一节中提到过,消费者在退出和进行分区再均衡之前,会做一些清理工作比如,提交偏移量、关闭文件句柄、数据库连接等 。
在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行一些应用程序代码,在调用 subscribe()方法时传进去一个 ConsumerRebalancelistener实例就可以了 。
ConsumerRebalancelistener 有两个需要实现的方法 。
1) public void onPartitionsRevoked( Collection< TopicPartition> partitions)方法会在再均衡开始之前和消费者停止读取消息之后被调用 。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了


推荐阅读