- kafka 生产消息分析
- 生产消息的实例代码
- 过程步骤
- 参数说明
- 创建生产者实例主流程
- sender的run流程
- 处理生产消息结果流程
package com.example.demo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;
public class ProducerAnalysis {
public static Properties initConfig() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CommonHelper.BROKER_LIST);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "demo-producer-client-1");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return properties;
}
public static void main(String[] args) {
Properties properties = initConfig();
KafkaProducer producer = new KafkaProducer(properties);
ProducerRecord record = new ProducerRecord(CommonHelper.TOPIC, "Hello Kafka!");
try {
Future future = producer.send(record);
RecordMetadata metadata = future.get();
System.out.println("topic=" + metadata.topic()
+ ", partition=" + metadata.partition()
+ ", offset=" + metadata.offset());
} catch (Exception e) {
e.printStackTrace();
}
}
}
过程步骤
- 配置生产的客户端的参数以及创建对应的生产者实例
- 构建待发送的消息
- 发送消息
- 关闭生产者实例
-
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
:对应的参数其实就是bootstrap.servers
,用于建立到Kafka集群的初始连接的主机/端口对列表。客户端将使用所有服务器,而不管这里为引导指定了哪些服务器;此列表只影响用于发现完整服务器集的初始主机。这个列表的格式应该是host1:port1,host2:port2,…。由于这些服务器仅用于初始连接,以发现完整的集群成员(可能会动态更改),因此该列表不需要包含完整的服务器集(但是,在服务器关闭的情况下,您可能需要多个服务器)。 -
ProducerConfig.CLIENT_ID_CONFIG
:对应的参数是client.id
,当发出请求时传递给服务器的id字符串。这样做的目的是允许在服务器端请求日志中包含逻辑应用程序名,从而能够跟踪请求的来源,而不仅仅是ip/port。 -
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
:key.serializer
,关键字的序列化器类,实现了org.apache.kafka.common.serialization.Serializer
interface. -
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
:value.serializer
,Serializer类的值实现了org.apache.kafka.common.serialization.Serializer
interface.Note 在创建生产者的时候使用了
ProducerConfig
类,在这个类中,是用了static{}块,来初始化一些默认配置。还有一些其他的关于生产者的配置可以在ProducerConfig
类中观察到。
public KafkaProducer(Properties properties) {
this(new ProducerConfig(properties), null, null, null, null);
}
KafkaProducer(ProducerConfig config,
Serializer keySerializer,
Serializer valueSerializer,
Metadata metadata,
KafkaClient kafkaClient) {
try {
// 获取用户提供的配置
Map userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = Time.SYSTEM;
// 获取客户端点的id,如果没有就默认提供一个采用producer-累加
String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() = RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
// If the batch is too large, we split the batch and send the split batches again. We do not decrement
// the retry attempts in this case.
log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
correlationId,
batch.topicPartition,
this.retries - batch.attempts(),
error);
if (transactionManager != null)
transactionManager.removeInFlightBatch(batch);
this.accumulator.splitAndReenqueue(batch);
this.accumulator.deallocate(batch);
this.sensors.recordBatchSplit();
} else if (error != Errors.NONE) {
//进行重试 重试次数-1
if (canRetry(batch, response)) {
log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
correlationId,
batch.topicPartition,
this.retries - batch.attempts() - 1,
error);
if (transactionManager == null) {
reenqueueBatch(batch, now);
} else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
// If idempotence is enabled only retry the request if the current producer id is the same as
// the producer id of the batch.
log.debug("Retrying batch to topic-partition {}. ProducerId: {}; Sequence number : {}",
batch.topicPartition, batch.producerId(), batch.baseSequence());
reenqueueBatch(batch, now);
} else {
failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
"batch but the producer id changed from " + batch.producerId() + " to " +
transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."), false);
}
}
// 序列号相同的消息 不进行重试
else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
// If we have received a duplicate sequence error, it means that the sequence number has advanced beyond
// the sequence of the current batch, and we haven't retained batch metadata on the broker to return
// the correct offset and timestamp.
//
// The only thing we can do is to return success to the user and not return a valid offset and timestamp.
completeBatch(batch, response);
}
// 直接定义失败 返回
else {
final RuntimeException exception;
if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
exception = new TopicAuthorizationException(batch.topicPartition.topic());
else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
else
exception = error.exception();
// tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
// its retries -- if it did, we don't know whether the sequence number was accepted or not, and
// thus it is not safe to reassign the sequence.
failBatch(batch, response, exception, batch.attempts()
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?