在Flink状态使用过程中有时需要清除State
中不许需要的数据,否则State
中的数据会越来越多,既增加了内存压力,也降低了计算效率.而TTL
机制可以很好的帮我们解决这个分体,利用TTL
机制可以将状态中的冷热数据分离,将使用率很低的冷数据及时清除.
这里以Operator State
为例子
class StateMapFunc2 implements MapFunction<String, List<Tuple2<String, String>>>, CheckpointedFunction {private ListState<Tuple2<String, String>> listState;@Overridepublic List<Tuple2<String, String>> map(String s) throws Exception {// ...}@Overridepublic void snapshotState(FunctionSnapshotContext ctx) throws Exception {}@Overridepublic void initializeState(FunctionInitializationContext ctx) throws Exception {OperatorStateStore operatorStateStore = ctx.getOperatorStateStore();// 配置State TTLStateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(10)) // 设置数据存活时长,当该数据在State中存活时间超过10s时删除该数据// 这个方法也是设置数据存活时长,和StateTtlConfig.Builder(Time.seconds(10))的作用一样,可以不用这个方法,如果用了会覆盖上面设置的时长.setTtl(Time.seconds(10))/*** updateTtlOnCreateAndWrite和updateTtlOnReadAndWrite二选一即可, 这两个方法的主要作用就是配合setTtl方法将冷热数据进行分离**/// 当该条数据在State中插入或者更新的时候,刷新计时.updateTtlOnCreateAndWrite()// 读或写都刷新该数据的TTL计时.updateTtlOnReadAndWrite()/*** setStateVisibility就是设置状态的可见性,前面setTtl方法是设置删除过期数据,删除过期数据实际上是由另一个异步线程周期性(定时器)的完成,也就是说超过10s的数据不一定会马上被删除,但是* 获取数据的时候底层会将超过存活时间的数据进行判断过滤,setStateVisibility就是可以设置是否可以查询到这些过期的数据,NeverReturnExpired和ReturnExpiredIfNotCleanedUp二选一.**/// 不返回过期数据,这个也是默认策略.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// 返回还没有被清除的过期数据.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)// 指定TTL计时时间语义(默认处理时间).setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime).build();// 配置状态描述,在ListStateDescriptor构造器中声明数据类型,简单类型可以使用xxx.class,符合类型需要使用到TypeInformation.of()ListStateDescriptor descriptor = new ListStateDescriptor("MapState", TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));// 状态描述器加载TTL配置descriptor.enableTimeToLive(ttlConfig);listState = operatorStateStore.getListState(descriptor);}
}
代码中只需要关注initializeState()
方法即可,里面列出了有关TTL
常用的API
,注释中也进行了相关的介绍.