最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

今头条!【kafka】-生产环境问题-报错Maximum application poll interval

来源:博客园

一.产生的问题


(资料图片仅供参考)

在.NET环境下使用kafka,消费者长时间消费,会报“Application maximum poll interval (10000ms)”错误。

二.重现问题

2.1.消费者配置

SessionTimeoutMs(会话超时时间)和MaxPollIntervalMs(上一次拉取消息和本次拉取消息之间的时间间隔)配置为10秒。

1 var config = new ConsumerConfig 2 { 3     BootstrapServers = brokerServer, 4     //同组轮询,不同组广播 5     GroupId = groupName, 6  7     EnableAutoCommit = false, 8  9     AutoOffsetReset = AutoOffsetReset.Earliest,10 11     EnablePartitionEof = true,12     PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range,13       14     //心跳(<)超时15     SessionTimeoutMs = 10000,16     //上一次拉取消息和本次拉取消息之间的时间间隔17     MaxPollIntervalMs = 10000,18 };

2.2.模拟一个耗时的业务

DoBussinessForLongTime()方法中特意休眠12秒,模拟做业务慢场景。

1 ConsumeResult consumeResult = null; 2  3 while (true) 4 { 5     try 6     { 7         //拉取kafka消息 8         consumeResult = consumer.Consume(cancellationToken); 9         #region 模拟长时间做公司业务10         {11           DoBussinessForLongTime(consumeResult);12         }13         #endregion14 15         consumer.Commit(consumeResult);16     }17     catch (ConsumeException e)18     {19         Console.WriteLine($"Consume error: {e.Error.Reason}");20     }21     catch (KafkaException e)22     {23         Console.WriteLine($"Commit error: {e.Error.Reason}");24     }25     catch (OperationCanceledException e)26     {27         throw e;28     }29     catch (Exception e)30     {31         Console.WriteLine($"Commit error: {e}");32     }33 }34 35 36 /// 37 /// 长时间做公司业务38 /// 配合SessionTimeoutMs = 10000,MaxPollIntervalMs = 10000使用39 /// 40 /// 41 private static void DoBussinessForLongTime(ConsumeResult consumeResult)42 {43     Console.WriteLine($"DoBussinessForLongTime Start {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")}");44     Thread.Sleep(12000);45     Console.WriteLine($"DoBussinessForLongTime End {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")}");46 }

给名为MyTopic的主题生产3条消息,分别为a、b、c。

我们最终看到在拉取第2条消息时,报ConsumeException异常了,红色框内的异常是我们主动打印出来的,黄色框内的异常是.NET自己打印出来的。我们来翻译下这句话“Application maximum poll interval (10000ms) exceeded by 183ms (adjust max.poll.interval.ms for long-running message processing): leaving group”,这句话的意思是说:拉取最大时间间隔为10秒,超过了183毫秒,离开组。

详细解释下这个报错是什么意思:这个报错是指你上一次拉取消息和本次拉取消息之间的时间间隔超过了kafka允许的最大的拉取消息的时间间隔(kafka里默认配置是5分钟,我们修改了消费者配置是10秒),也就是说kafka认为你拉取消息后做了大于10秒的业务,拉到消息是要去做业务的(每家公司做的业务不同),什么业务能做10秒呢?(做完业务才再去拉取下一条消息),所以kafka的组协调器(GroupCoordinator)就认为你这个消费者(每一个消费者都归属于一个消费者组)不给力,觉的你干活太慢了,做业务太慢了,把你这个消费者从消费者组移除出去,就报错了。

这个问题产生的原因:是咱们引入的第三方的dll(.NET里引用的是Confluent.Kafka.dll)在长时间消费过程中,会偶发判断上一次拉取消息和本次拉取消息之间的时间间隔有误(怀疑),明明没有超过5分钟(kafka里默认配置是5分钟),但是确判断是超过了5分钟。

三.如何解决这个问题?

使用异常过滤器,捕获到“Application maximum poll interval”异常,记录下日志就好了,然后继续去拉取本次失败的消息(前提是已经对之前拉取到的消息做了偏移量提交)。

1  catch (ConsumeException e) when (e.Error.Reason.Contains("Application maximum poll interval"))//满足条件才进入catch2  {3    //此处记录下日志4  }

关键词: