欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 艺术 > kafka SASL/PLAIN 认证及 ACL 权限控制

kafka SASL/PLAIN 认证及 ACL 权限控制

2025/6/2 22:12:37 来源:https://blog.csdn.net/xfp1007907124/article/details/148257228  浏览:    关键词:kafka SASL/PLAIN 认证及 ACL 权限控制

一、Zookeeper 配置 SASL/PLAIN 认证(每个zookeeper节点都要做)

1.1 在 zookeeper 的 conf 目录下,创建 zk_server_jaas.conf 文件,内容如下

Server {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin"user_kafka="kafka";
};

username="admin" 是 zookeeper 实例之间通信的用户;user_kafka="kafka" 是 kafka broker 与 zookeeper 连接的时候的认证用户,密码为=后面的值;

1.2 修改 zoo.cfg ,添加以下内容

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

1.3 因为认证的时候用到包org.apache.kafka.common.security.plain.PlainLoginModule, 这个是 kafka-client.jar 里面,所有需要将相应的 jar 包拷贝到 zookeeper 安装根目录的 lib 目录下, 大概要 copy 以下这些 jar 包

mv /opt/module/kafka_2.12-2.4.1/libs/kafka-clients-2.4.1.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/lz4-java-1.6.0.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/osgi-resource-locator-1.0.1.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/slf4j-api-1.7.28.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/snappy-java-1.1.7.3.jar /opt/module/zookeeper/lib/

1.4 修改 zookeeper 启动命令参数,在文件末尾追加以下内容

SERVER_JVMFLAGS="-Djava.security.auth.login.config=/opt/module/zookeeper/conf/zk_server_jaas.conf"

1.5 重启 zookeeper 服务。

二、Kafka Server 配置 SASL/PLAIN 认证(每个kafka节点都要做)

2.1 修改 kafka server.properties 文件,添加以下内容(各个node修改为自己的hostname)

listeners=SASL_PLAINTEXT://node2:9092
advertised.listeners=SASL_PLAINTEXT://node2:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 配置ACL入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer  
# default false | true to accept all the users to use it.
# allow.everyone.if.no.acl.found=true 
# 设置本例中admin为超级用户
super.users=User:admin

2.2 在kafka config 目录下创建 kafka_server_jass.conf 文件,内容如下

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin"user_admin="admin"user_consumer="consumer"
};Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafka";
};

KafkaServer 段里面配置了 broker 之间的认证配置以及 client 和 broker 之间的认证配置

KafkaServer.username, KafkaServer.password 用于broker之间的相互认证

KafkaServer.user_admin和KafkaServer.user_consumer 用于 client 和broker 之间的认证, 下面我们 client 里面都用用户 consumer 进行认证

Client 段里面定义 username 和 password 用于 broker 与 zookeeper 连接的认证;

2.3 修改 kafka 启动命令脚本工具 kafka-server-start.sh,添加 java.security.auth.login.config 环境变量

将最后一句

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

修改为

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_server_jaas.conf kafka.Kafka "$@"

2.4 修改完所有节点后,重启 kafka 服务,查看启动日志。

三、 Kafka client 的认证配置

3.1 在 Kafka 的 config 目录下创建 kafka_client_jaas.conf 文件,内容如下

KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="consumer"password="consumer";
};

3.2 修改 kafka 的 consumer.properties 和 producer.properties,添加以下内容

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

3.3 修改 producer 启动脚本参数,修改 kafka-console-producer.sh 文件

将最后一行

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

修改为

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"

3.4 修改 consumer 启动脚本参数,修改 kafka-console-consumer.sh 文件

将最后一行

exec $(dirname $0)/kafka-run-class.sh  kafka.tools.ConsoleConsumer "$@"

修改为

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"

四、ACL 配置

4.1 因为我们有个超级用户 admin,所以可以用 admin 做生产者,admin不需要做 ACL 配置

4.2 为 test topic 添加消费者用户 consumer

kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241 --add --allow-principal User:consumer --consumer --topic test --group test_group

4.3 测试

生产者测试

kafka-console-producer.sh --broker-list 
node2 --topic test --producer.config /opt/module/kafka_2.12-2.4.1/config/producer.properties

消费者测试

kafka-console-consumer.sh --bootstrap-server 
node2:9092 --topic test --from-beginning --consumer.config 
/opt/module/kafka_2.12-2.4.1/config/consumer.properties  

4.4 Flink 代码消费测试

object TestKafkaSource {// Kafka Broker 地址private val BOOTSTRAP_SERVERS = "node2:9092"// SASL/PLAIN 认证配置private val SASL_USERNAME = "consumer" private val SASL_PASSWORD = "consumer"// Topic 名称private val ALLOWED_TOPIC = "test"private val DENIED_TOPIC = "test1"def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval props = new Propertiesprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group")props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")// SASL/PLAIN 配置props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")props.put("sasl.mechanism", "PLAIN")props.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", SASL_USERNAME, SASL_PASSWORD))val dataSource: DataStreamSource[String] = env.addSource(new FlinkKafkaConsumer[String](ALLOWED_TOPIC , new SimpleStringSchema(), props))dataSource.print()env.execute()}
}

五、kafka-exporter 配置(使用了 kafka-exporter 监控 kafka 集群才做)

5.1 在kafka_server_jaas.conf 文件中加入 kafka-exporter 用户信息,内容如下

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin"user_admin="admin"user_consumer="consumer"user_kafka-exporter="kafka-exporter"
};Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafka";
};

5.2 分发到所有 kafka broker ,并重启 kafka 服务

5.3 为 kafka-exporter 用户配置权限

kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Describe --cluster
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Describe --topic '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Describe --group '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Read --topic '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Read --topic __consumer_offsets

5.4 配置 kafka_export 服务启动参数,在 kafka-export 安装目录下创建一个文件 start_kafka_export ,内容如下

#!/bin/bash
export KAFKA_SASL_USERNAME="kafka-exporter"
export KAFKA_SASL_PASSWORD="kafka-exporter"
export KAFKA_SASL_MECHANISM="PLAIN"/opt/module/kafka_exporter-1.7.0/kafka_exporter --kafka.server=node2:9092 --web.listen-address=:9308 --zookeeper.server=node2:2181/kafka241 \--sasl.enabled \--sasl.username=$KAFKA_SASL_USERNAME \--sasl.password=$KAFKA_SASL_PASSWORD \--sasl.mechanism=$KAFKA_SASL_MECHANISM

5.5 修改 kafka_export.service 文件,内容如下

[Unit]
Description=kafka_exporter
Wants=prometheus.service
After=network.target prometheus.service[Service]
Type=simple
User=bigdata
Group=bigdata
Restart=on-failure
WorkingDirectory=/opt/module/kafka_exporter-1.7.0
#ExecStart=/opt/module/kafka_exporter-1.7.0/kafka_exporter --kafka.server=node2:9092 --web.listen-address=:9308 --zookeeper.server=node2:2181
ExecStart=/opt/module/kafka_exporter-1.7.0/start_kafka_exporter
[Install]
WantedBy=multi-user.target

5.6 重新加载 systemctl 管理的服务

sudo systemctl daemon-reload

5.7 启动 kafka_exporter 服务

sudo systemctl start kafka_exporter

5.8 上 grafana 查看是否能获取 kafka 状态

六、 参考文档

6.1 官网文档 Apache Kafka

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词