Hefery 的个人网站

Hefery's Personal Website

Contact:hefery@126.com
  menu
73 文章
0 浏览
3 当前访客
ღゝ◡╹)ノ❤️

Kafka基础—必知必会

Kafka简介

Kafka简介

Kafka:

  • 消息引擎系统(Messaging System),基于 Zookeeper 分布式消息队列(发布/订阅模式)
  • 分布式事件流平台(Distributed Streaming Platform),用于高性能数据管道、流分析、数据集成和关键任务应用

消息引擎系统是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递。

消息引擎是用于在不同系统之间传输消息,那么如何设计待传输消息的格式—Kafka使用纯二进制的字节序列。当然消息还是结构化的,只是在使用之前都要将其转换成二进制的字节序列

特点:实时、高吞吐率、高性能、高可靠

Kafka作为消息队列的作用:

  • 削峰填谷:缓冲上下游瞬时突发流量,使其更平滑。特别是对于那种发送能力很强的上游系统,如果没有消息引擎的保护,“脆弱”的下游系统可能会直接被压垮导致全链路服务“雪崩”。但是,一旦有了消息引擎,它能够有效地对抗上游的流量冲击,真正做到将上游的“峰”填满到“谷”中,避免了流量的震荡。消息引擎系统的另一大好处在于发送方和接收方的松耦合,这也在一定程度上简化了应用的开发,减少了系统间不必要的交互。引入像 Kafka 这样的消息引擎系统来对抗这种上下游系统 TPS 的错配以及瞬时峰值流量。
  • 解耦:允许独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
  • 异步通信:把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们

Kafka工作模型

  • 点对点模型:
    • 系统 A 发送的消息只能被系统 B 接收,其他任何系统都不能读取 A 发送的消息
  • 发布/订阅模型:
    • 可能存在多个发布者向相同的主题 topic 发送消息,而订阅者也可能存在多个,它们都能接收到相同主题的消息
    • 消费者消费数据之后,不删除数据

Kafka基本概念

消息:

  • Record:消息,Kafka 处理的主要对象
  • Topic:主题,由 Partition 组成,承载 Record 的逻辑容器,在实际使用中多用来区分具体的业务。
    • 发布订阅的对象是 Topic,可以为每个业务创建专属的主题
    • 每个主题可以配置 M 个分区Partition,而每个分区 Partition 又可以配置 N 个副本Replica
  • Partition:分区,实际消息存储单位。将每个 Topic 划分成多个分区。每个分区都是一个有序的,不可变的记录序列,不断附加到结构化的提交日志中
    • 分区中的记录每个都分配了一个称为偏移的顺序ID号,它唯一地标识分区中的每个记录
    • 分区作用:提供负载均衡的能力,或说对数据进行分区主要原因,就是为了实现系统的高伸缩性
    • 分区策略:轮询、随机、按消息键key保存……
  • Offset:消息位移,分区中每条 Record 位置信息,分区位移总是从 0 开始

客户端:

  • Producer:消息生产者,通常持续不断地向一个或多个 Topic 发送消息的客户端
  • Consumer:消息消费者,订阅一个或多个 Topic 消息并进行消费的客户端

生产者向分区写入消息,每条消息在分区中的位置信息由一个叫偏移量(Offset)的数据来表征。分区位移总是从 0 开始,假设一个生产者向一个空分区写入了 10 条消息,那么这 10 条消息的位移依次是 0、1、2、…、9

服务端:

  • Broker :一个 Kafka 集群由多个 Broker 组成,Broker 负责接收和处理客户端发送过来的请求,以及对消息进行持久化。
  • Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 Partition 数据不丢失,且 Kkafka 仍然能够继续工作,Kafka 提供了副本机制,一个 Topic 的每个分区都有若干个副本

Broker 如何持久化数据:Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,改为性能较好的顺序I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。不过如果你不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的

Kafka操作

Kafka安装部署

Kafka版本:kafka_2.12-3.1.0.tgz(编译 Kafka 源代码的 Scala 编译器版本 + Kafka 版本号)

选择合适的 Kafka 版本:每个 Kafka 版本都有恰当使用场景和独特优缺点,切记不要一味追求最新版本

  1. Zookeeper

    • 解压:tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/install/
    • 配置:vim config/zookeeper.properties
    • 启动:nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    • 验证:ps -ef | grep zookeeper
  2. Kafuka

    • 解压:tar -zxvf kafka_2.11-2.4.0.tgz -C /opt/install/
    • 配置:vim config/server.properties
    • 启动:nohup bin/kafka-server-start.sh config/server.properties &
    • 验证:ps -ef | grep kafka
    • 停止:bin/kafka-server-stop.sh

启动时:先启 Zookeeper,再启 Kafka

关闭时:先关 Kafka,再关 Zookeeper

Kafka配置文件

# 每个 broker 在集群中的唯一标识,要求是正数
broker.id=0

# Kafka数据存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs=/tmp/kafka-logs

# Zookeeper集群的地址,多个地址用逗号分割 hostname1:port1, hostname2:port2, hostname3:port3
zookeeper.connect=localhost:2181

# kafka真正bind的地址:定义 Kafka Broker 的 Listener 配置项
listeners

# 将 Broker 的 Listener 信息发布到 Zookeeper 中,暴露给外部 listeners,如果没有设置,会用 listeners
advertised.listeners

Kafka核心API

Kafka客户端 AP

  • AdminClient:Kafka 管理和检测 Topic、broker 以及其它 Kafka 对象
  • Producer:发布消息到1个或多个topic
  • Consumer:订阅一个或多个topic,并处理产生的消息
  • Streams:高效地将输入流转换到输出流
  • Connector:从一些源系统或应用程序中拉取数据到kafka

AdminClient API

API作用
AdminClientAdminClient客户端对象
NewTopic创建Topic
CreateTopicsResult创建Topic的返回结果
ListTopicsResult查询Topic列表
ListTopicsOptions查询Topic列表及选项
DescribeTopicsResult查询Topics详情
DescribeConfigsResult查询Topics配置项
public class AdminSample {

    public final static String TOPIC_NAME = "lalala-topic";

    public static void main(String[] args) throws Exception {
        // 设置AdminClient
        AdminClient adminClient = AdminSample.adminClient();
        //System.out.println("AdminClient:" + adminClient);
        // 创建Topic实例
        //createTopic();
        // 获取Topic列表
        //topicList();
        // 删除Topic实例
        //delTopics();
        // 描述Topic
        //describeTopics();
        // 查询Config
        //describeConfig();
        // 修改Config
        //alterConfig();
        // 增加partition数量
        incrPartitions(2);
    }

    /*
        增加partition数量
     */
    public static void incrPartitions(int partitions) throws Exception{
        AdminClient adminClient = adminClient();
        Map<String, NewPartitions> partitionsMap = new HashMap<>();
        NewPartitions newPartitions = NewPartitions.increaseTo(partitions);
        partitionsMap.put(TOPIC_NAME, newPartitions);

        CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
        createPartitionsResult.all().get();
    }

    /*
        修改Config信息
     */
    public static void alterConfig() throws Exception{
        AdminClient adminClient = adminClient();
        // Map<ConfigResource,Config> configMaps = new HashMap<>();

        // 组织两个参数
        // ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        // Config config = new Config(Arrays.asList(new ConfigEntry("preallocate","true")));
        // configMaps.put(configResource,config);
        // AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMaps);

        /*
            从 2.3以上的版本新修改的API
         */
        Map<ConfigResource,Collection<AlterConfigOp>> configMaps = new HashMap<>();
        // 组织两个参数
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        AlterConfigOp alterConfigOp =
                new AlterConfigOp(new ConfigEntry("preallocate","false"),AlterConfigOp.OpType.SET);
        configMaps.put(configResource,Arrays.asList(alterConfigOp));

        AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
        alterConfigsResult.all().get();
    }

    /*
        查看配置信息
        ConfigResource(type=TOPIC, name='jiangzh-topic') ,
        Config(
            entries=[
             ConfigEntry(
               name=compression.type,
               value=producer,
               source=DEFAULT_CONFIG,
               isSensitive=false,
               isReadOnly=false,
               synonyms=[]),
             ConfigEntry(
                name=leader.replication.throttled.replicas,
                value=,
                source=DEFAULT_CONFIG,
                isSensitive=false,
                isReadOnly=false,
                synonyms=[]), ConfigEntry(name=message.downconversion.enable, value=true, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.insync.replicas, value=1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.jitter.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=cleanup.policy, value=delete, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=flush.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.bytes, value=1073741824, source=STATIC_BROKER_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=flush.messages, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.format.version, value=2.4-IV1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=file.delete.delay.ms, value=60000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=max.compaction.lag.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=max.message.bytes, value=1000012, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.compaction.lag.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
             ConfigEntry(name=preallocate, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=index.interval.bytes, value=4096, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=unclean.leader.election.enable, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.bytes, value=-1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=delete.retention.ms, value=86400000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.index.bytes, value=10485760, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])])
     */
    public static void describeConfig() throws Exception{
        AdminClient adminClient = adminClient();
        // TODO 这里做一个预留,集群时会讲到
//        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, TOPIC_NAME);

        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
        Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
        configResourceConfigMap.entrySet().stream().forEach((entry)->{
            System.out.println("configResource : "+entry.getKey()+" , Config : "+entry.getValue());
        });
    }

    /*
        描述Topic
        name :jiangzh-topic ,
        desc: (name=jiangzh-topic,
            internal=false,
            partitions=
                (partition=0,
                 leader=192.168.220.128:9092
                 (id: 0 rack: null),
                 replicas=192.168.220.128:9092
                 (id: 0 rack: null),
                 isr=192.168.220.128:9092
                 (id: 0 rack: null)),
                 authorizedOperations=[])
     */
    public static void describeTopics() throws Exception {
        AdminClient adminClient = adminClient();
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
        Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
        Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
        entries.stream().forEach((entry)->{
            System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue());
        });
    }

    /**
     * 删除Topic
     * @throws Exception
     */
    public static void delTopics() throws Exception {
        AdminClient adminClient = adminClient();
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
        deleteTopicsResult.all().get();
    }

    /**
     * 获取Topic列表
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void topicList() throws ExecutionException, InterruptedException {
        AdminClient adminClient = adminClient();
        // 是否查看internal选项
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(true);
        //ListTopicsResult listTopicsResult = adminClient.listTopics();
        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
        Set<String> names = listTopicsResult.names().get();
        Collection<TopicListing> topicListings = listTopicsResult.listings().get();
        KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings();
        // 打印names
        names.stream().forEach(System.out::println);
        // 打印topicListings
        topicListings.stream().forEach((topicList)->{
            System.out.println(topicList);
        });
    }

    /**
     * 创建Topic实例
     */
    public static void createTopic() {
        AdminClient adminClient = adminClient();
        // bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hefery-topic
        Short rs = 1;  // 副本因子
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, rs);
        CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
        System.out.println("CreateTopicsResult:" + topics);
    }

    /**
     * 设置AdminClient
     * @return
     */
    public static AdminClient adminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.128:9092");
        AdminClient adminClient = AdminClient.create(properties);
        return adminClient;
    }

}

Producer APl

Kafka配置解析

Producer发送模式

  • 同步发送
    public class ProducerSample {
    
        private final static String TOPIC_NAME="hefery-topic";
    
        public static void main(String[] args) {
            // 同步:Producer异步阻塞发送
            producerSyncSend();
        }
    
        /*
            同步:Producer异步阻塞发送
         */
        public static void producerSyncSend() throws ExecutionException, InterruptedException {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.153.128:9092");
            properties.put(ProducerConfig.ACKS_CONFIG,"all");
            properties.put(ProducerConfig.RETRIES_CONFIG,"0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
            properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
    
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    
            // Producer的主对象
            Producer<String,String> producer = new KafkaProducer<>(properties);
    
            // 消息对象 - ProducerRecoder
            for(int i=0;i<10;i++){
                String key = "key-"+i;
                ProducerRecord<String,String> record =
                        new ProducerRecord<>(TOPIC_NAME,key,"value-"+i);
    
                Future<RecordMetadata> send = producer.send(record);
                RecordMetadata recordMetadata = send.get();
                System.out.println(key + "partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
            }
    
            // 所有的通道打开都需要关闭
            producer.close();
        }
    
    }
    
  • 异步发送
    public class ProducerSample {
    
        private final static String TOPIC_NAME="hefery-topic";
    
        public static void main(String[] args) {
            // Producer异步发送演示
            producerSend();
        }
    
        /**
         * Producer异步发送演示
         */
        public static void producerSend() {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.153.128:9092");
            properties.put(ProducerConfig.ACKS_CONFIG,"all");
            properties.put(ProducerConfig.RETRIES_CONFIG,"0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
            properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
    
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            // Producer的主对象
            Producer<String,String> producer = new KafkaProducer<>(properties);
    
            // 消息对象 - ProducerRecoder
            for(int i=0;i<10;i++){
                ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
                producer.send(record);
            }
    
            // 所有的通道打开都需要关闭
            producer.close();
        }
    
    }
    
  • 异步回调发送
    public class ProducerSample {
    
        private final static String TOPIC_NAME="hefery-topic";
    
        public static void main(String[] args) {
            /// Producer异步发送带回调函数
            producerSendWithCallback();
            // Producer异步发送带回调函数和Partition负载均衡
            producerSendWithCallbackAndPartition();
        }
    
        /*
            Producer异步发送带回调函数和Partition负载均衡
         */
        public static void producerSendWithCallbackAndPartition(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.153.128:9092");
            properties.put(ProducerConfig.ACKS_CONFIG,"all");
            properties.put(ProducerConfig.RETRIES_CONFIG,"0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
            properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
    
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.imooc.jiangzh.kafka.producer.SamplePartition");
    
            // Producer的主对象
            Producer<String,String> producer = new KafkaProducer<>(properties);
    
            // 消息对象 - ProducerRecoder
            for(int i=0;i<10;i++){
                ProducerRecord<String,String> record =
                        new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
    
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        System.out.println(
                                "partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
                    }
                });
            }
    
            // 所有的通道打开都需要关闭
            producer.close();
        }
    
        /*
            Producer异步发送带回调函数
         */
        public static void producerSendWithCallback(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.153.128:9092");
            properties.put(ProducerConfig.ACKS_CONFIG,"all");
            properties.put(ProducerConfig.RETRIES_CONFIG,"0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
            properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
    
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    
            // Producer的主对象
            Producer<String,String> producer = new KafkaProducer<>(properties);
    
            // 消息对象 - ProducerRecoder
            for(int i=0;i<10;i++){
                ProducerRecord<String,String> record =
                        new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
    
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        System.out.println(
                                "partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
                    }
                });
            }
    
            // 所有的通道打开都需要关闭
            producer.close();
        }
    
    }
    

Producer源码

构建—KafkaProducer
  • MetricConfig
  • 加载负载均衡器
  • 初始化Serializer
  • 初始化RecordAccumulator——类似于计数器
  • 启动newSender——守护线程
public class KafkaProducer<K, V> implements Producer<K, V> {
   KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors interceptors, Time time) {
        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer));

        try {
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = time;
            String transactionalId = userProvidedConfigs.containsKey("transactional.id") ? (String)userProvidedConfigs.get("transactional.id") : null;
            this.clientId = buildClientId(config.getString("client.id"), transactionalId);
            LogContext logContext;
            if (transactionalId == null) {
                logContext = new LogContext(String.format("[Producer clientId=%s] ", this.clientId));
            } else {
                logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", this.clientId, transactionalId));
            }

            this.log = logContext.logger(KafkaProducer.class);
            this.log.trace("Starting the Kafka producer");
            Map<String, String> metricTags = Collections.singletonMap("client-id", this.clientId);
            // 1. MetricConfig  
            MetricConfig metricConfig = (new MetricConfig()).samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS).recordLevel(RecordingLevel.forName(config.getString("metrics.recording.level"))).tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("client.id", this.clientId));
            reporters.add(new JmxReporter("kafka.producer"));
            this.metrics = new Metrics(metricConfig, reporters, time);
            // 2. 加载负载均衡器
            this.partitioner = (Partitioner)config.getConfiguredInstance("partitioner.class", Partitioner.class);
            long retryBackoffMs = config.getLong("retry.backoff.ms");
            if (keySerializer == null) {
                this.keySerializer = (Serializer)config.getConfiguredInstance("key.serializer", Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore("key.serializer");
                this.keySerializer = keySerializer;
            }

            // 3. 初始化Serializer
            if (valueSerializer == null) {
                this.valueSerializer = (Serializer)config.getConfiguredInstance("value.serializer", Serializer.class);
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore("value.serializer");
                this.valueSerializer = valueSerializer;
            }

            userProvidedConfigs.put("client.id", this.clientId);
            ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
            List<ProducerInterceptor<K, V>> interceptorList = configWithClientId.getConfiguredInstances("interceptor.classes", ProducerInterceptor.class);
            if (interceptors != null) {
                this.interceptors = interceptors;
            } else {
                this.interceptors = new ProducerInterceptors(interceptorList);
            }

            ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
            this.maxRequestSize = config.getInt("max.request.size");
            this.totalMemorySize = config.getLong("buffer.memory");
            this.compressionType = CompressionType.forName(config.getString("compression.type"));
            this.maxBlockTimeMs = config.getLong("max.block.ms");
            this.transactionManager = configureTransactionState(config, logContext, this.log);
            int deliveryTimeoutMs = configureDeliveryTimeout(config, this.log);
            this.apiVersions = new ApiVersions();
            // 4. 初始化RecordAccumulator——类似于计数器
            this.accumulator = new RecordAccumulator(logContext, config.getInt("batch.size"), this.compressionType, lingerMs(config), retryBackoffMs, deliveryTimeoutMs, this.metrics, "producer-metrics", time, this.apiVersions, this.transactionManager, new BufferPool(this.totalMemorySize, config.getInt("batch.size"), this.metrics, time, "producer-metrics"));
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"), config.getString("client.dns.lookup"));
            if (metadata != null) {
                this.metadata = metadata;
            } else {
                this.metadata = new ProducerMetadata(retryBackoffMs, config.getLong("metadata.max.age.ms"), logContext, clusterResourceListeners, Time.SYSTEM);
                this.metadata.bootstrap(addresses, time.milliseconds());
            }

            this.errors = this.metrics.sensor("errors");
            // 5. 启动newSender——守护线程
            this.sender = this.newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = "kafka-producer-network-thread | " + this.clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
            config.logUnused();
            AppInfoParser.registerAppInfo("kafka.producer", this.clientId, this.metrics, time.milliseconds());
            this.log.debug("Kafka producer started");
        } catch (Throwable var23) {
            this.close(Duration.ofMillis(0L), true);
            throw new KafkaException("Failed to construct kafka producer", var23);
        }
    }

}

Producer是线程安全的
Producer并不是接到一条发一条,是批量发送

发送—producer.send(record)
  • 计算分区:消息具体进入哪一个partition
  • 计算批次:accumulator.append
  • 主要内容:创建批次;向批次中追加消息

image.png

public class KafkaProducer<K, V> implements Producer<K, V> {
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;

        try {
            this.throwIfProducerClosed();

            KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
            } catch (KafkaException var20) {
                if (this.metadata.isClosed()) {
                    throw new KafkaException("Producer closed while send in progress", var20);
                }

                throw var20;
            }

            long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;

            byte[] serializedKey;
            try {
                serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException var19) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var19);
            }

            byte[] serializedValue;
            try {
                serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException var18) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var18);
            }

            // 计算分区:消息具体进入哪一个partition
            int partition = this.partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            this.setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
            this.ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? this.time.milliseconds() : record.timestamp();
            if (this.log.isTraceEnabled()) {
                this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
            }

            Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
            if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                this.transactionManager.failIfNotReadyForSend();
            }

            // 计算批次:accumulator.append
            RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true);
            // 主要内容:创建批次
            if (result.abortForNewBatch) {
                int prevPartition = partition;
                this.partitioner.onNewBatch(record.topic(), cluster, partition);
                partition = this.partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
                }

                interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
                result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false);
            }

            if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                this.transactionManager.maybeAddPartitionToTransaction(tp);
            }

            if (result.batchIsFull || result.newBatchCreated) {
                this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                // 主要内容:向批次中追加消息
                this.sender.wakeup();
            }

            return result.future;
        } catch (ApiException var21) {
            this.log.debug("Exception occurred during message send:", var21);
            if (callback != null) {
                callback.onCompletion((RecordMetadata)null, var21);
            }

            this.errors.record();
            this.interceptors.onSendError(record, tp, var21);
            return new KafkaProducer.FutureFailure(var21);
        } catch (InterruptedException var22) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var22);
            throw new InterruptException(var22);
        } catch (BufferExhaustedException var23) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            this.interceptors.onSendError(record, tp, var23);
            throw var23;
        } catch (KafkaException var24) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var24);
            throw var24;
        } catch (Exception var25) {
            this.interceptors.onSendError(record, tp, var25);
            throw var25;
        }
    }

}

Producer发送原理解析

  • 直接发送
  • 负载均衡
  • 异步发送

Producer自定义Partition负载均衡

public class SamplePartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /*
            key-1 key-2 key-3
         */
        String keyStr = key + "";
        String keyInt = keyStr.substring(4);
        System.out.println("keyStr : "+keyStr + "keyInt : "+keyInt);

        int i = Integer.parseInt(keyInt);

        return i%2;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

public class ProducerSample {

    private final static String TOPIC_NAME="jiangzh-topic";
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Producer异步发送带回调函数和Partition负载均衡
        producerSendWithCallbackAndPartition();
    }


    /*
        Producer异步发送带回调函数和Partition负载均衡
     */
    public static void producerSendWithCallbackAndPartition(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.220.128:9092");
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        properties.put(ProducerConfig.RETRIES_CONFIG,"0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.imooc.jiangzh.kafka.producer.SamplePartition");

        // Producer的主对象
        Producer<String,String> producer = new KafkaProducer<>(properties);

        // 消息对象 - ProducerRecoder
        for(int i=0;i<10;i++){
            ProducerRecord<String,String> record =
                    new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);

            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println(
                            "partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
                }
            });
        }

        // 所有的通道打开都需要关闭
        producer.close();
    }
}

Producer发送消息传递保障

  • 最多一次:收到0到1次
  • 至少一次:收到1到多次
  • 正好一次:有且仅有一次

Producer发送-项目应用

请求参数实例:
{
	templateId:"001", 
	result:[
		{"questionId":"1","question":"今天几号","answer":"A"}, 
		{"questionId":"2","question":"你喜爱的颜色","answer":"B"}
	]
}

public class KafkaTest {
    // kafka producer将数据推送至Kafka Topic
    public void templateReported(JSONObject reportInfo) {
        log.info("templateReported : [{}]", reportInfo);
        String topicName = "hefery-topic";
        // 发送Kafka数据
        String templateId = reportInfo.getString("templateId");
        JSONArray reportData = reportInfo.getJSONArray("result");

        // 如果templateid相同,后续在统计分析时,可以考虑将相同的id的内容放入同一个partition,便于分析使用
        ProducerRecord<String,Object> record = new ProducerRecord<>(topicName,templateId,reportData);

        /*
            1、Kafka Producer线程安全,建议多线程复用,如果每个线程都创建,出现大量的上下文切换或争抢的情况,影响Kafka效率
            2、Kafka Producer的key是一个很重要的内容:
                2.1 我们可以根据Key完成Partition的负载均衡
                2.2 合理的Key设计,可以让Flink、Spark Streaming之类的实时分析工具做更快速处理
            3、ack - all, kafka层面上就已经有了只有一次的消息投递保障,如果想不丢数据,最好自行处理异常
         */
        try{
            producer.send(record);
        }catch (Exception e){
            // 将数据加入重发队列, redis,es,...
        }
    }
}

Consumer APl

Connector API

Streams API

允许应用程序充当流处理器,从一个或多个 Topic 消耗的输入流,并产生一个输出流至一个或多个输出的Topic,有效地变换所述输入流,以输出流

Kafka设计原理

位移主题存的到底是什么格式的消息呢?所谓的消息格式,你可以简单地理解为是一个 KV 对。Key 和 Value 分别表示消息的键值和消息体,在 Kafka 中它们就是字节数组而已。

位移主题的 Key 中应该保存 3 部分内容:<GroupID,主题名,分区号 >

Group ID 吗?没错,就是这个字段,它能够标识唯一的 Consumer Group。

Kafka项目设计

Kafka 保证消息不丢失

首先明确 Kafka 持久化保证的含义和限定条件。再熟练配置 Kafka 无消息丢失参数

Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证

  • 已提交的消息:当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交的消息”
  • 有限度的持久化保证:Kafka 不可能保证在任何情况下都做到不丢失消息。举个极端点的例子,如果地球都不存在了,Kafka 还能保存任何消息吗?显然不能!倘若这种情况下你依然还想要 Kafka 不丢消息,那么只能在别的星球部署 Kafka Broker 服务器

生产者程序丢失数据

目前 Kafka Producer 异步发送消息,也就是说如果调用的是 producer.send(msg),那么它通常会立即返回,但此时不能认为消息发送已成功完成。因此如果出现消息丢失,我们无法知晓

如果用这种方式,可能会有哪些因素导致消息没有发送成功呢?

  • 网络抖动,导致消息压根就没有发送到 Broker 端
  • 消息本身不合格导致 Broker 拒绝接收(消息太大了,超过了 Broker 的承受能力)

这么来看,让 Kafka“背锅”就有点冤枉它了。就像前面说过的,Kafka 不认为消息是已提交的,因此也就没有 Kafka 丢失消息这一说了

Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。不要小瞧这里的 callback(回调),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。举例来说,如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;如果是消息不合格造成的,那么可以调整消息格式后再次发送。总之,处理发送失败的责任在 Producer端而非 Broker 端

消费者程序丢失数据

Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。

Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。下面这张图来自于官网,它清晰地展示了 Consumer 端的位移数据。比如对于 Consumer A 而言,它当前的位移值就是 9;Consumer B 的位移值是 11。这里的“位移”类似于我们看书时使用的书签,它会标记我们当前阅读了多少页,下次翻书的时候我们能直接跳到书签页继续阅读。正确使用书签有两个步骤:第一步是读书,第二步是更新书签页。如果这两步的顺序颠倒了,就可能出现这样的场景:当前的书签页是第 90 页,我先将书签放到第 100 页上,之后开始读书。当阅读到第 95 页时,我临时有事中止了阅读。那么问题来了,当我下次直接跳到书签页阅读时,我就丢失了第 96~99 页的内容,即这些消息就丢失了

Kafka 中 Consumer 端的消息丢失就是这么一回事。要对抗这种消息丢失,办法很简单:维持先消费消息(阅读),再更新位移(书签)的顺序即可。这样就能最大限度地保证消息不丢失。这种处理方式可能带来的问题是消息的重复处理,类似于同一页书被读了很多遍,但这不属于消息丢失的情形

Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。关键在于 Consumer 自动提交位移。这个问题的解决方案也很简单:如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移。单个Consumer 程序使用多线程来消费消息说起来容易,写成代码却异常困难,因为你很难正确地处理位移的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。

最佳实践

  • 消息生产者发送消息不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法
  • 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义
  • 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失
  • 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生
  • 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余
  • 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1
  • 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas +1
  • 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的

Kafka 拦截器

Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景

生产者拦截器

在发送消息前以及消息提交成功后植入你的拦截器逻辑

  • 在 Producer 端设置参数 interceptor.classes
    Properties props = new Properties();
    List<String> interceptors = new ArrayList<>();
    interceptors.add("com.hefery.kafkaproject.interceptors.AddTimestampInterceptor"); 
    interceptors.add("com.hefery.kafkaproject.interceptors.UpdateCounterInterceptor");
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
    
  • 编写的所有Producer 端拦截器都要实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口。该接口是 Kafka 提供的,里面有两个核心的方法
    • onSend:在消息发送之前被调用。在发送之前对消息“美美容”
    • onAcknowledgement:在消息成功提交或发送失败之后被调用。onAcknowledgement 的调用要早于发送回调通知 callback 的调用。值得注意的是这个方法和 onSend 不是在同一个线程中被调用的,因此如果在这两个方法中调用了某个共享可变对象,一定要保证线程安全。这个方法处在 Producer 发送的主路径中,最好别放太重的逻辑进去,否则 Producer TPS 直线下降

指定拦截器类时要指定它们的全限定名

消费者拦截器

在消费消息前以及提交位移后编写特定逻辑

  • 编写的所有Producer 端拦截器都要实现org.apache.kafka.clients..consumer.ConsumerInterceptor 接口。该接口是 Kafka 提供的,里面有两个核心的方法
    • onConsume:在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你
    • onCommit:Consumer 在提交位移之后调用该方法。通常可以在该方法中做一些记账类的动作,比如记录日志等

指定拦截器类时要指定它们的全限定名

Kafka 消息交付可靠性保障

消息交付可靠性保障,是指 Kafka 对 Producer 和Consumer 要处理的消息提供什么样的承诺

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送

Kafka默认的是 至少一次(at least once)

消息成功“提交”,但 Broker 的应答没有成功发送回 Producer端(比如网络出现瞬时抖动),那么 Producer 就无法确定消息是否真的提交成功了。因此,它只能选择重试,也就是再次发送相同的消息。这就是 Kafka 默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。也可以提供最多一次交付保障,只需要让 Producer禁止重试即可。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。我们通常不会希望出现消息丢失的情况,但一些场景里偶发的消息丢失其实是被允许的,相反,消息重复是绝对要避免的。此时,使用最多一次交付保障就是最恰当的。无论是至少一次还是最多一次,都不如精确一次,即消息既不会丢失,也不会被重复处理。或者说,即使Producer 端重复发送了相同的消息,Broker 端也能做到自动去重。在下游 Consumer 看来,消息依然只有一条

Kafka 是怎么做到精确一次的呢?

  • 幂等性:Idempotence,最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态幂等性 Producer:仅需要设置一个参数即可,即props.put("enable.idempotence",ture)props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。在Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。幂等性 Producer 的作用范围:首先,它只能保证单分区上的幂等性,即一个幂等性Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了Producer 进程之后,这种幂等性保证就丧失了

  • 事务:Transaction,实现多分区以及多会话上的消息无重复设置事务型 Producer:开启 enable.idempotence=true 以及设置 Producer 端参数 transctional.id。最好为其设置一个有意义的名字,编码也要变化

    // 事务的初始化
    producer.initTransactions();
    try {
    	// 事务开始
    	producer.beginTransaction();
    	producer.send(record1);
    	producer.send(record2);
    	// 事务提交
    	producer.commitTransaction();
    } catch (KafkaException e) {
    	// 事务终止
    	producer.abortTransaction();
    }
    

    这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。因此在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。修改起来也很简单,设置 isolation.level参数的值即可。当前这个参数有两个取值:read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的Consumer 就不要使用这个值;read_committed:表明 Consumer 只会读取事务型Producer 成功提交事务写入的消息。当然,也能看到非事务型 Producer 写入的所有消息


标题:Kafka基础—必知必会
作者:Hefery
地址:http://hefery.icu/articles/2022/02/12/1644653536785.html