欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 养生 > springboot整合kafka

springboot整合kafka

2025/9/27 7:10:22 来源:https://blog.csdn.net/qq_42508660/article/details/143255105  浏览:    关键词:springboot整合kafka

Springboot整合Kafka

1.下载kafka

启动

1.zookeeper-server-start.bat ..\..\config\zookeeper.properties2.kafka-server-start.bat ..\..\config\server.properties3.kafka-server-stop.bat4.zookeeper-server-stop.bat

2.引入依赖

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

3.消息实体

package com.example.kafka;import java.util.Date;public class KafkaMessage
{private long id;private String username;private String password;private Date date;public long getId(){return id;}public void setId(long id){this.id = id;}public String getUsername(){return username;}public void setUsername(String username){this.username = username;}public String getPassword(){return password;}public void setPassword(String password){this.password = password;}public Date getDate(){return date;}public void setDate(Date date){this.date = date;}
}

4.生产者

package com.example.kafka;import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class KafkaProducer
{@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;public KafkaTemplate<String, String> getKafkaTemplate(){return kafkaTemplate;}public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate){this.kafkaTemplate = kafkaTemplate;}public void sendKafkaMessage(KafkaMessage message){kafkaTemplate.send("myTopic", JSONObject.toJSONString(message));}}

5.消费者

package com.example.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumer
{@KafkaListener(topics = "myTopic", groupId = "myGroup")public void obtainMessage(ConsumerRecord<String, String> consumerRecord){System.out.println("obtainMessage invoked");String topic = consumerRecord.topic();String key = consumerRecord.key();String value = consumerRecord.value();long timestamp = consumerRecord.timestamp();int partition = consumerRecord.partition();System.out.println("topic:" + topic);System.out.println("key:" + key);System.out.println("value:" + value);System.out.println("timestamp:" + timestamp);System.out.println("partition:" + partition);System.out.println("=======================");}
}

5.controller测试

package com.example.controller;import com.example.kafka.KafkaMessage;
import com.example.kafka.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.text.SimpleDateFormat;
import java.util.Date;@RestController
@RequestMapping(value = "/kafka")
public class KafkaController
{SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Autowiredprivate KafkaProducer kafkaProducer;@RequestMapping(value = "message", method = RequestMethod.GET)public KafkaMessage sendKafkaMessage(@RequestParam(name = "id") long id, @RequestParam(name = "username") String username,@RequestParam(name = "password") String password){KafkaMessage kafkaMessage = new KafkaMessage();kafkaMessage.setId(id);kafkaMessage.setUsername(username);kafkaMessage.setPassword(password);kafkaMessage.setDate(new Date());kafkaProducer.sendKafkaMessage(kafkaMessage);return kafkaMessage;}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词