Salmon的全栈知识 Salmon的全栈知识
首页
  • JavaSE
  • JavaWeb
  • Spring生态
  • JUC
  • JVM
  • Netty
  • Java各版本特性
  • 23种设计模式
  • Maven
  • Java常用框架
  • Dubbo
  • OpenFeign
  • Nacos
  • Zookeeper
  • Sentinel
  • Seata
  • Gateway
  • Go基础
  • Gin
  • SQL数据库

    • MySQL
    • Oracle
  • NoSQL数据库

    • Redis
    • MongoDB
    • ElasticSearch
  • 消息中间件

    • RabbitMQ
    • RocketMQ
    • Kafka
    • ActiveMQ
    • MQTT
    • NATS
  • 网关中间件

    • Nginx
  • Linux
  • Docker
  • Git
  • K8s
  • Solidity
  • Java
  • 计算机网络
  • 操作系统
GitHub (opens new window)
首页
  • JavaSE
  • JavaWeb
  • Spring生态
  • JUC
  • JVM
  • Netty
  • Java各版本特性
  • 23种设计模式
  • Maven
  • Java常用框架
  • Dubbo
  • OpenFeign
  • Nacos
  • Zookeeper
  • Sentinel
  • Seata
  • Gateway
  • Go基础
  • Gin
  • SQL数据库

    • MySQL
    • Oracle
  • NoSQL数据库

    • Redis
    • MongoDB
    • ElasticSearch
  • 消息中间件

    • RabbitMQ
    • RocketMQ
    • Kafka
    • ActiveMQ
    • MQTT
    • NATS
  • 网关中间件

    • Nginx
  • Linux
  • Docker
  • Git
  • K8s
  • Solidity
  • Java
  • 计算机网络
  • 操作系统
GitHub (opens new window)
npm

(进入注册为作者充电)

  • Kafka概述
  • Kafka 快速入门
  • Kafka 生产者
    • 1. 生产者消息发送流程
      • 1.1 发送原理
      • 1.2 生产者重要参数列表
    • 2. 异步发送 API
      • 2.1 普通异步发送
      • 2.2 带回调函数的异步发送
    • 3. 同步发送 API
    • 4. 生产者分区
      • 4.1 分区好处
      • 4.2 生产者发送消息的分区策略
  • Kafka Broker
  • Kafka 消费者
  • Kafka-Eagle 监控
  • Kafka-Kraft 模式
  • 《Kafka》笔记
Salmon
2024-08-05
目录

Kafka 生产者

# 1. 生产者消息发送流程

# 1.1 发送原理

在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

image-20240806095740892

# 1.2 生产者重要参数列表

参数名称 描述
bootstrap.servers 生产者连接集群所需的 broker 地址清单。例如 hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。
key.serializer 指定发送消息的 key 的序列化类型。一定要写全类名。
value.serializer 指定发送消息的 value 的序列化类型。一定要写全类名。
buffer.memory RecordAccumulator 缓冲区总大小,默认 32MB。
batch.size 缓冲区一批数据最大值,默认 16KB。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms 如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。单位毫秒,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
acks 0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和 all 是等价的。
max.in.flight.requests.per.connection 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。
retries 当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1,否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms。
enable.idempotence 是否开启幂等性,默认 true,开启幂等性。
compression.type 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。

# 2. 异步发送 API

# 2.1 普通异步发送

1)需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker

异步发送流程:

image-20240806100643082

2)代码编写

  1. 创建工程 kafka

  2. 导入依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
    
  3. 创建包名:com.atguigu.kafka.producer

  4. 编写不带回调函数的 API 代码

    package com.atguigu.kafka.producer;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    public class CustomProducer {
        public static void main(String[] args) throws
                InterruptedException {
            // 1. 创建 kafka 生产者的配置对象
            Properties properties = new Properties();
            // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    
            // key,value 序列化(必须):key.serializer,value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
    
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            // 3. 创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            // 4. 调用 send 方法,发送消息
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
            }
            // 5. 关闭资源
            kafkaProducer.close();
        }
    }
    

测试:

①在 hadoop102 上开启 Kafka 消费者。

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

# 2.2 带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

带回调函数的异步发送流程:

image-20240806101321540

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CustomProducerCallback {
    public static void main(String[] args) throws InterruptedException {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
            // 添加回调
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
                // 该方法在 Producer 收到 ack 时调用,为异步调用
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        // 没有异常,输出信息到控制台
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition());
                    } else {
                        // 出现异常打印
                        exception.printStackTrace();
                    }
                }
            });
            // 延迟一会会看到数据发往不同分区
            Thread.sleep(2);
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

测试:

①在 hadoop102 上开启 Kafka 消费者。

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

③在 IDEA 控制台观察回调信息。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1

# 3. 同步发送 API

同步发送流程:

image-20240806102057239

只需在异步发送的基础上,再调用一下 get()方法即可。

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducerSync {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092 ");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 10; i++) {
            // 异步发送 默认
//            kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i));
            // 同步发送
            kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i)).get();
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

测试:

①在 hadoop102 上开启 Kafka 消费者。

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

# 4. 生产者分区

# 4.1 分区好处

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。

(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

image-20240806102641667

# 4.2 生产者发送消息的分区策略

1)默认的分区器 DefaultPartitioner

在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {
 … …
}

image-20240806102802262

上次更新: 2024/09/26, 12:02:31
Kafka 快速入门
Kafka Broker

← Kafka 快速入门 Kafka Broker→

Theme by Vdoing | Copyright © 2022-2025 Salmon's Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式