欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 高考 > 浅谈Kafka(三)

浅谈Kafka(三)

2025/5/3 18:12:43 来源:https://blog.csdn.net/dolly_baby/article/details/141498741  浏览:    关键词:浅谈Kafka(三)

浅谈Kafka(三)

文章目录

    • 浅谈Kafka(三)
      • Kafka目录介绍
      • 基础操作
      • JMX接口
      • 消费者是否能够消费指定分区的消息
      • 生产者是否发送消息到leader
      • 创建主题时如何把分区放到不同broker中
      • Kafka新建的分区在哪个目录创建
      • Kafka java示例

Kafka目录介绍

  1. bin:执行脚本
  2. config:配置文件
  3. libs:运行所需要的jar包
  4. logs:日志文件
  5. site-docs:网站的帮助文档

基础操作

  1. 创建topic、生产消息到Kafka、从Kafka消费消息。
# 创建主题test
bin/kafka-topics.sh --create-topic test --bootstrap-server localhost:9092
# 查看目前Kafka中的主题
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 从Kafka中读取消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  1. 图形化操作工具有KafkaTools、Kafka-Eagle等。
  2. 内置性能测试工具
  • kafka-producer-perf-test.sh
  • kafka-consumer-perf-test.sh
# 基于1个分区1个副本的基准测试
bin/kafka-topics.sh --bootstrap-server lcoalhost:9092 --create-topic benchmark --partitions 1 --replication-factor 1
# --throughput指定吞吐量,-l不指定,--record-size指定record数据大小
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 50000000 -–throughput -l -–record-size 1000 –-producer-props  bootstrap.servers=localhost:9092 acks=1
  1. Kafka集群搭建
  • Kafka版本号:kafka_2.12-2.4.1,kafka采用scale开发,2.12为scale的版本号。
1. 安装包的上传解压
2. 修改kafka的config目录下的server.properties配置文件broker.id=0 指定broker的idlog.dirs=/data 指定kafka数据的位置
3. 把安装好的kafka复制到另外两台机器
4. 配置环境变量vi /etc/profile;export KAFKA_HOME=/kafka_2.12-2.4.1;export PATH = $PATH:${KAFKA_HOME}; source /etc/profile;

JMX接口

  1. JMX接口是一个为应用程序植入管理功能的框架。
  2. 开启Kafka JMX,暴露JMX端口。
export JMX_PORT=9988

消费者是否能够消费指定分区的消息

​ 消费者消费消息时,向broker发出fetch请求去消费特定分区的消息,消费者指定消息在日志中的偏移量,就可以消费从这个位置开始的消息,消费者拥有偏移量的控制权,可以向后回滚去重新消费之前的消息。

生产者是否发送消息到leader

​ 生产者直接将数据发送到broker的leader,不需要在多个节点进行分发,为了帮助生产者做到这点,所有的Kafka节点都可以及时的告知哪些节点是活动的,目标主题分区的leader在哪里。

创建主题时如何把分区放到不同broker中

  1. 副本因子不能大于broker的数量。
  2. 编号为0的第一个分区的第一个副本的放置位置是随机从broker列表中选择。
  3. 其他分区的第一个副本的放置位置相对于第0个分区依次往后移,剩余的副本相对于第一个副本位置由随机产生的nextReplicaShift决定。

Kafka新建的分区在哪个目录创建

  1. Kafka集群驱动前需要设置log.dirs参数即数据的存放目录,这个参数可以配置多个以逗号分隔的目录,这些目录通常分布在不同的磁盘上用于提升读写性能。我们也可以设置log.dir参数,只需要设置其中一个就行了。
  2. 如果log.dirs参数只配置了一个目录,那么分配到各个broker上的分区肯定只能在这目录下创建文件用于存放数据。如果配置了多个目录,Kafka就会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为topic名+分区ID。

Kafka java示例

  1. 打开控制台消费者,等待主题test消息。
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  1. 创建生产者并执行发送消息。
package org.lxx.stream.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class SimpleProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.182.171:9092");props.put("acks", "-1");props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i=0;i<10;i++) {producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));}producer.close();}}
  1. 消费者消费消息
package org.lxx.stream.kafka.producer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;public class SimpleConsumer {public static void main(String[] args) {System.out.println("Consumer consume...");Properties props = new Properties();props.put("bootstrap.servers", "192.168.182.171:9092");props.put("group.id", "device-consumer-topic");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);List<String> topics = new ArrayList<>();topics.add("test");consumer.subscribe(topics);while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println(record);}}}}

ConsumerRecord(topic = test, partition = 0, offset = 27, CreateTime = 1724477538955, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = 0)
ConsumerRecord(topic = test, partition = 0, offset = 28, CreateTime = 1724477539012, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = 1)
ConsumerRecord(topic = test, partition = 0, offset = 29, CreateTime = 1724477539012, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = 2)
ConsumerRecord(topic = test, partition = 0, offset = 30, CreateTime = 1724477539013, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 3, value = 3)
ConsumerRecord(topic = test, partition = 0, offset = 31, CreateTime = 1724477539013, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 4, value = 4)
ConsumerRecord(topic = test, partition = 0, offset = 32, CreateTime = 1724477539018, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 5, value = 5)
ConsumerRecord(topic = test, partition = 0, offset = 33, CreateTime = 1724477539018, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 6, value = 6)
ConsumerRecord(topic = test, partition = 0, offset = 34, CreateTime = 1724477539018, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 7, value = 7)
ConsumerRecord(topic = test, partition = 0, offset = 35, CreateTime = 1724477539019, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 8, value = 8)
ConsumerRecord(topic = test, partition = 0, offset = 36, CreateTime = 1724477539019, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 9, value = 9)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词