Flink之常用处理函数
- 人工智能
- 2025-08-16 09:36:02

常用处理函数 处理函数概述 基本处理函数ProcessFunction介绍使用示例 按键分区处理函数KeyedProcessFunction介绍定时器Timer和定时服务TimerService使用示例其他 窗口处理函数ProcessWindowFunction介绍ProcessAllWindowFunction介绍使用示例 流的合并处理函数CoProcessFunction介绍使用示例 流的联结处理函数窗口联结 JoinFunction间隔联结 ProcessJoinFunction迟到数据的处理 广播流处理函数KeyedBroadcastProcessFunctionBroadcastProcessFunction使用示例 处理函数 概述
处理函数(Processing Function)是Apache Flink中用于对数据流上的元素进行处理的核心组件之一。处理函数负责定义数据流上的数据如何被处理,允许开发人员编写自定义逻辑以执行各种操作,如转换、聚合、筛选、连接等,并在处理后生成输出数据流。
对于数据流,都可以直接调用.process()方法进行自定义处理,传入的参数就叫作处理函数,也可以把它划分为转换算子。
基本处理函数ProcessFunction是最基本的处理函数,基于DataStream直接调用.process()时作为参数传入
ProcessFunction介绍ProcessFunction是一个抽象类,它继承AbstractRichFunction,有两个泛型类型参数:
1.输入的数据类型 2.处理完成之后输出数据类型内部单独定义了两个方法:
1.必须要实现的抽象方法.processElement() 2.一个非抽象方法.onTimer()ProcessFunction类如下:
/** * 处理流元素的函数 * * 对于输入流中的每个元素,调用processElement(Object,ProcessFunction.Context,Collector) 可以产生零个或多个元素作为输出 * 还可以通过提供的ProcessFunction.Context查询时间和设置计时器 * * 对于触发计时器,将调用onTimer(long,ProcessFunction.OnTimerContext,Collector) 可以再次产生零个或多个元素作为输出,并注册其他计时器 * * @param <I> 输入元素的类型 * @param <O> 输出元素的类型 */ @PublicEvolving public abstract class ProcessFunction<I, O> extends AbstractRichFunction { private static final long serialVersionUID = 1L; /** * 处理输入流中的一个元素,对于流中的每个元素都会调用一次 * * 可以使用输出零个或多个元素收集器参数,并使用更新内部状态或设置计时器ProcessFunction.Context参数 * * @param value 输入值,类型与流中数据类型一致 * @param ctx ProcessFunction的内部抽象类Context,表示当前运行的上下文,可以获取当前时间戳,用于查询时间和注册定时器的定时服务 * @param out 用于返回结果值的收集器,与out.collect()方法向下游发数据类似 */ public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception; /** * 当使用设置计时器时调用TimerService * * 只有在注册好的定时器触发的时候才会调用,而定时器是通过定时服务TimerService来注册的 * * 事件时间语义下就是由水位线watermark来触发 * * 也可以自定义数据按照时间分组、定时触发计算输出结果,实现类似窗口window的功能 * * @param timestamp 触发计时器的时间戳,指设定好的触发时间 * @param ctx 上下文 * @param out 用于返回结果值的收集器 */ public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {} } 使用示例基本处理函数ProcessFunction的使用与基本的转换操作类似,直接基于DataStream调用.process()方法,传入一个ProcessFunction作为参数,用来定义处理逻辑。
具体举例使用示例如下:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, -6)); /** * 创建OutputTag对象 * 分别指定: 标签名、放入侧输出流的数据类型(Typeinformation) */ OutputTag<Integer> evenTag = new OutputTag<>("even", Types.INT); OutputTag<Integer> oddTag = new OutputTag<>("odd", Types.INT); // 使用process算子 SingleOutputStreamOperator<Integer> process = stream.process( new ProcessFunction<Integer, Integer>() { @Override public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception { if (value > 0) { if (value % 2 == 0) { // 偶数放到侧输出流evenTag中 // 调用上下文对象ctx的output方法,分别传入 Tag对象、放入侧输出流中的数据 ctx.output(evenTag, value); } else if (value % 2 == 1) { // 奇数放到侧输出流oddTag中 ctx.output(oddTag, value); } } else { // 负数 数据,放到主流中 out.collect(value); } } } ); // 在主流中,根据标签 获取 侧输出流 SideOutputDataStream<Integer> even = process.getSideOutput(evenTag); SideOutputDataStream<Integer> odd = process.getSideOutput(oddTag); // 打印主流 process.printToErr("主流-负数-job"); //打印 侧输出流 even.print("偶数-job"); odd.print("奇数-job"); env.execute(); } 奇数-job:1> 1 偶数-job:2> 2 奇数-job:1> 3 偶数-job:2> 4 奇数-job:1> 5 主流-负数-job:2> -6 按键分区处理函数KeyedProcessFunction对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,必须基于KeyedStream
KeyedProcessFunction介绍KeyedProcessFunction与ProcessFunction的定义几乎完全一样,区别只是在于类型参数多了一个K,这是当前按键分区的key的类型。
按键分区处理函数接口如下:
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction { public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception; public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {} } 定时器Timer和定时服务TimerService另外在KeyedStream中是支持使用定时服务TimerService,可以通过它访问流中的事件event、时间戳timestamp、水位线watermark,甚至可以注册定时事件。
在onTimer()方法中可以实现定时处理的逻辑,而它触发的前提是之前曾经注册过定时器、并且现在已经到了触发时间。
注册定时器的功能是通过上下文中提供的定时服务来实现的。
// 获取定时服务 TimerService timerService = ctx.timerService();TimerService是Flink关于时间和定时器的基础服务接口,对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器,具体方法如下:
// 获取当前的处理时间 long currentProcessingTime(); // 获取当前的水位线(事件时间) long currentWatermark(); // 注册处理时间定时器,当处理时间超过time时触发 void registerProcessingTimeTimer(long time); // 注册事件时间定时器,当水位线超过time时触发 void registerEventTimeTimer(long time); // 删除触发时间为time的处理时间定时器 void deleteProcessingTimeTimer(long time); // 删除触发时间为time的处理时间定时器 void deleteEventTimeTimer(long time);注意:
尽管处理函数中都可以访问TimerService,不过只有基于KeyedStream的处理函数,才能去调用注册和删除定时器的方法
使用示例直接基于keyBy之后的KeyedStream,直接调用.process()方法,传入KeyedProcessFunction的实现类参数
必须实现processElement()抽象方法,用来处理流中的每一个数据 必须实现非抽象方法onTimer(),用来定义定时器触发时的回调操作 public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 从socket接收数据流 SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086) // 将输入数据转换为Tuple2 .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] split = value.split(","); return Tuple2.of(split[0], Integer.valueOf(split[1])); } }) // 指定 watermark策略 .assignTimestampsAndWatermarks( // 定义Watermark策略 WatermarkStrategy .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((value, ts) -> value.f1 * 1000L) ); // keyBy分区 KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0); // 按键分区处理函数 SingleOutputStreamOperator<Integer> process = keyByStream.process( new KeyedProcessFunction<String, Tuple2<String, Integer>, Integer>() { /** * 来一条数据调用一次 * @param value 当前数据 * @param ctx 上下文 * @param out 收集器 * @throws Exception */ @Override public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Integer> out) throws Exception { //获取当前数据的key String currentKey = ctx.getCurrentKey(); p(); // 获取定时服务 TimerService timerService = ctx.timerService(); // 数据中提取出来的事件时间 Long currentEventTime = ctx.timestam // 注册事件时间定时器 timerService.registerEventTimeTimer(3000L); System.out.println("key: " + currentKey + " 当前数据: " + value + " 当前时间: " + currentEventTime + " 注册一个3s定时器"); /** * 时间进展到定时器注册的时间,调用该方法 * @param timestamp 定时器被触发时的时间 * @param ctx 上下文 * @param out 采集器 */ @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception { super.onTimer(timestamp, ctx, out); String currentKey = ctx.getCurrentKey(); System.out.println("key: " + currentKey + " 时间: " + timestamp + " 定时器触发"); } } ); process.print(); env.execute(); } 其他1.注册一个事件时间的定时器
事件时间定时器,通过watermark来触发,即watermark >= 注册的时间 水印watermark = 当前最大事件时间 - 等待时间 -1ms 例子:等待3s,3s定时器,事件时间6s 则watermark = 6s - 3s -1ms = 2.99s,不会触发3s的定时器 // 数据中提取出来的事件时间 Long currentEventTime = ctx.timestam // 注册事件时间定时器 timerService.registerEventTimeTimer(3000L); System.out.println("key: " + currentKey + " 当前数据: " + value + " 当前时间: " + currentEventTime + " 注册一个3s定时器");输入数据如下,当输入7时,水位线是7-3=4s-1ms=3.99s,即水位线超过定时器3s,执行触发回调操作
nc -lk 8086 key1,1 key1,2 key2,3 key2,4 key1,5 key2,6 key1,7控制台输出:
key: key1 当前数据: (key1,1) 当前时间: 1000 注册一个3s定时器 key: key1 当前数据: (key1,2) 当前时间: 2000 注册一个3s定时器 key: key2 当前数据: (key2,3) 当前时间: 3000 注册一个3s定时器 key: key2 当前数据: (key2,4) 当前时间: 4000 注册一个3s定时器 key: key1 当前数据: (key1,5) 当前时间: 5000 注册一个3s定时器 key: key2 当前数据: (key2,6) 当前时间: 6000 注册一个3s定时器 key: key1 当前数据: (key1,7) 当前时间: 7000 注册一个3s定时器 key: key1 时间: 3000 定时器触发 key: key2 时间: 3000 定时器触发注意:
TimerService会以键和时间戳为标准,对定时器进行去重,因此对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次
2.注册一个处理时间的定时器
long currentTs = timerService.currentProcessingTime(); timerService.registerProcessingTimeTimer(currentTs + 3000L); System.out.println("key: " + currentKey + " 当前数据: " + value + " 当前时间: " + currentTs + " 注册一个3s后的定时器");输入测试数据如下:
key1,1 key2,2当注册一个处理时间的定时器,3s后定时器会触发操作
key: key1 当前数据: (key1,1) 当前时间: 1688136512301 注册一个3s后的定时器 key: key2 当前数据: (key2,2) 当前时间: 1688136514179 注册一个3s后的定时器 key: key1 时间: 1688136515301 定时器触发 key: key2 时间: 1688136517179 定时器触发3.获取process当前watermark
long currentWatermark = timerService.currentWatermark(); System.out.println("当前数据: " + value + " 当前watermark: " + currentWatermark); key1,1 key1,2 key1,3结论:每次process处理,watermark是指上一条数据的事件时间-等待时间,例如:3-2-1ms=-1001
当前数据=(key1,1),当前watermark=-9223372036854775808 当前数据=(key1,2),当前watermark=-2001 当前数据=(key1,3),当前watermark=-10014.删除一个处理时间定时器
// 注册处理时间定时器 long currentTs = timerService.currentProcessingTime(); long timer = currentTs + 3000; timerService.registerProcessingTimeTimer(timer); System.out.println("key: " + currentKey + " 当前数据: " + value + " 当前时间: " + currentTs + " 注册一个3s后的定时器"); // 在3000毫秒后删除处理时间定时器 if("key1".equals(currentKey)){ timerService.deleteProcessingTimeTimer(timer) }输入测试数据:
key1,1 key2,2控制台输出结果:
key: key1 当前数据: (key1,1) 当前时间: 1688138104565 注册一个3s后的定时器 key: key2 当前数据: (key2,2) 当前时间: 1688138106441 注册一个3s后的定时器 key: key2 时间: 1688138109441 定时器触发 窗口处理函数窗口处理函数就是一种典型的全窗口函数,它是基于WindowedStream直接调用.process()方法
窗口处理函数有2个:
1.ProcessWindowFunction:
开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入,必须是keyBy的数据流
2.ProcessAllWindowFunction:
同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入,必须是非keyBy的数据流
ProcessWindowFunction介绍ProcessWindowFunction既是处理函数又是全窗口函数,具体接口如下:
/** * ProcessWindowFunction它有四个类型参数: * @param <IN> 数据流中窗口任务的输入数据类型 * @param <OUT> 窗口任务进行计算之后的输出数据类型 * @param <KEY> 数据中键key的类型 * @param <W> 窗口的类型,是Window的子类型。一般情况下我们定义时间窗口,W就是TimeWindow */ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction { /** * 处理数据的核心方法process()方法 * * @param key 窗口做统计计算基于的键,也就是之前keyBy用来分区的字段 * @param context 当前窗口进行计算的上下文,它的类型就是ProcessWindowFunction内部定义的抽象类Context * @param elements 窗口收集到用来计算的所有数据,这是一个可迭代的集合类型 * @param out 用来发送数据输出计算结果的收集器,类型为Collector * @throws Exception */ public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception; /** * 主要是进行窗口的清理工作 * 如果自定义了窗口状态,那么必须在clear()方法中进行显式地清除,避免内存溢出 * @param context 当前窗口进行计算的上下文 * @throws Exception */ public void clear(Context context) throws Exception {} } ProcessAllWindowFunction介绍ProcessAllWindowFunction的用法类似,不过它是基于AllWindowedStream,也就是对没有keyBy的数据流直接开窗并调用.process()方法
stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) ) .process(new MyProcessAllWindowFunction()) 使用示例以使用ProcessWindowFunction为例说明:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从socket接收数据流 SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8086); // 将输入数据转换为(key, value)元组 DataStream<Tuple2<String, Integer>> dataStream = source.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2 map(String s) throws Exception { int number = Integer.parseInt(s); String key = number % 2 == 0 ? "key1" : "key2"; Tuple2 tuple2 = new Tuple2(key, number); return tuple2; } }).returns(Types.TUPLE(Types.STRING, Types.INT)); // 将数据流按键分组,并定义滚动窗口(处理时间窗口) DataStream<String> resultStream = dataStream .keyBy(tuple -> tuple.f0) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new MyProcessWindowFunction()); resultStream.print(); env.execute("ProcessWindowFunction Example"); } public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> { @Override public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) { int sum = 0; for (Tuple2<String, Integer> element : elements) { sum += element.f1; } out.collect("Key: " + key + ", Window: " + context.window() + ", Sum: " + sum); } } 流的合并处理函数CoProcessFunction是合并connect两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入
CoProcessFunction介绍调用.process()时,传入一个CoProcessFunction。它需要实现的就是processElement1()、processElement2()两个方法
CoProcessFunction类具体结构如下:
/** * 用于同时处理两个连接的流 * 它允许定义自定义处理逻辑,以处理来自两个不同输入流的事件并生成输出 * * @param <IN1> 第一个输入流的元素类型 * @param <IN2> 第二个输入流的元素类型 * @param <OUT> 输出元素的类型 */ public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction { /** * 处理第一个输入流的元素 * * @param value 第一个输入流的元素 * @param ctx 用于访问上下文信息,例如事件时间和状态的Context对象 * @param out 用于发射输出元素的Collector对象 * @throws Exception 处理时可能抛出的异常 */ public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception; /** * 处理第二个输入流的元素 * * @param value 第二个输入流的元素 * @param ctx 用于访问上下文信息,可以使用Context对象来访问事件时间、水位线和状态等上下文信息 * @param out 用于发射输出元素的Collector对象 * @throws Exception 处理时可能抛出的异常 */ public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception; /** * 当定时器触发时调用的方法。可以重写这个方法来执行基于时间的操作 * * @param timestamp 触发定时器的时间戳 * @param ctx 用于访问上下文信息,如事件时间和状态的OnTimerContext对象 * @param out 用于发射输出元素的Collector对象 * @throws Exception 处理时可能抛出的异常 */ public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {} } 使用示例假设有两个输入流,将这两个流合并计算得到每个key对应的合计,并输出结果流
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<String, Integer>> source1 = env.fromElements(Tuple2.of("key1", 1), Tuple2.of("key2", 4), Tuple2.of("key1", 2)); DataStreamSource<Tuple2<String, Integer>> source2 = env.fromElements(Tuple2.of("key1", 3), Tuple2.of("key2", 5), Tuple2.of("key2", 6)); ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connect = source1.connect(source2); // 进行keyby操作,将key相同数据放到一起 ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectKeyby = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0); /** * 对2个流中相同key的值求和 */ SingleOutputStreamOperator<String> process = connectKeyby.process( new CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() { Map<String, Integer> map = new HashMap<>(); /** * 第一条流的处理逻辑 * @param value 第一条流的数据 * @param ctx 上下文 * @param out 采集器 * @throws Exception */ @Override public void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception { String key = value.f0; if (!map.containsKey(key)) { // 如果key不存在,则将值直接put进map map.put(key, value.f1); } else { // key存在,则计算:获取上一次put的值 + 本次的值 Integer total = map.get(key) + value.f1; map.put(key, total); } out.collect("processElement1 key = " + key + " value = " + value + "total = " + map.get(key)); } /** * 第二条流的处理逻辑 * @param value 第二条流的数据 * @param ctx 上下文 * @param out 采集器 * @throws Exception */ @Override public void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception { String key = value.f0; if (!map.containsKey(key)) { // 如果key不存在,则将值直接put进map map.put(key, value.f1); } else { // key存在,则计算:获取上一次put的值 + 本次的值 Integer total = map.get(key) + value.f1; map.put(key, total); } out.collect("processElement2 key = " + key + " value = " + value + "total = " + map.get(key)); } } ); process.print(); env.execute(); } 3> processElement1 key = key2 value = (key2,4)total = 4 4> processElement1 key = key1 value = (key1,1)total = 1 4> processElement2 key = key1 value = (key1,3)total = 4 4> processElement1 key = key1 value = (key1,2)total = 6 3> processElement2 key = key2 value = (key2,5)total = 9 3> processElement2 key = key2 value = (key2,6)total = 15 流的联结处理函数JoinFunction 和 ProcessJoinFunction 是 Flink 中用于执行窗口连接操作的两个不同接口
窗口联结 JoinFunctionFlink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键key的数据放在窗口中进行配对处理。
JoinFunction接口如下:
/** * 联接通过在指定的键上联接两个数据集的元素来组合它们,每对连接元素都调用此函数 * * 默认情况下,连接严格遵循SQL中 “inner join” 的语义 * * @param <IN1> 第一个输入中元素的类型 * @param <IN2> 第二个输入中元素的类型 * @param <OUT> 结果元素的类型 */ public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable { /** * join方法,每对联接的元素调用一次 * * @param first 来自第一个输入的元素 * @param second 来自第二个输入的元素 * @return 生成的元素 */ OUT join(IN1 first, IN2 second) throws Exception; }具体语法格式如下:
/** * 1.调用DataStream的.join()方法来合并两条流,得到一个JoinedStreams * 2.通过.where()和.equalTo()方法指定两条流中联结的key。注意:两者相同的元素,如果在同一窗口中,才可以匹配起来 * 3.通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算 */ stream1.join(stream2) // where()参数是KeySelector键选择器,用来指定第一条流中的key .where(<KeySelector>) // equalTo()传入KeySelector则指定第二条流中的key .equalTo(<KeySelector>) // window()传入窗口分配器 .window(<WindowAssigner>) // apply()看作实现一个特殊的窗口函数,只能调用.apply()。传入JoinFunction是一个函数类接口,使用时需要实现内部的.join()方法,方法有两个参数,分别表示两条流中成对匹配的数据。 .apply(<JoinFunction>)示例如下:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 生成数据源1 DataStreamSource<Tuple2<String, Integer>> streamSource1 = env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4)); // 定义 使用 Watermark策略 SingleOutputStreamOperator<Tuple2<String, Integer>> stream1 = streamSource1 .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L)); // 生成数据源2 DataStreamSource<Tuple2<String, Integer>> streamSource2 = env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4), Tuple2.of("d", 5), Tuple2.of("e", 6)); // 定义 使用 Watermark策略 SingleOutputStreamOperator<Tuple2<String, Integer>> stream2 = streamSource2 .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L)); /** * 根据keyby的key进行匹配关联 * * 注意:落在同一个时间窗口范围内才能匹配 */ DataStream<String> join = stream1.join(stream2) // stream1的keyby .where(r1 -> r1.f0) // stream2的keyby .equalTo(r2 -> r2.f0) // 传入窗口分配器 .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 传入JoinFunction函数类接口,实现内部的.join()方法,方法有两个参数,分别表示两条流中成对匹配的数据 .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() { /** * 关联上的数据,调用join方法 * @param first stream1的数据 * @param second stream2的数据 */ @Override public String join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws Exception { return "stream1 数据: " + first + " 关联 stream2 数据: " + second; } }); join.print(); env.execute(); }执行结果如下:
stream1 数据: (a,1) 关联 stream2 数据: (a,1) stream1 数据: (a,1) 关联 stream2 数据: (a,2) stream1 数据: (a,2) 关联 stream2 数据: (a,1) stream1 数据: (a,2) 关联 stream2 数据: (a,2) stream1 数据: (c,4) 关联 stream2 数据: (c,4) stream1 数据: (b,3) 关联 stream2 数据: (b,3) 间隔联结 ProcessJoinFunctionInterval Join即间隔联结,它是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。
ProcessJoinFunction接口情况如下 :
/** * 处理两个连接流的关联操作的抽象类 * 该类允许定义自定义的处理逻辑,以在连接两个流时处理匹配的元素 * * @param <IN1> 第一个输入流的元素类型 * @param <IN2> 第二个输入流的元素类型 * @param <OUT> 输出元素的类型 */ public interface ProcessJoinFunction<IN1, IN2, OUT> { /** * 处理连接两个流的元素 * * @param left 第一个输入流的元素 * @param right 第二个输入流的元素 * @param ctx 用于访问上下文信息的 Context 对象 * @param out 用于发射输出元素的 Collector 对象 * @throws Exception 处理时可能抛出的异常 */ void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception; }间隔联结使用语法如下:
// 第一条流进行KeyedStream stream1 .keyBy(<KeySelector>) // 得到KeyedStream之后,调用.intervalJoin()合并两条流,传入一个KeyedStream参数,两者key类型应该一致,最终得到一个IntervalJoin类型 .intervalJoin(stream2.keyBy(<KeySelector>)) // 通过.between()方法指定间隔的上下界 .between(Time.milliseconds(-2), Time.milliseconds(1)) // 调用.process()方法,定义对匹配数据对的处理操作,传入一个处理函数 .process (new ProcessJoinFunction<Integer, Integer, String(){ @Override public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) { out.collect(left + "," + right); } });使用示例如下:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 生成数据源1 DataStreamSource<Tuple2<String, Integer>> streamSource1 = env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4)); // 定义 使用 Watermark策略 SingleOutputStreamOperator<Tuple2<String, Integer>> stream1 = streamSource1 .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L)); // 生成数据源2 DataStreamSource<Tuple2<String, Integer>> streamSource2 = env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4), Tuple2.of("d", 5), Tuple2.of("e", 6)); // 定义 使用 Watermark策略 SingleOutputStreamOperator<Tuple2<String, Integer>> stream2 = streamSource2 .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L)); // 对2条流分别做keyby,key就是关联条件 KeyedStream<Tuple2<String, Integer>, String> keyedStream1 = stream1.keyBy(r1 -> r1.f0); KeyedStream<Tuple2<String, Integer>, String> keyedStream2 = stream2.keyBy(r2 -> r2.f0); // 执行间隔联结 keyedStream1.intervalJoin(keyedStream2) .between(Time.seconds(-2), Time.seconds(2)) .process( new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() { /** * 当两条流数据匹配上时调用这个方法 * @param left stream1的数据 * @param right stream2的数据 * @param ctx 上下文 * @param out 采集器 * @throws Exception */ @Override public void processElement(Tuple2<String, Integer> left, Tuple2<String, Integer> right, Context ctx, Collector<String> out) throws Exception { // 关联的数据 out.collect("stream1 数据: " + left + " 关联 stream2 数据: " + right); } }) .print(); env.execute(); } stream1 数据: (a,1) 关联 stream2 数据: (a,1) stream1 数据: (a,1) 关联 stream2 数据: (a,2) stream1 数据: (a,2) 关联 stream2 数据: (a,2) stream1 数据: (a,2) 关联 stream2 数据: (a,1) stream1 数据: (b,3) 关联 stream2 数据: (b,3) stream1 数据: (c,4) 关联 stream2 数据: (c,4) 迟到数据的处理窗口间隔联结处理函数可以实现对迟到数据的处理
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource1 = env.socketTextStream("112.74.96.150", 8086) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] split = value.split(","); return Tuple2.of(split[0], Integer.valueOf(split[1])); } }) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((value, ts) -> value.f1 * 1000L) ); SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource2 = env.socketTextStream("112.74.96.150", 8087) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] split = value.split(","); return Tuple2.of(split[0], Integer.valueOf(split[1])); } }) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((value, ts) -> value.f1 * 1000L) ); // 对2条流分别做keyby,key就是关联条件 KeyedStream<Tuple2<String, Integer>, String> keyedStream1 = streamSource1.keyBy(r1 -> r1.f0); KeyedStream<Tuple2<String, Integer>, String> keyedStream2 = streamSource2.keyBy(r2 -> r2.f0); // 定义 标记操作符的侧面输出 OutputTag<Tuple2<String, Integer>> keyedStream1OutputTag = new OutputTag<>("keyedStream1", Types.TUPLE(Types.STRING, Types.INT)); OutputTag<Tuple2<String, Integer>> keyedStream2OutputTag = new OutputTag<>("keyedStream2", Types.TUPLE(Types.STRING, Types.INT)); // 执行间隔联结 SingleOutputStreamOperator<String> process = keyedStream1.intervalJoin(keyedStream2) // 指定间隔的上界、下界的偏移,负号代表时间往前,正号代表时间往后 // 若keyedStream1中某事件时间为5,则其水位线是5-3=2,其上界是 5-2=3 下界是5+2=7 即2-7这个区间能匹配keyedStream2中事件时间是2-7的数据 .between(Time.seconds(-2), Time.seconds(2)) // 将streamSource1迟到数据,放入侧输出流 .sideOutputLeftLateData(keyedStream1OutputTag) // 将streamSource2迟到数据,放入侧输出流 .sideOutputRightLateData(keyedStream2OutputTag) // 对匹配数据对的处理操作 只能处理 join上的数据 .process( new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() { /** * 当两条流数据匹配上时调用这个方法 * @param left stream1的数据 * @param right stream2的数据 * @param ctx 上下文 * @param out 采集器 * @throws Exception */ @Override public void processElement(Tuple2<String, Integer> left, Tuple2<String, Integer> right, Context ctx, Collector<String> out) throws Exception { // 进入这个方法,是关联上的数据 out.collect("stream1 数据: " + left + " 关联 stream2 数据: " + right); } }); process.print("主流"); process.getSideOutput(keyedStream1OutputTag).printToErr("streamSource1迟到数据"); process.getSideOutput(keyedStream2OutputTag).printToErr("streamSource2迟到数据"); env.execute(); }1.2条流数据匹配 若keyedStream1中某事件时间为5,则其水位线是5-3=2,其上界是 5-2=3 下界是5+2=7 即2-7这个区间能匹配keyedStream2中事件时间是2-7的数据
nc -lk 8086 key1,5 nc -lk 8087 key1,3 key1,7 key1,8 主流> stream1 数据: (key1,5) 关联 stream2 数据: (key1,3) 主流> stream1 数据: (key1,5) 关联 stream2 数据: (key1,7)2.keyedStream2迟到数据 此时,keyedStream1中水位线是5-3=2,keyedStream2中水位线是8-3=5,多并行度下水位线取最小,即取水位线2
在keyedStream2输入事件时间1
nc -lk 8087 key1,3 key1,7 key1,8 key1,1事件时间1 < 水位线2,且事件时间1被keyedStream1的事件时间5的上界5-2=3与下界5+2=7不包含,即数据不匹配且streamSource2数据迟到
streamSource2迟到数据> (key1,1)3.keyedStream1迟到数据
keyedStream1输入事件时间7
nc -lk 8086 key1,5 key1,7此时匹配到streamSource2中的8、7
主流> stream1 数据: (key1,7) 关联 stream2 数据: (key1,8) 主流> stream1 数据: (key1,7) 关联 stream2 数据: (key1,7)此时,keyedStream1的水位线是7-3=4,keyedStream2的水位线是8-3=5,多并行度下水位线取最小,即取水位线4
keyedStream1输入事件时间3
nc -lk 8086 key1,5 key1,7 key1,3事件时间3 < 水位线4,且事件时间3被keyedStream2的事件时间3的上界3-2=1与下界3+2=5包含,即数据匹配且streamSource1数据迟到
streamSource1迟到数据> (key1,3) 广播流处理函数用于连接一个主数据流和多个广播数据流。可以实现processElement 方法来处理主数据流的每个元素,同时可以处理广播数据流,通常用于数据广播和连接。
广播流处理函数有2个:
1.BroadcastProcessFunction:
广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。它是一个未keyBy的普通DataStream与一个广播流BroadcastStream做连接之后的产物
2.KeyedBroadcastProcessFunction:
按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。它是一个KeyedStream与广播流做连接
KeyedBroadcastProcessFunction /** * @param <KS> 输入键控流的键类型 * @param <IN1> 键控 (非广播) 端的输入类型 * @param <IN2> 广播端的输入类型 * @param <OUT> 运算符的输出类型 */ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction { private static final long serialVersionUID = -2584726797564976453L; /** * (非广播) 的键控流中的每个元素调用此方法 * * @param value 流元素 * @param ctx 允许查询元素的时间戳、查询当前处理/事件时间以及以只读访问迭代广播状态 * @param out 将结果元素发出 */ public abstract void processElement( final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception; /** * 针对broadcast stream中的每个元素调用该方法 * * @param value stream元素 * @param ctx 上下文 许查询元素的时间戳、查询当前处理/事件时间和更新广播状态 * @param out 将结果元素发射到 */ public abstract void processBroadcastElement( final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception; /** * 当使用TimerService设置的计时器触发时调用 * * @param timestamp 触发计时器的时间戳 * @param ctx 上下文 * @param out 返回结果值的收集器 */ public void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<OUT> out) throws Exception { } } BroadcastProcessFunctionBroadcastProcessFunction与KeyedBroadcastProcessFunction类似,不过它是基于AllWindowedStream,也就是对没有keyBy的数据流直接开窗并调用.process()方法
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction { public abstract void processElement( final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception; public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception; } 使用示例以使用KeyedBroadcastProcessFunction为例说明:
public class KeyedBroadcastProcessFunctionExample { /** * 主流 数据对象 */ @Data @AllArgsConstructor @NoArgsConstructor public static class MainRecord { private String key; private int value; } /** * 广播流 数据对象 */ @Data @AllArgsConstructor @NoArgsConstructor public static class BroadcastRecord { private String configKey; private int configValue; } /** * 结果 数据对象 */ @Data @AllArgsConstructor @NoArgsConstructor public static class ResultRecord { private String key; private int result; } // 使用给定的名称和给定的类型信息新建一个MapStateDescriptor static MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>("broadcastState", String.class, Integer.class); public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 创建主数据流 DataStream<MainRecord> mainStream = env.fromElements( new MainRecord("A", 10), new MainRecord("B", 20), new MainRecord("A", 30) ); // 创建广播数据流 DataStream<BroadcastRecord> broadcastStream = env.fromElements( new BroadcastRecord("config", 5) ); // 将广播数据流转化为 BroadcastStream BroadcastStream<BroadcastRecord> broadcast = broadcastStream.broadcast(mapStateDescriptor); // 使用 KeyedBroadcastProcessFunction 连接主数据流和广播数据流 DataStream<ResultRecord> resultStream = mainStream .keyBy(new MainRecordKeySelector()) .connect(broadcast) .process(new MyKeyedBroadcastProcessFunction()); resultStream.print(); env.execute("KeyedBroadcastProcessFunction Example"); } /** * 使用提供的键对其运算符状态进行分区 */ public static class MainRecordKeySelector implements KeySelector<MainRecord, String> { @Override public String getKey(MainRecord mainRecord) { return mainRecord.getKey(); } } /** * */ public static class MyKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunction<String, MainRecord, BroadcastRecord, ResultRecord> { @Override public void processBroadcastElement(BroadcastRecord value, Context ctx, Collector<ResultRecord> out) throws Exception { // 通过上下文获取广播状态 BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(mapStateDescriptor); // 处理广播数据流中的每个元素,更新广播状态 broadcastState.put(value.getConfigKey(), value.getConfigValue()); } @Override public void processElement(MainRecord value, ReadOnlyContext ctx, Collector<ResultRecord> out) throws Exception { // 在 processElement 中访问广播状态 ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(mapStateDescriptor); // 从广播状态中获取配置值 Integer configValue = broadcastState.get("config"); // 注意:刚启动时,可能是数据流的第1 2 3...条数据先来 不是广播流先来 if (configValue == null) { return; } System.out.println(String.format("主数据流的Key: %s, value: %s, 广播更新结果: %s", value.key, value.value, value.value + configValue)); // 根据配置值和主数据流中的元素执行处理逻辑 int result = value.getValue() + configValue; // 发出结果记录 out.collect(new ResultRecord(value.getKey(), result)); } } } 主数据流的Key: A, value: 10, 广播更新结果: 15 主数据流的Key: B, value: 20, 广播更新结果: 25 2> KeyedBroadcastProcessFunctionExample.ResultRecord(key=B, result=25) 7> KeyedBroadcastProcessFunctionExample.ResultRecord(key=A, result=15) 主数据流的Key: A, value: 30, 广播更新结果: 35 7> KeyedBroadcastProcessFunctionExample.ResultRecord(key=A, result=35)Flink之常用处理函数由讯客互联人工智能栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Flink之常用处理函数”