Task重启策略
1 策略API
-
noRestart
无参数,task失败后不重启,整个job同时失败,默认策略.
代码示例
RestartStrategies.noRestart();
-
fixedDelayRestart
参数 注释 restartAttempts 最大重启次数 delayBetweenAttempts 重启时间间隔 代码示例
// 最多重启5次,每次任务失败后间隔1s重启 RestartStrategies.fixedDelayRestart(5, 1000);
-
exponentialDelayRestart
参数 注释 initialBackoff 重启间隔惩罚时长初始值(重启延迟时间) maxBackoff 重启间隔最大惩罚时长 backoffMultiplier 重启间隔时长的惩罚倍数 resetBackoffThreshold 重置惩罚时长的平稳运行时长(平稳运行时长达到这个阈值后,再次发生故障则重启延迟时间恢复到初始值) jitterFactor 取一个随机数,来加在重启时间点上,已让每次重启的时间呈现一定随机性(避免某一时刻集群中有大量的task同时重启,如果task重启时间是规律性的就可能发生这种情况) 代码示例
// 第一次失败后间隔1s重启任务,如果稳定运行时长没有达到120s就发生task失败,则重启间隔时长=上一次重启间隔时长*1.2,如果稳定运行时长超过120秒则重启间隔时长恢复到1s RestartStrategies.exponentialDelayRestart(Time.seconds(1), Time.seconds(30), 1.2, Time.seconds(120), 0.56);
-
failureRateRestart
参数 注释 failureRate 指定时间范围内的最大Task任务失败率(次数) failureInterval 指定时间范围 delayInterval 重启时间间隔 代码示例
// task失败重启间隔为1s,只要在30分钟内task失败重启次数没超过3次就可以一直执行这个策略,如果超过则job停止 RestartStrategies.failureRateRestart(3, Time.minutes(30), Time.seconds(1));
-
fallBackRestart
无参数,常用于自定义的RestartStrategy,即用户自定义了重启策略,且将其配置在了flink-conf.yaml文件中,也就是说调用这个方法时会读取集群的配置文件,根据配置文件的内容调整策略
代码示例
RestartStrategies.fallBackRestart();
2 代码详情
public class FlinkCheckpoint {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 开启并设置checkpoint的时间间隔env.enableCheckpointing(3000);// 设置checkpoint的存储位置env.getCheckpointConfig().setCheckpointStorage(new Path("hdfs://lx01:8020/flink-ck"));// 允许checkpoint失败的最大次数env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);// checkpoint的算法模式,是否需要对其(EXACTLY_ONCE)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// job取消是否保留checkpoint数据env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 设置checkpoint对齐的超时时间env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofMillis(10000));// 两次checkpoint的最小时间间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);// 并行最大的checkpoint数env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);// 选择后端状态(默认HashMapStateBackend)env.setStateBackend(new EmbeddedRocksDBStateBackend());// TODO Task重启策略RestartStrategies.RestartStrategyConfiguration restartStrategy = null;// 第一次失败后间隔1s重启任务,如果稳定运行时长没有达到120s就发生task失败,则重启间隔时长=上一次重启间隔时长*1.2,如果稳定运行时长超过120秒则重启间隔时长恢复到1srestartStrategy = RestartStrategies.exponentialDelayRestart(Time.seconds(1), Time.seconds(30), 1.2, Time.seconds(120), 0.56);// 配置Task重启策略env.setRestartStrategy(restartStrategy);// ...业务代码env.execute();}
}