欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 会展 > Kafka代码模板

Kafka代码模板

2025/6/20 0:01:30 来源:https://blog.csdn.net/weixin_46646308/article/details/148671149  浏览:    关键词:Kafka代码模板

Kafka 服务器(Broker) 的配置

server.properties

# broker.id: 每个 Kafka Broker 的唯一标识符。broker.id 必须在整个 Kafka 集群中唯一。
broker.id=0# 配置 Kafka Broker 监听客户端请求的地址和端口。这个配置决定了 Kafka 服务将接受来自生产者、消费者以及其他客户端的连接。
listeners=PLAINTEXT://192.168.65.60:9092# Kafka 消息日志文件的存储目录
log.dir=/usr/local/data/kafka‐logs#  Kafka 连接到 Zookeeper 的地址
zookeeper.connect=192.168.65.60:2181

每个 Kafka 集群中的节点(Broker)都需要有一个 server.properties 配置文件,并且每个节点的配置可以有所不同。

生产者

生产者配置

Properties props = new Properties();// Kafka服务器地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");// 把发送的key和value从字符串序列化为字节数组
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); /* * 1. 发出消息持久化机制参数* acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息* acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入,就可以继续发送下一条消息*          如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失* acks=‐1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志。这是最强的数据保证。一般金融级别才会使用这种配置*/
props.put(ProducerConfig.ACKS_CONFIG, "1");// 2. 重试相关
//2.1 发送失败重试次数,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,需要接收者做好消息接收的幂等性处理
props.put(ProducerConfig.RETRIES_CONFIG, 3);// 2.2 重试间隔设置,默认重试间隔100ms
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);/ * 3. 本地缓冲区和延迟发送相关* 在设置本地缓冲区/延迟发送后,消息会先发送到本地缓冲区,当达到批量发送消息的大 * 小时,本地线程会从缓冲区取数据(一个batch),批量发送到broker。同时,需要设置 * batch最大的延迟发送时间,如果一条消息在本地缓冲区中等待的时间达到设置的时间后 * batch没满,那么也必须把消息发送出去* /// 3.1 设置本地缓冲区大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// 3.2 设置batch大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);/* * 3.3 batch最大的延迟发送时间* 默认值是0:意思就是消息必须立即被发送,但这样会影响性能* 一般设置10毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果10毫秒内,这个batch满了16kb就会随batch一起被发送出去* 如果10毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长* *  消息 -> 本地缓冲区(32M)-> batch(16k)-> 发送(10ms batch不满也发送)*/
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

生产者发送消息

// 创建 Kafka 生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// 发送消息
String topic = "test-topic";  // 主题名称
String key = "order1";  // 消息的 key
String value = "Order details: 123";  // 消息的内容// 创建消息记录
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);try {// 发送消息,这里的lambda函数就是onCompletion()方法producer.send(record, (metadata, exception) -> {if (exception != null) {System.out.println("Error sending message: " + exception.getMessage());} else {System.out.println("Message sent successfully to topic " + metadata.topic() +" partition " + metadata.partition() + " with offset " + metadata.offset());}});} catch (Exception e) {e.printStackTrace();
} finally {// 关闭生产者producer.close();
}
// 指定发送分区
var producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0, key_json, value_json);// 也可以指定发送分区
var producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, key_json, value_json);// 等待消息发送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();// 异步回调方式发送消息
producer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {// 处理异常}
});
// 关闭
producer.close();

此外,为了保证生产者的消息发送成功,可以通过添加回调函数的方式,在send成功后打印日志。

详细内容参考:Kafka如何保证消息不丢失

ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));

消费者

消费者配置

Properties properties = new Properties();// Kafka服务器地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");// 把发送的key和value从字符串序列化为字节数组
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);// 是否自动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/* * 当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费* latest(默认) :只消费自己启动之后发送到主题的消息* earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于 consumer.seekToBeginning(每次都从头开始消费)*/
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// consumer给broker发送心跳的间隔时间,broker接收到心跳如果此时有rebalance发生会通过心跳响应将rebalance方案下发给consumer,这个时间可以稍微短一点
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);// 服务端broker多久感知不到一个consumer心跳就认为他故障了,会将其踢出消费组,对应的Partition也会被重新分配给其他consumer,默认是10秒
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);// 一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);// 如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

消费者消费消息

// 创建 Kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 订阅主题
consumer.subscribe(Collections.singletonList("test-topic"));// 消费指定分区,这段代码指定了消费者从TOPIC_NAME的第一个分区(分区0)开始消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));/* 回溯消费(从头消费 - seekToBeginning)* seekToBeginning()方法使消费者回溯到该分区的最初位置,意味着从头开始消费该分    区的所有消息。* 这对于重新消费主题中的消息或重新同步时非常有用。* /
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));// 指定offset消费,即消费者将跳过之前的消息,从该offset开始消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);/* 从指定时间点开始消费 - 1小时前
* partitionsFor()方法获取指定主题(TOPIC_NAME)的所有分区信息。
* fetchDataTime 是一个时间戳,表示1小时前的时间,new Date().getTime() - 1000 * 60 * 60 用来计算这个时间戳。
* map 用于存储每个分区与其对应的时间戳(fetchDataTime)。这个时间戳将用于从Kafka中拉取时间戳较早的消息。
*/ 
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
long fetchDataTime = new Date().getTime() ‐ 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);
}
// 消费消息
try {while (true) {consumer.poll(1000).forEach(record -> {// 可以修改为具体业务逻辑System.out.println("Consumed record with key: " + record.key() +", value: " + record.value() + ", from partition: " + record.partition());});}
} catch (Exception e) {e.printStackTrace();
} finally {// 关闭消费者consumer.close();
}

消费者提交offset

手动提交offset的意义

  1. 控制消费进度

手动提交offset能够让消费者在每个消息或消息批次消费后,明确地告诉Kafka“我已经消费到这个offset了”。这对于控制消息消费的精确性非常重要,尤其在需要精确控制消费位置的场景中。

  1. 避免消息丢失或重复消费

如果自动提交offset,可能会发生消费者在处理中出现异常(如程序崩溃),导致已消费的消息的offset提交失败,导致消息丢失或重复消费。手动提交则可以在处理完消息并确保成功时再提交offset,避免这种问题。比如在金融交易、日志收集系统等场景中,需要确保消息的处理不会丢失,并且不会重复处理。

  1. 灵活的错误处理与恢复

通过手动提交offset,消费者可以在消费过程中灵活地处理错误。如果在消费某条消息时发生异常,消费者可以选择不提交offset,这样在消费者重启或恢复时会重新消费该消息。它使得消费者在出错时能更好地控制重试策略。

代码实现

同步提交
consumer.commitSync();

当调用该方法时,消费者会将当前消费的偏移量提交到Kafka集群,并且当前线程会阻塞,直到该提交操作完成。

优势:

  1. 阻塞:会等待offset提交成功,不会继续执行后续代码,直到提交完成。
  2. 可靠性:如果提交失败,commitSync()会抛出异常,可以捕获并进行处理,确保提交正确。

缺点:会导致性能问题,因为它会阻塞当前线程,直到提交完成。

异步提交
consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {// 处理异常}
});

回调函数:异步提交会接受一个OffsetCommitCallback回调接口作为参数,该接口的onComplete()方法会在提交操作完成时被调用。这个方法会接收到两个参数:
offsets:包含提交的偏移量信息(TopicPartition和OffsetAndMetadata)。
ex:如果提交发生错误,该参数会包含异常信息。

优势:

  1. 非阻塞:不会等待提交完成,允许程序继续执行其他操作。
  2. 提高吞吐量:减少等待时间,尤其是在批量消费和提交的情况下,可以提高整体的吞吐量和性能。

缺点:可能会出现提交失败的情况,回调函数中的异常处理需要做好,以确保异常得到及时处理。

Spring boot集成

1. 添加依赖

在pom.xml 中添加 Kafka 的相关依赖

<dependencies><!-- Spring Boot Starter for Apache Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Spring Boot Starter Web (optional if you need a web app) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot Starter for Actuator (optional for monitoring) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!-- Spring Boot Starter Test (optional for testing) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

2. 配置文件

application.yml

spring:kafka:# Kafka broker 地址bootstrap‐servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094producer:retries: 3batch‐size: 16384buffer‐memory: 33554432acks: 1key‐serializer: org.apache.kafka.common.serialization.StringSerializervalue‐serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group‐id: default‐groupenable‐auto‐commit: falseauto‐offset‐reset: earliestkey‐deserializer: xxx.StringDeserializervalue‐deserializer: xxx.StringDeserializerlistener:ack‐mode: manual_immediate

注意:
ack‐mode
RECORD:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
BATCH:当每一批poll()的数据被消费者监听器处理之后提交
TIME:当每一批poll()的数据被消费者监听器处理之后,距离上次提交时间大于TIME时提交
COUNT:当每一批poll()的数据被消费者监听器处理之后,被处理record数量大于等于COUNT时提交
TIME | COUNT:有一个条件满足时提交
MANUAL:当每一批poll()的数据被消费者监听器处理之后, 手动调用Acknowledgment.acknowledge()后提交
MANUAL_IMMEDIATE:手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种(一次提交一条消息)

3. 启动类

package com.example.kafka;import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.beans.factory.annotation.Autowired;@SpringBootApplication
public class KafkaApplication implements CommandLineRunner {@Autowiredprivate KafkaProducer kafkaProducer;public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);}@Overridepublic void run(String... args) throws Exception {// 发送消息kafkaProducer.sendMessage("test-topic", "Hello, Kafka!");}
}

4. 生产者类

package com.example.kafka.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);System.out.println("Message sent: " + message);}
}

5. 消费者类

package com.example.kafka.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {@KafkaListener(topics = "test-topic", groupId = "test-group")public void consume(String message) {System.out.println("Consumed message: " + message);}@KafkaListener(topics = "test-topic",groupId = "test-group")public void consume1(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = record.value();ack.acknowledge();  //手动提交offset}// 配置多个topic,concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数@KafkaListener(groupId = "testGroup", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"}), // 从topic1的分区0和1读取消息@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) // 从topic2的分区0读取消息,并设置分区1的初始偏移量为100}, concurrency = "6")public void listenToMultipleTopics(String message) {// 消费消息的逻辑System.out.println("Group: testGroup, Message: " + message);}
}

Kafka事务

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my‐transactional‐id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());// 初始化事务
producer.initTransactions();
try {// 开启事务producer.beginTransaction();// 发到不同的主题的不同分区producer.send(/*...*/);// 提交事务producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {producer.close();
} catch (KafkaException e) {// 回滚事务producer.abortTransaction();
}
// 关闭
producer.close();

spring框架下Kafka事务

可以通过**@Transactional**实现

配置

可以通过在application.yml文件或KafkaConfig配置类中添加配置的方式,提供事务支持。

1. application.yml
spring:kafka:bootstrap-servers: localhost:9092  # Kafka 集群地址producer:acks: all  # 确保消息被所有副本确认transactional-id-prefix: tx-  # 事务前缀,Kafka 事务需要一个事务 ID 前缀consumer:group-id: test-group  # 消费者组 IDenable-auto-commit: false  # 手动提交 offsetlistener:ack-mode: manual  # 设置为手动提交确认
2. 配置类
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;@Configuration
@EnableKafka
public class KafkaConfig {@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {// 设置 Kafka 生产者的事务管理器KafkaTransactionManager<String, String> transactionManager =new KafkaTransactionManager<>(producerFactory());KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());kafkaTemplate.setTransactionManager(transactionManager);return kafkaTemplate;}@Beanpublic DefaultKafkaProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.ACKS_CONFIG, "all");configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-");  // 事务 IDreturn new DefaultKafkaProducerFactory<>(configProps);}
}

生产者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.Transactional;
import org.springframework.stereotype.Service;@Service
public class KafkaTransactionProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Transactionalpublic void sendTransactionalMessages() {try {// 发送事务消息kafkaTemplate.send("topic1", "key1", "message1");kafkaTemplate.send("topic2", "key2", "message2");// 你可以在此处加入其他业务逻辑,如果出现异常,会回滚事务if (someConditionFails()) {throw new RuntimeException("Simulating failure to trigger rollback");}// 如果没有异常,事务提交,消息将被正常发送} catch (Exception e) {// 事务回滚System.out.println("Transaction failed, rolling back...");throw e;}}private boolean someConditionFails() {// 模拟某些条件下事务失败return true;}
}

消费者

import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
@EnableKafka
public class KafkaTransactionConsumer {@KafkaListener(topics = "topic1", groupId = "test-group")public void listenTopic1(String message) {System.out.println("Received message from topic1: " + message);}@KafkaListener(topics = "topic2", groupId = "test-group")public void listenTopic2(String message) {System.out.println("Received message from topic2: " + message);}
}

在生产者的配置中启用事务,配置 transactional.id,并设置事务管理器 KafkaTransactionManager,它会自动管理 Kafka 事务的开始、提交和回滚。

事务管理:@Transactional 注解用于标识在发送消息的过程是一个事务操作。如果其中任何消息发送失败,Spring Kafka 会自动回滚事务。

回滚机制:在 sendTransactionalMessages() 中模拟了一个失败的条件,确保事务在遇到异常时会被回滚。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词