欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 高考 > Docker部署Kafka集群,增加 SASL_SSL认证,并集成到Spring Boot,无Zookeeper版

Docker部署Kafka集群,增加 SASL_SSL认证,并集成到Spring Boot,无Zookeeper版

2025/5/1 10:59:57 来源:https://blog.csdn.net/Mr_binM/article/details/143775885  浏览:    关键词:Docker部署Kafka集群,增加 SASL_SSL认证,并集成到Spring Boot,无Zookeeper版

1,准备好Kafka 镜像包:

  • bitnami/kafka:3.9.0 镜像资源包

2,准备好kafka.keystore.jks 和 kafka.truststore.jks证书

具体操作可参考:

Docker部署Kafka SASL_SSL认证,并集成到Spring Boot-CSDN博客

3,配置文件 docker-compose.yml

配置中使用的IP 1.14.165.18为主机IP,需要更换,提供外部访问

注意1.14.165.18要替换成主机IP

version: '3.8'services:kafka1:image: bitnami/kafka:3.9.0container_name: kafka1ports:- "9092:9092"environment:# KRaft settings- KAFKA_CFG_NODE_ID=1- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093- KAFKA_KRAFT_CLUSTER_ID=ncc_kafka# Listeners- KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL- KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://1.14.165.18:9092- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL- KAFKA_CLIENT_LISTENER_NAME=SASL_SSL# SASL- KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN- KAFKA_CONTROLLER_USER=kafka- KAFKA_CONTROLLER_PASSWORD=kafka2024- KAFKA_INTER_BROKER_USER=kafka- KAFKA_INTER_BROKER_PASSWORD=kafka2024- KAFKA_CLIENT_USERS=kafka- KAFKA_CLIENT_PASSWORDS=kafka2024# SSL- KAFKA_TLS_TYPE=JKS- KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=- KAFKA_CERTIFICATE_PASSWORD=kafka2024# Clustering- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2volumes:- '/data/kafka/kafka_1_data:/bitnami/kafka'- './kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro'- './kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro'networks:- kafka-netkafka2:image: bitnami/kafka:3.9.0container_name: kafka2ports:- "9093:9092"environment:# KRaft settings- KAFKA_CFG_NODE_ID=2- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093- KAFKA_KRAFT_CLUSTER_ID=ncc_kafka# Listeners- KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL- KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://1.14.165.18:9093- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL- KAFKA_CLIENT_LISTENER_NAME=SASL_SSL# SASL- KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN- KAFKA_CONTROLLER_USER=kafka- KAFKA_CONTROLLER_PASSWORD=kafka2024- KAFKA_INTER_BROKER_USER=kafka- KAFKA_INTER_BROKER_PASSWORD=kafka2024- KAFKA_CLIENT_USERS=kafka- KAFKA_CLIENT_PASSWORDS=kafka2024# SSL- KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=- KAFKA_TLS_TYPE=JKS- KAFKA_CERTIFICATE_PASSWORD=kafka2024# Clustering- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2volumes:- '/data/kafka/kafka_2_data:/bitnami/kafka'- './kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro'- './kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro'networks:- kafka-netkafka-:image: bitnami/kafka:3.9.0container_name: kafka3ports:- "9094:9092"environment:# KRaft settings- KAFKA_CFG_NODE_ID=3- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093- KAFKA_KRAFT_CLUSTER_ID=ncc_kafka# Listeners- KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL- KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://1.14.165.18:9094- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL- KAFKA_CLIENT_LISTENER_NAME=SASL_SSL# SASL- KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN- KAFKA_CONTROLLER_USER=kafka- KAFKA_CONTROLLER_PASSWORD=kafka2024- KAFKA_INTER_BROKER_USER=kafka- KAFKA_INTER_BROKER_PASSWORD=kafka2024- KAFKA_CLIENT_USERS=kafka- KAFKA_CLIENT_PASSWORDS=kafka2024# SSL- KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=- KAFKA_TLS_TYPE=JKS- KAFKA_CERTIFICATE_PASSWORD=kafka2024# Clustering- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2volumes:- '/data/kafka/kafka_3_data:/bitnami/kafka'- './kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro'- './kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro'networks:- kafka-net
networks:kafka-net:driver: bridge

4,创建数据挂载目录

sudo mkdir -p /data/kafka/kafka_1_data /data/kafka/kafka_2_data /data/kafka/kafka_3_data
sudo chmod 777 /data/kafka/*

5,启动服务
在 kafka-cluster 目录中运行以下命令来启动 Kafka 集群:

sudo docker-compose up -d

6,测试验证:

在容器修改producer.properties和consumer.properties

增加以下参数:

具体操作可查看上篇文章

ssl.endpoint.identification.algorithm=
producer.ssl.endpoint.identification.algorithm=
consumer.ssl.endpoint.identification.algorithm=

注意1.14.165.18要替换成主机IP

测试发送消息:

sudo docker exec -it kafka1 kafka-console-producer.sh --bootstrap-server 1.14.165.18:9092 --topic test --producer.config /opt/bitnami/kafka/config/producer.properties

测试接收消息:

sudo docker exec -it kafka1 kafka-console-consumer.sh --bootstrap-server 1.14.165.18:9092 --topic test --consumer.config /opt/bitnami/kafka/config/consumer.properties

 

10,使用Spring Boot 集成Kafka集群

添加pom依赖:

    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

 配置application.yml,并修改对应服务IP地址

注意1.14.165.18要替换成Kafka服务器IP

spring:application:name: ncckafka:bootstrap-servers:- 1.14.165.18:9092- 1.14.165.18:9093- 1.14.165.18:9094properties:security.protocol: SASL_SSLsasl.mechanism: SCRAM-SHA-512sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka2024";ssl.truststore.location: kafka.truststore.jksssl.truststore.password: kafka2024ssl.keystore.location: kafka.keystore.jksssl.keystore.password: kafka2024ssl.key.password: kafka2024ssl.endpoint.identification.algorithm:producer.ssl.endpoint.identification.algorithm:consumer.ssl.endpoint.identification.algorithm:

并将kafka.keystore.jks 和 kafka.truststore.jks 文件放到当前项目

11,创建KafkaTest测试类

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest(classes = NccApplication.class)
public class KafkaTest {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Testvoid send() {kafkaTemplate.send("test","hhh");}}

测试通过

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词