Kafka的命令使用和JavaAPI的操作和核心机制详解
标签: Kafka的命令使用和JavaAPI的操作和核心机制详解 HarmonyOS博客 51CTO博客
2023-04-09 18:23:51 111浏览
kafka笔记
内容:
- 1- Kafka的核心机制
- 分片和副本机制
- 如何保证数据不丢失
- 数据存储和查询机制
- 数据分发机制
- 负载均衡机制
- 2- Kafka的图形管理界面的安装与基本使用 (了解)
- 3- kafka的数据积压的问题(关键点: 如何检测是否存在数据积压)
- 4- kafka的配额限速问题(知道其作用. 记录好使用方式)
1. 消息队列的基本介绍
1.1 消息队列产生的背景
什么是消息队列呢?
消息: 数据 只不过这个数据具有流动的状态
队列: 存储数据的容器, 只不过这个容器具有FIFO(先进先出)特性
消息队列: 数据在队列中, 从队列的一端传递到另一端的过程, 数据在整个队列中产生了一种流动状态
1.2 常见的消息队列的产品
常见消息队列的产品:
- 1- ActiveMQ: 出现时间比较早的一款消息队列的中间件组件 , 目前整个社区活跃度比较低, 此软件的使用人群也在不断的降低
- 2- RabbitMQ: 目前在Java领域中使用非常频繁的一款消息队列的中间件产品, 其社区活跃度比较高的, 支持多种语言的开发
- 3- RocketMQ: 是由阿里推出一款消息队列的中间件产品, 目前主要是在阿里系范围内被广泛时间,客户端支持的语言主要是Java
- 4- Kafka: 是一款大数据领域下的消息队列的中间件产品, 主要应用在大数据领域方向上, 在后端 以及业务领域使用较少(用基本都是非重要数据传输上使用)
- 5- Pulsar: 最近一两年新起的一款消息队列的中间件产品, 也是Apache顶级开源项目, 目前主要是有StreamNative公司进行维护
1.3 消息队列的作用
- 1- 同步操作转换为异步操作
- 2- 应用的解耦合
- 3- 流量削峰: 在一些秒杀场景中, 遇到突然的庞大的并发量, 但是一般就是一瞬间的事情, 过后就没有了
- 4- 消息驱动系统
1.4 消息队列的两种消费模式
在Java中, 提供了消息队列的协议: JMS(Java Message Server) Java消息服务, sun公司系统所有的消息队列支持Java客户端的时候, 都按照JMS协议规范制定客户端(类似于提供JDBC协议)
- 点对点: 数据被生产到容器后, 最终这个数据只能被一个消费者来消费数据
- 发布订阅: 数据被生产到容器后,可以被多个消费者所消费
两个角色: 生产者 (producer) 和 消费者(consumer)
2. kafka的基本介绍
kafka是Apache旗下的一款开源免费的消息队列的中间件产品 最早是有领英公司开发的, 后期贡献给Apache, 目前也是Apache旗下的顶级开源项目. 采用的语言为scala, Kafka2依然要依赖于zookeeper的
官方网站: https://www.kafka.apache.org
适用场景:
数据传递工作, 需要将数据从一端传递到另一端, 此时可以使用Kafka实现, 不局限于两端的程序
在实时领域中, 主要是用于流式的数据处理工作
3. kafka的集群架构
相关名词:
Kafka cluster: Kafka的集群
broker: Kafka的节点
producer: 生产者
consumer: 消费者
Topic: 话题 主题 理解为就是一个逻辑容器
shard: 分片 一个topic可以被分为N个分片, 分片的数量和节点数据没有关系
replicas: 副本 可以对每一个分片构建N个副本, 副本的数量最多和节点的数据一致(包含本身)
zookeeper: 管理kafka集群, 并存储Kafka的相关元数据
4. kafka的安装操作
参考Kafka的集群安装文档 完成整个安装工作即可
如果安装后, 无法启动, 可能遇到的问题:
1) 配置文件中忘记修改broker id
2) 忘记修改监听的地址, 或者修改了但是前面的注释没有打开
如何启动Kafka集群:
启动zookeeper集群: 每个节点都要执行
cd /export/server/zookeeper/bin
./zkServer.sh start
启动完成后 需要通过 jps -m 查看是否启动 , 并且还需要通过:
./zkServer.sh status 查看状态, 必须看到一个leader 两个follower才认为启动成功了
启动Kafka集群:
单节点: 每个节点都需要执行
cd /export/server/kafka_2.12-2.4.1/bin
前台启动:
./kafka-server-start.sh ../config/server.properties
后台启动:
nohup ./kafka-server-start.sh ../config/server.properties 2>&1 &
注意: 第一次启动, 建议先前台启动, 观察是否可以正常启动, 如果OK, ctrl +C 退出, 然后挂载到后台
如何停止:
单节点: 每个节点都需要执行
cd /export/server/kafka_2.12-2.4.1/bin
操作:
jps 然后通过 kill -9
或者:
./kafka-server-stop.sh
5. kafka的相关使用
kafka是一个消息队列的中间件产品, 主要的作用: 将数据从程序一端传递到另一端的操作
所以说学习kafka主要就是学习如何使用kafka生产数据, 以及如何从kafka中读取数据
5.1 kafka的shell命令使用
- 1- 如何创建Topic: kafka-topics.sh
./kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test01 --partitions 3 --replication-factor 2
必备选项:
1- 执行什么操作 --create 表示为创建
2- 连接地址: 建议是zookeeper地址 --zookeeper node1:2181,node2:2181,node3:2181
3- 创建的topic的名字: --topic test01
4- topic有多少个分片: --partitions 3
5- 每个分片有多少个副本: --replication-factor 2
- 2- 查看当前有那些topic:
./kafka-topics.sh --list --zookeeper node1:2181,node2:2181,node3:2181
- 3- 查看某一个topic的详细信息
查看每一个topic的详细信息:
./kafka-topics.sh --describe --zookeeper node1:2181,node2:2181,node3:2181
查看某一个topic的详细信息:
./kafka-topics.sh --describe --zookeeper node1:2181,node2:2181,node3:2181 --topic test01
- 4- 如何修改Topic:
Topic 仅允许增大分片, 不允许减少分片, 同时也不支持修改副本数量
增大分区:
./kafka-topics.sh --alter --zookeeper node1:2181,node2:2181,node3:2181 --topic test01 --partitions 5
- 5- 如何删除Topic
./kafka-topics.sh --delete --zookeeper node1:2181,node2:2181,node3:2181 --topic test01
注意:
默认情况下, 删除一个topic 仅仅是标记删除, 主要原因: kafka担心误删数据, 一般需要用户手动删除
如果想执行删除的时候, 直接将topic完整的删除掉: 此时需要在server.properties 配置中修改一个配置为true
delete.topic.enable=true
如果topic中的数据量非常少, 或者说没有任何的数据的时候, 此时topic会自动先执行逻辑删除, 然后在物理删除, 不管是否配置了delete.topic.enable=true
- 6- 如何模拟生产者:
./kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test01
- 7- 如何模拟消费者:
./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test01
默认从当前时间开始消费数据, 如果想从头开始消费, 可以添加 --from-beginning 参数即可
5.2 kafka的基准测试
kafka的基准测试:
主要指的安装完成Kafka集群后, 进行测试操作, 测试其是否承载多大的并发量(读写效率)
注意: 在进行Kafka的基准测试的时候, 受Topic的分片和副本的数量影响会比较大, 一般在测试的时候, 会构建多个topic, 每一个topic设置不同的分片和副本的数量, 比如: 一个设置分片多一些, 副本少一些, 一个设置分片少一些, 副本多一些, 要不设置分片多副本也多
- 1- 创建一个Topic
./kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test02 --partitions 6 --replication-factor 1
- 2- 测试写入的数据的效率
./kafka-producer-perf-test.sh --topic test02 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1
属性说明:
--num-records: 发送的总消息量
--throughput: 指定吞吐量(限流) -1 不限制
--record-size: 每条数据的大小(字节)
--producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1 设置生产者的配置信息(连接地址, 消息确认方案)
写后的结果:
5000000 records sent, 134578.634296 records/sec (128.34 MB/sec), 239.83 ms avg latency, 1524.00 ms max latency, 45 ms 50th, 940 ms 95th, 1269 ms 99th, 1461 ms 99.9th.
需关注的信息:
5000000 records sent : 总计写入了多少条数据
134578.634296 records/sec: 每秒中可以处理多少条数据
128.34 MB/sec: 每秒钟可以处理的数据量是多大
- 3- 测试读取数据的效率
./kafka-consumer-perf-test.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test02 --fetch-size 1048576 --messages 5000000
属性:
--fetch-size 1048576 : 每次从kafka端拉取的数据量
--messages: 测试的总消息量
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2023-03-30 21:08:26:464, 2023-03-30 21:08:42:373, 4768.3716, 299.7279, 5000000, 314287.5102, 1680181706706, -1680181690797, -0.0000, -0.0030
start.time: 2023-03-30 21:08:26:464 启动时间
end.time: 2023-03-30 21:08:42:373 结束时间
data.consumed.in.MB: 4768.3716 总大小
MB.sec: 299.7279 每秒中可以处理的大小
data.consumed.in.nMsg: 5000000 总消息量
nMsg.sec: 314287.5102 每秒钟可以处理的数据
总结:
假设Kafka的节点数量是无限多的:
topic的分片数量越多, 理论上读写效率越高
topic的副本数量越多, 理论上写入的效率变差
一般可以将分片的数量设置为节点数量的三倍左右, 副本数量为1, 基本上可以测试出最佳性能
5.3 kafka的Java API的操作
- 1- 创建一个Maven的项目, 导入相关的依赖
<repositories><!--代码库-->
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases><enabled>true</enabled></releases>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>
- 2- 创建两个包目录: com.itheima.kafka.producer 和 com.itheima.kafka.consumer
5.3.1 演示如何将数据生产到Kafka
代码实现:
package com.itheima.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerTest {
public static void main(String[] args) {
// 第一步: 创建Kafka的生产者核心类对象: KafkaProducer 并传入相关的配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); // 连接地址
props.put("acks", "all"); // 消息确认方案 all 是最高级别
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key的数据类型 及其序列化类
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value的数据类型 及其序列化类
Producer<String, String> producer = new KafkaProducer<>(props);
// 第二步: 执行发送数据操作
for (int i = 0; i < 10; i++) {
// 生产者的核心数据承载对象
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test01",Integer.toString(i));
producer.send(producerRecord);
}
// 第三步: 关闭生产者对象
producer.close();
}
}
5.3.2 演示如何从Kafka中消费数据
代码实现:
package com.itheima.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
// 演示消费端的代码实现
public class KafkaConsumerTest {
public static void main(String[] args) {
// 第一步: 创建消费者的核心对象: KafkaConsumer 并添加配置信息
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); // 指定kafka的集群地址
props.setProperty("group.id", "test"); // 指定消费者组
props.setProperty("enable.auto.commit", "true"); // 是否开启自动提交消息偏移量
props.setProperty("auto.commit.interval.ms", "1000"); // 自动提交消息偏移量的间隔时间
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key的反序列化类型
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value反序列化类型
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 第二步: 指定监听那些topic中数据 (支持多个一起监听)
consumer.subscribe(Arrays.asList("test01"));
while (true) { // 不断监听, 持续一直监听
// 第三步: 从kafka中获取消息数据, 参数表示当kafka中没有消息的时候,等待的超时时间, 如果过了这个等待超时时间, 返回 空对象(对象是存在的, 但是内部没有任何的数据, 相当于空容器)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
long offset = record.offset(); // 获取消息偏移量
String key = record.key(); // 获取消息的key值
String value = record.value(); // 获取消息的value值
int partition = record.partition(); // 获取消息从那个分片来
System.out.println("消息的偏移量为:"+ offset +"; 消息所属分片:"+partition +";消息的key值:"+key+";消息的value:"+value);
}
}
}
}
可能出现,控制台 不打印 也不报错(底层是保存, 缺少log4j.properties日志配置):
发现本地windows的hosts文件配置有问题: C:\Windows\System32\drivers\etc\hosts 文件
必须配置以下内容:
192.168.88.161 node1 node1.itcast.cn
192.168.88.162 node2 node2.itcast.cn
192.168.88.163 node3 node3.itcast.cn
报这个问题同学, 大概率是只配置 node1 node2和node3 并没有配置全称
6. kafka的核心机制
6.1 Topic的分片和副本机制
什么是分片呢?
分片: 逻辑概念
相当于将一个Topic(大容器)拆分为N多个小容器, 多个小的容器构建为一个Topic
目的:
1- 提高读写的效率: 分片可以分布在不同节点上, 在进行读写的时候, 可以让多个节点一起参与(提高并行度)
2- 分布式存储: 解决了单台节点存储容量有限的问题
分片的数量:分片是可以创建N多个, 理论上没有任何的限制
什么是副本呢?
副本: 物理的概念
针对每个分片的数据, 可以设置备份, 可以将其备份多个
目的:
提高数据的可靠性, 防止数据丢失
副本的数量: 副本的数量最多和集群节点数量保持一致, 但是一般设置为 2个 或者 3个
6.2 kafka如何保证数据不丢失
6.2.1 生产端是如何保证数据不丢失
当生产者将数据生产到Broker后, Broker应该给予一个ack确认响应,在Kafka中, 主要提供了三种ack的方案:
0: 生产者只管发送数据, 不关心不接收broker给予的响应
1: 生产者将数据发送到Broker端, 需要等待Broker端对应的topic上的对应的分片的主副本接收到消息后, 才认为发送成功了
-1(ALL): 生产者将数据发送到Broker端, 需要等待Broker端对应的topic上的对应的分片的所有的副本接收到消息后, 才认为发送成功了
效率角度: 0 > 1 > -1
安全角度: -1 > 1 > 0
思考: 在实际使用中, 一般使用什么方案呢? 三种都有可能
一般要根据消息的重要程度, 来选择采用什么方案, 如果数据非常的重要, 不能丢失, 一般设置为 -1
相关的思考的点:
思考1: 如果Broker迟迟没有给予ACK响应, 如何解决呢?
解决方案: 设置超时时间, 如果超时触发重试策略, 如果多次重试依然无法给予响应, 此时程序报异常
思考2: 每发送一次,Broker就要给予一次响应, 请问这样是否会对网络带宽产生影响呢? 如果产生, 如何解决呢?
解决方案: 会, 引入缓存池, 满足了一批数据后, 异步发送给Broker端, Broker端只需要针对这一批数据给予一次响应即可
思考3:通过一批一批的异步发送方式, 如果Broker端对这一批数据没有给予响应, 但是缓存池中数据已经满了, 如何解决?
解决方案: 选择清空缓存池 / 不清空, 如果数据是可重复读的,那么直接让程序报错即可, 通知处理, 处理后, 重新获取发送即可, 如果数据是不可重复读,为了避免此种问题, 我们可以数据先在某个其他位置保存(备份), 当数据生产成功, 删除对应的数据, 生产不成功, 后续直接从保存的位置中获取生产即可
相关的参数:
buffer.memory : 缓存池的大小
默认值: 33554432(32M)
retries: 重试次数
默认值: 2147483647 (此参数并不代表最终的重试次数, 取决于超时相关参数)
delivery.timeout.ms: 一次发送数据总超时时间
默认值: 120000(120s)
request.timeout.ms: 一次请求超时时间
默认值: 30000(30s)
一批数据的阈值: 时间 和 大小
batch.size : 一批数据大小
默认值: 16384 (16kb)
linger.ms : 每一批次的间隔时间
默认值: 0
代码演示: 如何模拟同步发送数据 和 异步发送数据
同步方式:
package com.itheima.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
// 模拟同步的发送方式
public class KafkaProducerTestSync {
public static void main(String[] args) {
// 第一步: 创建Kafka的生产者核心类对象: KafkaProducer 并传入相关的配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); // 连接地址
props.put("acks", "all"); // 消息确认方案 all 是最高级别
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key的数据类型 及其序列化类
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value的数据类型 及其序列化类
Producer<String, String> producer = new KafkaProducer<>(props);
// 第二步: 执行发送数据操作
for (int i = 0; i < 10; i++) {
// 生产者的核心数据承载对象
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test01",Integer.toString(i));
try {
producer.send(producerRecord).get(); // get() 会等待响应
// 发送成功了...
}catch (Exception e) {
// 发送失败了: 此处失败指的是重试后的失败 一旦发送失败, 就会抛出异常
// 在此处, 就可以编写发送失败后的, 业务逻辑代码
e.printStackTrace();
}
}
// 第三步: 关闭生产者对象
producer.close();
}
}
异步方式:
package com.itheima.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
// 模拟异步有返回值的发送方式
public class KafkaProducerTestAsync {
public static void main(String[] args) {
// 第一步: 创建Kafka的生产者核心类对象: KafkaProducer 并传入相关的配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); // 连接地址
props.put("acks", "all"); // 消息确认方案 all 是最高级别
props.put("linger.ms",1000); // 设置每批数据的间隔时间
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key的数据类型 及其序列化类
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value的数据类型 及其序列化类
Producer<String, String> producer = new KafkaProducer<>(props);
// 第二步: 执行发送数据操作
for (int i = 0; i < 10; i++) {
// 生产者的核心数据承载对象
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test01",Integer.toString(i));
producer.send(producerRecord, new Callback() { // 异步有返回值发送方式
// 回调函数: 底层在异步发送数据的时候, 发送一次, 就会调用一次回调函数, 如果 Exception 不为Null 说明发送成功了, 否则认为发送失败了
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null){
// 认为数据发送失败了
// 在此处编写处理失败的业务逻辑
}
// 发送成功了
}
});
}
// 第三步: 关闭生产者对象
producer.close();
}
}
6.2.2 Broker端如何保证数据不丢失
保证方案: 磁盘存储 + 多副本 + ack为-1
6.2.3 消费端如何保证数据不丢失
第一步: 当Consumer启动后, 连接Kafka集群, 根据group.id 到Kafka中寻找上一次消费到了什么位置(偏移量)
第二步:
如果consumer找到了上次消费位置, 接着从这个位置开始消费数据
如果没有找到上一次消费的位置, 说明第一次来, 这个时候默认从当前时刻开始消费数据, 消费的位置也会从当前这个消息的偏移量位置开始消费
第三步: 消费者开始消费数据, 在消费的过程中, 每消费完数据后, 都要和kafka集群进行汇报, 汇报当前消费到了那一个偏移量信息
汇报方式: 自动 / 手动
**思考: 请问在这种方式下是否可以保证消费端不会发送数据丢失的问题呢? **
可以保证, 但是可能会存在重复消费的问题
思考: 消费者消费的消息偏移量信息是存储在哪里呢?
在 0.8.x版本之前, 消费者的消息偏移量信息是被记录在zookeeper中
在 0.8.x版本之后, 将消费者的消息偏移量信息记录在kafka集群上, 通过一个topic来记录: __consumer_offsets
此topic默认有50个分片 1个副本
演示: 如何手动提交偏移量信息
package com.itheima.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
// 演示消费端的代码实现
public class KafkaConsumerTest2 {
public static void main(String[] args) {
// 第一步: 创建消费者的核心对象: KafkaConsumer 并添加配置信息
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); // 指定kafka的集群地址
props.setProperty("group.id", "test"); // 指定消费者组
props.setProperty("enable.auto.commit", "false"); // 是否开启自动提交消息偏移量
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key的反序列化类型
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value反序列化类型
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 第二步: 指定监听那些topic中数据 (支持多个一起监听)
consumer.subscribe(Arrays.asList("test01"));
while (true) { // 不断监听, 持续一直监听
// 第三步: 从kafka中获取消息数据, 参数表示当kafka中没有消息的时候,等待的超时时间, 如果过了这个等待超时时间, 返回 空对象(对象是存在的, 但是内部没有任何的数据, 相当于空容器)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
long offset = record.offset(); // 获取消息偏移量
String key = record.key(); // 获取消息的key值
String value = record.value(); // 获取消息的value值
int partition = record.partition(); // 获取消息从那个分片来
System.out.println("消息的偏移量为:"+ offset +"; 消息所属分片:"+partition +";消息的key值:"+key+";消息的value:"+value);
}
// 每消费完一批数据, 提交一次偏移量信息
// 注意: 一旦使用手动提交偏移量, 千万要注意, 必须写提交偏移量的代码. 否则会导致大量的数据重复消费
//consumer.commitSync(); // 同步提交
consumer.commitAsync(); // 异步提交
}
}
}
6.3 kafka的消息存储和查询机制
6.3.1 kafka的消息存储
存儲路径: /export/server/kafka/data
在此目录下,根据topic名称和分片编号创建一个目录,在此目录下存储对应分片的数据
1- Kafka中数据是存储在磁盘中, 通过分文件的方式来存储的, 一个log文件默认最大为1GB,当达到1GB后, 就会滚动形成一个新的log文件, 同时对应的index文件也会滚动形成一个新的文件
2- 每一个消息数据片段都是由两个文件组成的:
index文件: 对log文件中数据索引信息
log文件: 存储是真正消息数据
3- 文件名表示什么?
当前这个文件存储的消息起始偏移量
思考: kafka为啥要分文件的方式来存储数据呢? 如果吧数据放置到同一个文件中, 不更好嘛?
原因: kafka本质是消息队列的中间件产品, 当消息被消费者所消费后, 存储在kafka中数据其实也就没啥用了, 或者说随着时间的推移, 存储在kafka的消息数据, 其数据价值会越来越低的, 所以说kafka存储数据仅仅是一种临时存储
默认情况下, kafka会自动删除超过168小时(7天)的数据, 通过分文件的方式, kafka只需要查看文件最后修改时间, 如果超过7天, 自动将其删除即可
相关配置: server.properties
log.retention.hours=168 (小时)
log.segment.bytes=1073741824(1GB)
6.3.2 kafka的查询机制
查询数据的步骤:
- 1- 确定消息被存储在那个segment片段中
- 2- 先去对应segment片段中index文件, 从这个索引文件中, 查询对应消息偏移量, 在log文件的什么位置上进行存储着
- 3- 根据返回的log文件的具体的位置信息, 底层会基于磁盘顺序查询方式查询log文件, 找到对应位置上数据即可
扩展:
磁盘的读写方式主要有二种: 顺序读写 和 随机读写
顺序读写的效率是远远高于随机读写的效率
6.4 kafka的生产者数据分发机制
生产者将数据生产到kafka的某个Topic中, Topic可以被分为多个分片的,最终一条消息只能被其中一个分片所接收, 那么最终是有哪个分片来接收数据呢? 这就是生产者的分发机制
思考: 分发策略有那些呢?
1- 随机分发策略
2- 轮询分发策略
3- Hash取模分发策略
4- 指定分区策略
5- 范围分发策略
6- 自定义分区策略
....
思考: 在Kafka中支持有那些策略呢?
1- 轮询策略(2.4版本以下), 目前为 粘性分发策略 是Java客户端拥有的
2- Hash取模分发策略
3- 指定分发策略
4- 随机分发策略 (Python 客户端支持, Java 客户端不支持)
5- 自定义分区策略
如何使用不同的分发策略呢?
- 1- 指定分区策略
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}
在生产端, 构建数据承载对象的时候, 采用此构造方式, 即可实现指定分区的策略
分区编号: 从 0 开始
- 2- Hash 取模分发策略
2.1 创建数据承载对象的时候, 必须使用仅传递 k 和 v的构造方法, 即可使用hash模式
public ProducerRecord(String topic, K key, V value) {
this(topic, null, null, key, value, null);
}
2.2 当执行Hash取模分区策略,底层是通过一个默认的分区类实现完成Hash取模: DefaultPartitioner
public class DefaultPartitioner implements Partitioner {
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
// 执行分区的核心方法, 返回内容表示将当前这条数据发送到那个分片上
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
说明: 在使用此种分发策略的时候, key值一定是可变的, 千万不要固定不变
- 3- 粘性分区策略
3.1 创建生产者的数据承载对象的时候, 只需要传递value即可, 此时底层会采用粘性的分区策略
public ProducerRecord(String topic, V value) {
this(topic, null, null, null, value, null);
}
3.2 当执行粘性分区策略,底层是通过一个默认的分区类实现完成Hash取模: DefaultPartitioner
public class DefaultPartitioner implements Partitioner {
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
// 执行分区的核心方法, 返回内容表示将当前这条数据发送到那个分片上
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 当key为null的时候, 执行的是粘性的分区策略
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
什么叫粘性分区策略:
当生产者发送数据的时候, 一般都是采用异步(批)发送方案,当发送一批数据到Broker端后, 首先会随机的选择其中一个分片, 然后尽可能黏上这个分区, 将这一批的数据全部都交给这一个分区即可
什么是轮询策略:
当生产者发送数据的时候, 一般都是采用异步(批)发送方案,当发送一批数据到Broker端后, 根据topic的分片的数量, 将一批数据拆分为N多个小的批次, 一个批次对应一个分片, 然后写入到topic的各个分片上
粘性分区的优势:
减少中间的这个切分的操作, 直接将一批数据全部写入到某一个分片上, 同时也减少了中间ack的响应的次数, 减少网络的带宽, 提升效率
但是如果生成的数量非常的块, 会导致大量的数据写入到同一个分片上, 无法解开
- 4- 自定义分区策略: 在MR中自定义分区方案很相似的
如何自定义分区呢? 抄 抄DefaultPartitioner
1- 创建一个类, 实现 Partitioner 接口
2- 重写接口中partition()方法 以及 close 方法, 主要核心重写: partition()
partition方法的参数列表:
String topic : 指定要写入到那个topic
Object key : 传入的key值
byte[] keyBytes: 传入的key的字节
Object value : 传入的value数据
byte[] valueBytes : 传入的value的字节
Cluster cluster : 集群的对象 可以帮助获取指定的topic有多少个分片
其返回值为 要将这个数据写入到那个分片的编号
3- 将自定义的分区类, 配置到生产者的代码的Properties配置信息中:
key: partitioner.class
value: 自定义类的权限类名
将key 和value的值添加到properties对象中
6.5 kafka的消费者的负载均衡机制
Kafka的消费者负载均衡机制规定:
1- 在一个消费者组内, 消费者的数量最多和所监听的topic的分片数量是相等的, 如果有大于分片数量的消费者, 一定会有消费者处于闲置的状态
2- 在一个消费者组内, topic的一个分片的数据只能被一个消费者所接收, 不允许出现一个分片被多个消费者所接收的情况, 而一个消费者是可以接收多个分片的数据
思考:
如何模拟点对点消费模式: 让所有监听这个topic的消费者, 都处在同一个消费组内
如何模拟发布订阅模式: 让所有监听这个topic的消费者都不在同一个消费组内
7. 通过命令的方式查看数据积压的问题
./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --group test01 --describe
工作中, 有时候运维工程师, 会将lag指标纳入监控范围, 当这个LAG 出现积压问题, 基于告警系统 进行告警
好博客就要一起分享哦!分享海报
此处可发布评论
评论(0)展开评论
展开评论