Flink之KeyedState
- 创业
- 2025-08-12 07:27:02

前面的文章中介绍过Operator State,这里介绍一下Keyed State. 在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Checkpoint.
需求 将接收到的Socket数据源中的字符串进行拼接 在命令行开启socket命令:nc -lk 8888 业务代码public class FlinkKeyedState { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行度为1,便于观察 env.setParallelism(1); // 开启Checkpoint, 8秒一个周期并开启一次性语义 env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE); // 指定checkpoint持久化路径 env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint"); // 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5))); // 获取Socket数据源 DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888); // 将数据进行分组,将分组key给一个常量值 SingleOutputStreamOperator<String> map = socketSource.keyBy(s -> "1") // 使用Keyed State的算子必须实现RichFunction接口,如RichMapFunction,ProcessFunction等 .map(new RichMapFunction<String, String>() { ListState<String> listState; // open方法可以理解为和Operator State中的initializeState方法一样,需要在这个方法中构造和获取状态存储器 @Override public void open(Configuration parameters) throws Exception { // 获取上下文 RuntimeContext ctx = getRuntimeContext(); // 获取ListState,不同于Operator State的是在这里有更多的选择,如ListState,MapState等 listState = ctx.getListState(new ListStateDescriptor<>("demo", String.class)); } // 在map方法中正常编写业务逻辑 @Override public String map(String s) throws Exception { // 模拟Task失败 if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) { throw new Exception("Task 异常"); } // 将数据添加到状态存储器中 listState.add(s); Iterable<String> strings = listState.get(); StringBuilder builder = new StringBuilder(); for (String string : strings) { builder.append(string); } return builder.toString(); } }); map.print(); env.execute("Keyed State"); } } API的使用大概就这些内容,不过在使用Keyed Sate时首先要对keyBy的特性有所了解,才能得到最终想要的结果数据,如使用keyBy时上下游之间的数据分发模式、所设置的默认并行度、上下游算子的并行度是否一致等问题,这些都是需要注意的,然后根据实际业务需求开发对应的逻辑就可以了.Flink之KeyedState由讯客互联创业栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Flink之KeyedState”
上一篇
功能测试进阶建议,学习思路讲解