您当前的位置: 首页 >  kafka

顧棟

暂无认证

  • 4浏览

    0关注

    227博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

kafka源码分析 生产消息过程

顧棟 发布时间:2021-07-05 18:01:34 ,浏览量:4

文章目录
  • kafka 生产消息分析
    • 生产消息的实例代码
      • 过程步骤
      • 参数说明
    • 创建生产者实例主流程
    • sender的run流程
    • 处理生产消息结果流程

kafka 生产消息分析 生产消息的实例代码
package com.example.demo.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class ProducerAnalysis {


    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CommonHelper.BROKER_LIST);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "demo-producer-client-1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaProducer producer = new KafkaProducer(properties);
        ProducerRecord record = new ProducerRecord(CommonHelper.TOPIC, "Hello Kafka!");
        try {
            Future future = producer.send(record);
            RecordMetadata metadata = future.get();
            System.out.println("topic=" + metadata.topic()
                    + ", partition=" + metadata.partition()
                    + ", offset=" + metadata.offset());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
过程步骤
  1. 配置生产的客户端的参数以及创建对应的生产者实例
  2. 构建待发送的消息
  3. 发送消息
  4. 关闭生产者实例
参数说明
  • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG:对应的参数其实就是bootstrap.servers,用于建立到Kafka集群的初始连接的主机/端口对列表。客户端将使用所有服务器,而不管这里为引导指定了哪些服务器;此列表只影响用于发现完整服务器集的初始主机。这个列表的格式应该是host1:port1,host2:port2,…。由于这些服务器仅用于初始连接,以发现完整的集群成员(可能会动态更改),因此该列表不需要包含完整的服务器集(但是,在服务器关闭的情况下,您可能需要多个服务器)。

  • ProducerConfig.CLIENT_ID_CONFIG:对应的参数是client.id,当发出请求时传递给服务器的id字符串。这样做的目的是允许在服务器端请求日志中包含逻辑应用程序名,从而能够跟踪请求的来源,而不仅仅是ip/port。

  • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIGkey.serializer,关键字的序列化器类,实现了org.apache.kafka.common.serialization.Serializer interface.

  • ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIGvalue.serializer,Serializer类的值实现了org.apache.kafka.common.serialization.Serializer interface.

    Note 在创建生产者的时候使用了ProducerConfig类,在这个类中,是用了static{}块,来初始化一些默认配置。还有一些其他的关于生产者的配置可以在ProducerConfig类中观察到。

创建生产者实例主流程
public KafkaProducer(Properties properties) {
    this(new ProducerConfig(properties), null, null, null, null);
}

在这里插入图片描述

KafkaProducer(ProducerConfig config,
              Serializer keySerializer,
              Serializer valueSerializer,
              Metadata metadata,
              KafkaClient kafkaClient) {
    try {
        // 获取用户提供的配置
        Map userProvidedConfigs = config.originals();
        this.producerConfig = config;
        this.time = Time.SYSTEM;
        // 获取客户端点的id,如果没有就默认提供一个采用producer-累加
        String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
        if (clientId.length() = RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
        // If the batch is too large, we split the batch and send the split batches again. We do not decrement
        // the retry attempts in this case.
        log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
                 correlationId,
                 batch.topicPartition,
                 this.retries - batch.attempts(),
                 error);
        if (transactionManager != null)
            transactionManager.removeInFlightBatch(batch);
        this.accumulator.splitAndReenqueue(batch);
        this.accumulator.deallocate(batch);
        this.sensors.recordBatchSplit();
    } else if (error != Errors.NONE) {
        //进行重试 重试次数-1
        if (canRetry(batch, response)) {
            log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                    correlationId,
                    batch.topicPartition,
                    this.retries - batch.attempts() - 1,
                    error);
            if (transactionManager == null) {
                reenqueueBatch(batch, now);
            } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
                // If idempotence is enabled only retry the request if the current producer id is the same as
                // the producer id of the batch.
                log.debug("Retrying batch to topic-partition {}. ProducerId: {}; Sequence number : {}",
                        batch.topicPartition, batch.producerId(), batch.baseSequence());
                reenqueueBatch(batch, now);
            } else {
                failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
                        "batch but the producer id changed from " + batch.producerId() + " to " +
                        transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."), false);
            }
        }
        // 序列号相同的消息 不进行重试
        else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
            // If we have received a duplicate sequence error, it means that the sequence number has advanced beyond
            // the sequence of the current batch, and we haven't retained batch metadata on the broker to return
            // the correct offset and timestamp.
            //
            // The only thing we can do is to return success to the user and not return a valid offset and timestamp.
            completeBatch(batch, response);
        }
        // 直接定义失败 返回
        else {
            final RuntimeException exception;
            if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
                exception = new TopicAuthorizationException(batch.topicPartition.topic());
            else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
                exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
            else
                exception = error.exception();
            // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
            // its retries -- if it did, we don't know whether the sequence number was accepted or not, and
            // thus it is not safe to reassign the sequence.
            failBatch(batch, response, exception, batch.attempts()             
关注
打赏
1663402667
查看更多评论
0.1869s