问题出现场景
为了评估一个 Flink 程序的处理效果,我使用本地模式启动了 Flink 程序,并在上游表中一次性插入了大量数据(大概相当于线上单个并发 4 - 5 分钟的最大处理量),以触发计算。
但是,在本地计算中,一直无法计算完成,观察后发现任务在被重复计算,进而发现 Flink 在不断从 checkpoint 恢复。在日志中搜索 checkpoint,发现如下报错信息:
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:98) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:67) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1934) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1906) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:96) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1990) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_391]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_391]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_391]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_391]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_391]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_391]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_391]
该报错大概每 10 分钟左右出现一次,具体如下:
11:30:44 - 启动 Flink 程序11:41:17 - 第一次报错(在启动后 10 分钟 33 秒)11:51:48 - 第二次报错(在上次报错后 10 分钟 31 秒)12:02:19 - 第三次报错(在上次报错后 10 分钟 31 秒)……
问题原因
通过定位,发现是 checkpoint 超时报错。Flink 的 checkpoint 的超时时间时 600 秒,但是这个任务需要 11 分钟才能完成。本地之所以比线上慢,一方面是因为本地增加了一部分新的逻辑;另一方面也可能是因为线上运行时,对 MySQL 请求时走的是内网请求,而本地运行走的是外网请求。
Flink 的 checkpoint 超时时间默认值的源码位置如下:
// org.apache.flink.streaming.api.environment.CheckpointConfig
public class CheckpointConfig implements Serializable {
public CheckpointConfig() {
this.checkpointingMode = DEFAULT_MODE;
this.checkpointInterval = -1L;
this.checkpointTimeout = 600000L;
this.minPauseBetweenCheckpoints = 0L;
this.maxConcurrentCheckpoints = 1;
this.alignmentTimeout = (Duration)ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT.defaultValue();
this.failOnCheckpointingErrors = true;
this.preferCheckpointForRecovery = false;
this.tolerableCheckpointFailureNumber = -1;
}
解决方法
在本地测试时,如需较大数据量测试,显式地设置 checkpoint 超时时间即可:
env.getCheckpointConfig().setCheckpointTimeout(1200000);
精彩链接
发表评论