- kafka 消费消息源码分析
- 消费消息的实例代码
- 过程步骤
- 参数说明
- 创建消费者实例主流程
- 订阅主题
- 消费数据
- 消费位移
package com.example.demo.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Minimal Kafka consumer demo: builds the client configuration, subscribes to
 * {@code CommonHelper.TOPIC}, and prints every record it polls until
 * {@link #IS_RUNNING} is cleared or an exception ends the loop.
 */
public class ConsumerAnalysis {
    /** Poll-loop flag; set to {@code false} to let {@link #main(String[])} leave the loop. */
    public static final AtomicBoolean IS_RUNNING = new AtomicBoolean(true);

    /** Upper bound, in milliseconds, that a single poll call may block. */
    private static final long POLL_TIMEOUT_MS = 10000L;

    /**
     * Builds the consumer configuration used by this demo.
     *
     * @return properties holding the bootstrap servers, String key/value
     *         deserializers, the group id and the client id
     */
    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CommonHelper.BROKER_LIST);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerAnalysisGroup-1");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "demo-consumer-client-1");
        return properties;
    }

    /**
     * Runs the demo consumer loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties properties = initConfig();
        // try-with-resources closes the consumer even when subscribe() or poll() throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList(CommonHelper.TOPIC));
            while (IS_RUNNING.get()) {
                // NOTE(review): poll(long) is deprecated in newer Kafka clients in favour of
                // poll(Duration); switch once the client version in use is confirmed.
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                System.out.println("records count is " + records.count());
                for (ConsumerRecord<String, String> message : records) {
                    System.out.println("topic=" + message.topic()
                            + ", partition = " + message.partition()
                            + ", offset=" + message.offset());
                    // Print the record key here; the offset is already printed above.
                    System.out.println("key=" + message.key()
                            + ", value= " + message.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
过程步骤
- 配置消费者客户端参数以及创建消费者实例
- 订阅Topic
- 拉取消息并消费
- 提交消费offset
- 关闭消费者实例
-
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
:对应的参数其实就是bootstrap.servers
,用于建立到Kafka集群的初始连接的主机/端口对列表。客户端将使用所有服务器,而不管这里为引导指定了哪些服务器;此列表只影响用于发现完整服务器集的初始主机。这个列表的格式应该是host1:port1,host2:port2,…。由于这些服务器仅用于初始连接,以发现完整的集群成员(可能会动态更改),因此该列表不需要包含完整的服务器集(但是,在服务器关闭的情况下,您可能需要多个服务器)。 -
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
:对应的参数是key.deserializer
,实现org.apache.kafka.common.serialization.Deserializer
接口的键(key)的反序列化器类。 -
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
:对应的参数是value.deserializer
,实现org.apache.kafka.common.serialization.Deserializer
接口的值的反序列化器类。 -
ConsumerConfig.GROUP_ID_CONFIG
:对应的参数是group.id
,标识此消费者所属的消费者组的唯一字符串。 如果消费者通过使用subscribe(topic)
或基于 Kafka 的偏移管理策略使用组管理功能,则需要此属性。 -
ConsumerConfig.CLIENT_ID_CONFIG
:对应的参数是client.id
,发出请求时传递给服务器的 id 字符串。 这样做的目的是通过允许将逻辑应用程序名称包含在服务器端请求日志记录中,从而能够跟踪请求源,而不仅仅是 ip/port。Note:在创建消费者的时候使用了
ConsumerConfig
类,在这个类中,使用了static{}块,来初始化一些默认配置。还有一些其他的关于消费者的配置可以在ConsumerConfig
类中查看到。
补流程图
// Excerpt of the private KafkaConsumer constructor: resolves the client id and group id,
// sets up logging, and reads the timeout settings from the supplied config.
// NOTE(review): this excerpt is not a complete definition — see the note above the timeout check.
private KafkaConsumer(ConsumerConfig config,
Deserializer keyDeserializer,
Deserializer valueDeserializer) {
try {
// If no client id is configured, a default one is generated from a sequence counter
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (clientId.isEmpty())
clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
// Read the consumer group id; it is used in the log prefix below
String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
this.log = logContext.logger(getClass());
log.debug("Initializing the Kafka consumer");
// Request timeout: if there is still no response after the timeout and the retries, the request is treated as failed
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
// Heartbeat (session) timeout with the consumer group; exceeding it removes the consumer from the group and triggers a rebalance
int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
// Longest time the server blocks before answering a fetch request when fetch.min.bytes has not been reached
int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
// The request timeout must be greater than both the session timeout and the maximum fetch wait
// NOTE(review): the next line looks truncated — the text between a '<' and a later '>' appears to
// have been lost during extraction, so the rest of the constructor is missing and what follows seems
// to be the tail of a separate poll routine. Restore both from the original Kafka source.
if (this.requestTimeoutMs retryBackoffMs)
pollTimeout = retryBackoffMs;
client.poll(pollTimeout, nowMs, new PollCondition() {
@Override
public boolean shouldBlock() {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
}
});
// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator.needRejoin())
return Collections.emptyMap();
return fetcher.fetchedRecords();
}
消费者协调器的poll方法,主要用于确认协调器仍在运行、且消费者处于某个消费组中,同时用来控制offset的自动提交。
/**
 * Coordinator-side poll step: runs completed offset-commit callbacks, makes sure the
 * coordinator/group state (or, for manual assignment, the metadata) is usable, and
 * finally gives the asynchronous auto-commit a chance to run.
 *
 * @param now         current time in milliseconds as seen by the caller
 * @param remainingMs time budget passed to the metadata wait in the manual-assignment path
 */
public void poll(long now, long remainingMs) {
    // Run callbacks for offset commits that have already completed
    // (per the original note, this is where commit errors get reported/logged).
    invokeCompletedOffsetCommitCallbacks();

    long currentMs = now;
    if (!subscriptions.partitionsAutoAssigned()) {
        // Manually assigned partitions: when a metadata update is pending and no node is
        // ready, wait for the update. With every connection failing, wakeups raised while
        // sending fetches would make polls return at once and spin; without the wakeup a
        // poll with no channels would sit out the whole timeout and delay reconnecting.
        // awaitMetadataUpdate() opens new connections with the configured backoff instead.
        // Group management needs no such check since the unknown-coordinator path already waits.
        if (metadata.updateRequested() && !client.hasReadyNodes()) {
            // Still nothing ready after the wait: abandon this round entirely.
            if (!client.awaitMetadataUpdate(remainingMs) && !client.hasReadyNodes()) {
                return;
            }
            currentMs = time.milliseconds();
        }
    } else {
        // Automatically assigned partitions (topic or pattern subscription).
        if (coordinatorUnknown()) {
            ensureCoordinatorReady();
            currentMs = time.milliseconds();
        }
        if (needRejoin()) {
            // The first metadata fetch can race with the first rebalance, so pattern
            // subscriptions refresh metadata before joining; that way the pattern has been
            // matched against the cluster's topics at least once.
            if (subscriptions.hasPatternSubscription()) {
                client.ensureFreshMetadata();
            }
            ensureActiveGroup();
            currentMs = time.milliseconds();
        }
        pollHeartbeat(currentMs);
    }

    // Attempt an asynchronous auto-commit of offsets.
    maybeAutoCommitOffsetsAsync(currentMs);
}
如果配置了enable.auto.commit
为true
,且满足auto.commit.interval.ms
配置的时长,默认5s
,则进行异步式的offset自动提交
/**
 * Sends an asynchronous auto-commit for everything consumed so far. The callback only
 * logs the outcome and, on a retriable failure, pulls the next auto-commit deadline forward.
 */
private void doAutoCommitOffsetsAsync() {
    // Offsets consumed so far for the subscribed partitions
    // (per the original note, partitions whose offset is null are left out).
    Map consumedOffsets = subscriptions.allConsumed();
    log.debug("Sending asynchronous auto-commit of offsets {}", consumedOffsets);

    commitOffsetsAsync(consumedOffsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map offsets, Exception exception) {
            if (exception == null) {
                log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
                return;
            }
            if (!(exception instanceof RetriableException)) {
                // Non-retriable failure: just warn.
                log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
                return;
            }
            // Retriable failure: retry no later than one backoff interval from now.
            log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
                    exception);
            nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
        }
    });
}
/**
 * Commits the given offsets asynchronously. With a known coordinator the commit is sent
 * right away; otherwise a coordinator lookup is started and the commit is sent (or failed)
 * once that lookup finishes.
 *
 * @param offsets  offsets to commit
 * @param callback callback to notify about the commit result
 */
public void commitOffsetsAsync(final Map offsets, final OffsetCommitCallback callback) {
    invokeCompletedOffsetCommitCallbacks();

    if (coordinatorUnknown()) {
        // Coordinator not known yet: look it up first, then either send the commit or fail it.
        // Retries are deliberately not recursive, since that could let commits arrive out of
        // order. Several commits may hang off the same lookup; that is fine because listeners
        // fire in the order they were added, and AbstractCoordinator prevents concurrent lookups.
        pendingAsyncCommits.incrementAndGet();
        lookupCoordinator().addListener(new RequestFutureListener() {
            @Override
            public void onSuccess(Void value) {
                pendingAsyncCommits.decrementAndGet();
                doCommitOffsetsAsync(offsets, callback);
                client.pollNoWakeup();
            }

            @Override
            public void onFailure(RuntimeException e) {
                pendingAsyncCommits.decrementAndGet();
                completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
                        new RetriableCommitFailedException(e)));
            }
        });
    } else {
        doCommitOffsetsAsync(offsets, callback);
    }

    // Give the commit a chance to go out on the wire without waiting for it to complete.
    // The coordinator treats commits as heartbeats, so no extra heartbeat handling is needed.
    client.pollNoWakeup();
}
对于协调器正常的会直接进行offset的提交
/**
 * Sends the offset-commit request and registers a listener that queues the outcome in
 * {@code completedOffsetCommits}.
 *
 * @param offsets  offsets to commit
 * @param callback callback for the result; the default commit callback is used when null
 */
private void doCommitOffsetsAsync(final Map offsets, final OffsetCommitCallback callback) {
    RequestFuture future = sendOffsetCommitRequest(offsets);

    final OffsetCommitCallback cb;
    if (callback != null) {
        cb = callback;
    } else {
        cb = defaultOffsetCommitCallback;
    }

    future.addListener(new RequestFutureListener() {
        @Override
        public void onSuccess(Void value) {
            if (interceptors != null) {
                interceptors.onCommit(offsets);
            }
            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
        }

        @Override
        public void onFailure(RuntimeException e) {
            // Retriable errors are wrapped; anything else is passed through unchanged.
            Exception commitException = (e instanceof RetriableException)
                    ? new RetriableCommitFailedException(e)
                    : e;
            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
        }
    });
}
对于协调器未知的情况,会先尝试查找协调器,然后再发送提交,查找失败则本次提交失败(这里不做递归重试,因为递归重试可能导致偏移提交乱序到达)。 请注意,可能有多个偏移提交挂在同一个协调器查找请求上。 这没有问题,因为侦听器会按照添加的顺序依次被调用。 另请注意,AbstractCoordinator 会防止出现多个并发的协调器查找请求。