欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > SpringBoot集成Kafka

SpringBoot集成Kafka

2025/5/17 5:15:47 来源:https://blog.csdn.net/qq_62877881/article/details/147899263  浏览:    关键词:SpringBoot集成Kafka

SpringBoot集成Kafka

Topic
在这里插入图片描述

1、引入kafka

(1)引依赖

pom.xml文件引入kafka的依赖

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.3.3</version></dependency>

(2)添加配置信息

application.yml配置kafka信息

# 端口
server:port: 8080
# spring配置
spring:application:name: kafka-basekafka:# kafka地址信息bootstrap-servers: xxxxx:9092

2、生产者

(1)简单生产者发送String消息

package com.yuhao.kafkabase.producer;import org.springframework.stereotype.Component;
import org.springframework.kafka.core.KafkaTemplate;
import javax.annotation.Resource;
//生产者
@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;public void sendEvent(String topic, String message) {kafkaTemplate.send(topic, message);}
}
// ---------------------------------------发送消息---------------------------------------------------
package com.yuhao.kafkabase.producer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
@SpringBootTest
class EventProducerTest {@Resourceprivate EventProducer enventProducer;@Testvoid sendEvent() {enventProducer.sendEvent("hello",  "hello kafka");}
}

(2)发送Message对象消息

// 发送Message对象public void sendMessageObject(String topic, String message) {Message msg = MessageBuilder.withPayload(message).setHeader(KafkaHeaders.TOPIC, topic) // 通过在header中指定Topic.build();kafkaTemplate.send(msg);}

(3)发送ProducerRecord对象消息

在这里插入图片描述

 // 发送ProducerRecord对象public void sendMessageProducerRecord(ProducerRecord<String,String> producerRecord) {kafkaTemplate.send(producerRecord);}@Testvoid sendMessageProducerRecord() {Headers headers  = new RecordHeaders();headers.add("name","yuhao".getBytes(StandardCharsets.UTF_8));ProducerRecord<String,String> producerRecord= new ProducerRecord("hello",0,"hello kafka 这是我第一次给你发ProducerRecord对象信息",headers);enventProducer.sendMessageProducerRecord(producerRecord);}

(4)kafka还支持其他参数消息发送

  • 主要方法send(),sendDefault(),两个方法默认都是异步操作。
  • 指定partion分区
  • 像默认Topic发送消息

(5)获取发送消息的结果

  • 阻塞式获取发送结果,使用CompletableFuture的get方法
  • 非阻塞获取发送结果,使用CompletableFuture的thenAccept(),thenApply(),thenRun()方法

(6)消息发送分区策略

  • 默认分配策略:根据key计算哈希值之后对分区个数取余数,未指定key则使用随机数对分区取余数
  • RoundRobin策略
  • 自定义分区策略:实现Partitioner接口

(7)生产者发送消息流程

在这里插入图片描述

3、插件安装

使用idea的kafka插件查看kafka内的状态
在这里插入图片描述

4、消费者

(1)简单消息读取

package com.yuhao.kafkabase.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {// 使用@kafkaListener注解,消费消息。需要指定消息所属的topic,以及的消费组Id@KafkaListener(topics = "hello", groupId = "group-1")public void receive(String message) {System.out.println("接收到消息:" + message);}
}

可以看到并没有读取到topic内的消息,原因是因为当你启动kafka默认消费者配置是读取最新的消息,因此先前的消息并不会被消费。
在这里插入图片描述
发送第二条信息
在这里插入图片描述
查看
在这里插入图片描述
要想从offset0开始读取消息,需要修改yml中的配置

spring:application:name: kafka-basekafka:bootstrap-servers: xxx:9092# 生产者相关配置#producer:# 消费者相关配置consumer:# 从的offset0开始消费auto-offset-reset: earliest

重新启动,同时重新设置groupId,因为kafka不允许同一个消费组Id重复消费,因为kafka会记录消费组Id的offset,所以即使修改配置也不会生效。需要将groupId改为group-2,重新指定消费组Id。
在这里插入图片描述
可以手动的重置offset使用kafka bin目录下的脚本bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group-1 --reset-offsets --to-earliest --topics hello --execute

(2)接收消息头内容

 @KafkaListener(topics = "hello", groupId = "group-1")public void receive(@Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic) {System.out.println("接收到消息头内的Topic信息:" + topic);}

(3)接收完整消息内容

@KafkaListener(topics = "hello", groupId = "group-1")public void receive(ConsumerRecord<String,String> record) {System.out.println("接收到完整消息信息:" + record);}

(4)读取指定Topic-partition-offset消息

 @KafkaListener( groupId = "group-2",topicPartitions = {@TopicPartition(topic = "topic", partitions = {"0"},partitionOffsets = {@PartitionOffset(partition = "0", initialOffset = "0")}),})

(5)批量消费

application.yml中修改配置

spring:application:name: kafka-basekafka:# kafka地址信息bootstrap-servers: xxxxx:9092listener:type: batch # 批量消费# 消费者相关配置consumer:# 一次最大消费数max-poll-records: 20

(6)消费消息拦截器

自定义消费拦截器需要实现ConsumerInterceptor接口,并在ConsumerFactory中注册拦截器。之后在使用@KafkaListener(topics = "hello", groupId = "group-2",containerFactory=" ")指定自定义容器工厂。

package com.yuhao.kafkabase.interceptor;import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;import java.util.Map;public class CustomConsumerIntercepetor implements ConsumerInterceptor {//消息消费前执行@Overridepublic ConsumerRecords onConsume(ConsumerRecords consumerRecords) {return null;}@Overridepublic void close() {}//拿到消息提交offset之前@Overridepublic void onCommit(Map map) {}@Overridepublic void configure(Map<String, ?> map) {}
}

(7)消息转发

使用@SendTo注解

@KafkaListener(topics = "hello", groupId = "group-1")
@SendTo(value="topicB"public String receive(ConsumerRecord<String,String> record) {System.out.println("接收到完整消息信息:" + record);return record.getValue();}

(8)消息消费分区策略

  • RangeAssignor策略:根据分区数Partition和Consumer数来确定,每个消费者应得数=Partition数/Consumer数。
  • RoundRobinAssignor策略:轮询策略,将所有主题的分区按顺序轮流分配给消费者。
  • StickyAssignor策略:初始分配时尽量均衡(类似轮询)。当消费者组发生变化(如加入 / 退出消费者)时,保留原分配关系,只调整必要的分区。
  • CooperativeStickyAssignor策略:与 StickyAssignor 类似,但支持增量式重平衡。当分区需要重新分配时,允许消费者暂时处理部分重叠分区,避免数据处理中断。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词