Background

Every time I use Flink to consume Kafka messages, I get confused by the two parameters enableCommitOnCheckpoints and enable.auto.commit. This article walks through the source code to see what each of them actually does.

The enableCommitOnCheckpoints and enable.auto.commit parameters

1. FlinkKafkaConsumerBase.open(), where offsetCommitMode is assigned

public void open(Configuration configuration) throws Exception {
    // determine the offset commit mode
    this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
            getIsAutoCommitEnabled(),
            enableCommitOnCheckpoints,
            ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
    // ... (rest of open() omitted)
}
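For context, OffsetCommitMode is a small enum in the Flink Kafka connector; its three constants are the possible results of the fromConfiguration call shown next (excerpted, with the comments paraphrased):

public enum OffsetCommitMode {
    DISABLED,       // never commit offsets back to Kafka
    ON_CHECKPOINTS, // commit offsets to Kafka only when a checkpoint completes
    KAFKA_PERIODIC  // let the internal Kafka client auto-commit periodically
}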

2. OffsetCommitModes.fromConfiguration()

public static OffsetCommitMode fromConfiguration(
        boolean enableAutoCommit,
        boolean enableCommitOnCheckpoint,
        boolean enableCheckpointing) {

    if (enableCheckpointing) {
        // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
        return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
    } else {
        // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
        return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }
}

This code shows that enableCommitOnCheckpoint and enableAutoCommit never take effect at the same time. When checkpointing is enabled, Flink either commits offsets on checkpoints or not at all, and enable.auto.commit is ignored (the connector even overwrites enable.auto.commit to false on the internal Kafka client in that case). Only when checkpointing is disabled does enable.auto.commit decide whether the Kafka client periodically commits offsets on its own.
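To make the three inputs of fromConfiguration concrete, here is a minimal job-setup sketch, assuming the legacy FlinkKafkaConsumer connector; the bootstrap address, group id, and topic name are placeholders. With checkpointing enabled as below, the resulting mode is ON_CHECKPOINTS; remove the enableCheckpointing call and the mode instead depends on enable.auto.commit:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OffsetCommitDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "demo-group");              // placeholder
        // only consulted when checkpointing is disabled (KAFKA_PERIODIC mode)
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpointing enabled: the mode is now decided solely by
        // setCommitOffsetsOnCheckpoints, and enable.auto.commit is ignored
        env.enableCheckpointing(60_000);

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);
        consumer.setCommitOffsetsOnCheckpoints(true); // the default; shown explicitly

        env.addSource(consumer).print();
        env.execute("offset-commit-demo");
    }
}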

Key code path for committing offsets under enableCommitOnCheckpoint

1. FlinkKafkaConsumerBase.snapshotState()

public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    if (!running) {
        LOG.debug("snapshotState() called on closed source");
    } else {
        unionOffsetStates.clear();

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            // the fetcher has not yet been initialized, which means we need to return the
            // originally restored offsets or the assigned partitions
            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition
                    : subscribedPartitionsToStartOffsets.entrySet()) {
                unionOffsetStates.add(
                        Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
            }

            // in ON_CHECKPOINTS mode, the offsets for this checkpoint are queued here and
            // committed to Kafka once the checkpoint completes (see notifyCheckpointComplete)
            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
            }
        }
        // ... (when the fetcher is already initialized, its current offsets are snapshotted
        // and queued in pendingOffsetsToCommit in the same way; abridged here)
    }
}

2. FlinkKafkaConsumerBase.notifyCheckpointComplete()

@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
    // ... (abridged; this branch only runs in ON_CHECKPOINTS mode)
    // look up the offsets that snapshotState() queued for this checkpoint
    final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
    Map<KafkaTopicPartition, Long> offsets =
            (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
    // asynchronously commit the offsets of the completed checkpoint back to Kafka
    fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
}
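Putting the two methods together: snapshotState() stages checkpointId -> offsets into pendingOffsetsToCommit, and notifyCheckpointComplete() dequeues and commits them only after the checkpoint has actually succeeded. A stripped-down sketch of that two-phase pattern (a hypothetical class with simplified types, not the connector's actual code):

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the ON_CHECKPOINTS pattern: offsets are staged
// at snapshot time and committed only once the checkpoint is complete.
class OnCheckpointCommitPattern {
    // checkpointId -> partition offsets staged for that checkpoint
    private final Map<Long, Map<Integer, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();
    private final Map<Integer, Long> currentOffsets = new HashMap<>(); // partition -> next offset

    // called when a checkpoint barrier arrives (cf. snapshotState)
    void snapshotState(long checkpointId) {
        pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
    }

    // called when the checkpoint is globally complete (cf. notifyCheckpointComplete)
    void notifyCheckpointComplete(long checkpointId) {
        Map<Integer, Long> offsets = pendingOffsetsToCommit.remove(checkpointId);
        if (offsets != null) {
            commitToKafka(offsets); // only now do the offsets become visible to Kafka
        }
    }

    void commitToKafka(Map<Integer, Long> offsets) {
        // stand-in for fetcher.commitInternalOffsetsToKafka(...)
        System.out.println("committing " + offsets);
    }
}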

The enable.auto.commit parameter

1. The KafkaConsumerThread.run() loop

if (records == null) {
    try {
        records = consumer.poll(pollTimeout);
    } catch (WakeupException we) {
        continue;
    }
}
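KafkaConsumerThread drives everything through consumer.poll(); the WakeupException branch is the standard way to break a blocking poll from another thread. A minimal standalone sketch of that poll/wakeup pattern using the plain Kafka client (addresses and names are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class PollLoopDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "demo-group");              // placeholder
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        AtomicBoolean running = new AtomicBoolean(true);

        // wakeup() is the one thread-safe KafkaConsumer method; it makes a
        // blocking poll() throw WakeupException so the loop can exit
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            running.set(false);
            consumer.wakeup();
        }));

        consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder
        try {
            while (running.get()) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
                } catch (WakeupException we) {
                    // expected on shutdown; the loop condition ends the loop
                }
            }
        } finally {
            consumer.close();
        }
    }
}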

2. KafkaConsumer.poll()

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        // poll for new data until the timeout expires
        do {
            client.maybeTriggerWakeup();

            // updateAssignmentMetadataIfNeeded() is the key call here
            if (includeMetadataInTimeout) {
                if (!updateAssignmentMetadataIfNeeded(timer)) {
                    return ConsumerRecords.empty();
                }
            } else {
                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                    log.warn("Still waiting for metadata");
                }
            }

            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.transmitSends();
                }

                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
        } while (timer.notExpired());

        return ConsumerRecords.empty();
    } finally {
        release();
        this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
    }
}

3. KafkaConsumer.updateAssignmentMetadataIfNeeded()

boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
    if (coordinator != null && !coordinator.poll(timer)) {
        return false;
    }

    return updateFetchPositions(timer);
}

4. ConsumerCoordinator.poll()

public boolean poll(Timer timer) {
    maybeUpdateSubscriptionMetadata();

    invokeCompletedOffsetCommitCallbacks();

    if (subscriptions.partitionsAutoAssigned()) {
        if (protocol == null) {
            throw new IllegalStateException("User configured " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +
                " to empty while trying to subscribe for group protocol to auto assign partitions");
        }
        // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
        // group proactively due to application inactivity even if (say) the coordinator cannot be found.
        pollHeartbeat(timer.currentTimeMs());
        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
            return false;
        }

        if (rejoinNeededOrPending()) {
            // due to a race condition between the initial metadata fetch and the initial rebalance,
            // we need to ensure that the metadata is fresh before joining initially. This ensures
            // that we have matched the pattern against the cluster's topics at least once before joining.
            if (subscriptions.hasPatternSubscription()) {
                // For consumer group that uses pattern-based subscription, after a topic is created,
                // any consumer that discovers the topic after metadata refresh can trigger rebalance
                // across the entire consumer group. Multiple rebalances can be triggered after one topic
                // creation if consumers refresh metadata at vastly different times. We can significantly
                // reduce the number of rebalances caused by single topic creation by asking consumer to
                // refresh metadata before re-joining the group as long as the refresh backoff time has
                // passed.
                if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
                    this.metadata.requestUpdate();
                }

                if (!client.ensureFreshMetadata(timer)) {
                    return false;
                }

                maybeUpdateSubscriptionMetadata();
            }

            if (!ensureActiveGroup(timer)) {
                return false;
            }
        }
    } else {
        // For manually assigned partitions, if there are no ready nodes, await metadata.
        // If connections to all nodes fail, wakeups triggered while attempting to send fetch
        // requests result in polls returning immediately, causing a tight loop of polls. Without
        // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
        // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
        // When group management is used, metadata wait is already performed for this scenario as
        // coordinator is unknown, hence this check is not required.
        if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
            client.awaitMetadataUpdate(timer);
        }
    }

    // this is the key call: the periodic auto commit piggybacks on every poll
    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
    return true;
}

5. ConsumerCoordinator.maybeAutoCommitOffsetsAsync()

public void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        nextAutoCommitTimer.update(now);
        if (nextAutoCommitTimer.isExpired()) {
            nextAutoCommitTimer.reset(autoCommitIntervalMs);
            doAutoCommitOffsetsAsync();
        }
    }
}

Here, at last, is where autoCommitEnabled is checked: if auto commit is enabled and the auto.commit.interval.ms timer has expired, the consumer asynchronously commits the current offsets. Note that KAFKA_PERIODIC commits are not driven by a dedicated timer thread; they piggyback on each call to poll().
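A consequence worth remembering: if the application stops calling poll(), periodic commits stop too. A minimal sketch of enabling this mode on a plain consumer (addresses and names are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AutoCommitDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");      // autoCommitEnabled = true
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // feeds nextAutoCommitTimer

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder
            while (true) {
                // every poll() runs through ConsumerCoordinator.poll() and thus
                // maybeAutoCommitOffsetsAsync(); offsets commit at most every 5 s
                consumer.poll(Duration.ofMillis(500));
            }
        }
    }
}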

Two particularly important points

1. A KafkaConsumer does not need to commit any offsets in order to consume: even without a single commit, it keeps reading messages from the topic normally. Committed offsets matter only when the consumer disconnects and later reconnects to the Kafka brokers: it will then generally resume from the last offset it committed for its group (provided none of startFromLatest, startFromEarliest, or startFromTimestamp is set). That resume position is the whole point of committing offsets; the sketch below shows how to inspect what a group has committed.
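A small sketch for checking a group's committed offsets, assuming a Kafka client version with the Set-based KafkaConsumer.committed() (roughly 2.4+); topic, partition, and group names are placeholders:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommittedOffsetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("demo-topic", 0));
            // the position this group would resume from after a reconnect
            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(partitions);
            committed.forEach((tp, om) ->
                    System.out.println(tp + " -> " + (om == null ? "nothing committed" : om.offset())));
        }
    }
}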

2. For Flink the picture is different: every time a job is restored, Flink's consumer takes the offsets out of the checkpoint state and seeks to them, so during a restored run the offsets committed via enableCommitOnCheckpoint or enableAutoCommit are never actually consulted. They only matter when the job starts without checkpoint state: in that case the consumer begins from the offsets previously committed to Kafka (again provided none of startFromLatest, startFromEarliest, or startFromTimestamp is set); see the sketch after this paragraph.
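The startup modes mentioned above correspond to explicit setters on the consumer. A brief sketch, reusing the consumer variable from the setup sketch earlier (legacy FlinkKafkaConsumer API):

// Default: when NOT restoring from checkpoint/savepoint state, start from the
// offsets the group last committed to Kafka; this is the only case where the
// offsets committed in ON_CHECKPOINTS / KAFKA_PERIODIC mode are actually used.
consumer.setStartFromGroupOffsets();

// Any of these overrides ignores the committed offsets on a fresh start:
// consumer.setStartFromEarliest();
// consumer.setStartFromLatest();
// consumer.setStartFromTimestamp(1609459200000L); // epoch millis, example value

// When the job IS restored from checkpoint/savepoint state, the offsets in
// that state take precedence over all of the above.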

Reference: https://blog.csdn.net/qq_42009500/article/details/119875158
