2. TimerService源码public interface TimerService {// 不支持注册定时器的提示语String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";// 不支持删除定时器的提示语String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";// 当前处理时间long currentProcessingTime();// 当前水位线long currentWatermark();// 注册基于处理时间的定时器void registerProcessingTimeTimer(long var1);// 注册基于事件时间的定时器void registerEventTimeTimer(long var1);// 删除基于处理时间的定时器void deleteProcessingTimeTimer(long var1);// 删除基于事件时间的定时器void deleteEventTimeTimer(long var1);}3. 测试代码:/*** @title: KeyedProcessFunctionTest* @Author Tian* @Date: 2023/3/21 22:59* @Version 1.0*/public class KeyedProcessFunctionTest {public static void main(String[] args) throws Exception {// todo 获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// todo 处理原始数据并增加时间戳SingleOutputStreamOperator<JSONObject> sourceStream = env.socketTextStream("hadoop110", 9999).map(t -> {JSONObject jsonObject = new JSONObject();jsonObject.put("userName", t);jsonObject.put("time", System.currentTimeMillis()-5000);return jsonObject;});// todo sourceStream设置水位线,指定事件时间字段sourceStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {@Overridepublic long extractTimestamp(JSONObject jsonObject, long l) {return jsonObject.getLong("time");}}));// todo 按照用户名分组// KeyedStream<JSONObject, String> keyedStream = sourceStream.keyBy(data -> true);sourceStream.keyBy(data -> true).process(new MyKeyedProcessFunction());// todo 调用窗口处理函数并输出// keyedStream.process(new MyKeyedProcessFunction()).print("处理结果:");env.execute();}}/*** 自定义窗口处理函数*/class MyKeyedProcessFunction extends KeyedProcessFunction<Boolean,JSONObject,String>{@Overridepublic void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {System.out.println("当前处理Key:"+ context.getCurrentKey());System.out.println("数据到达时间:"+ context.timestamp());System.out.println("当前处理时间:"+ context.timerService().currentProcessingTime());System.out.println("当前水位线:"+ context.timerService().currentWatermark());// todo 注册定时器,处理时间+1秒context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime()+1000);// todo 返回当前keycollector.collect(jsonObject.toJSONString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {// todo 触发器触发的时候执行的逻辑 ;super.onTimer(timestamp, ctx, out);System.out.println("触发器触发了:"+ ctx.timestamp());}}3. 窗口处理函数(ProcesswindowsFunction)除了上面的按键分区处理函数之外,对于窗口也有函数,分两种,一种是窗口处理函数(ProcessWindowsFunction),另一种是全窗口处理函数(ProcessAllWindowsFunction),ProcessWindowFunction获得一个包含窗口所有元素的可迭代器以及一个具有时间和状态信息访问权的上下文对象,使得它比其他窗口函数提供更大的灵活性 。是以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部缓冲,直到认为窗口可以处理为止 。
ProcessWindowsFunction:处理分区数据,每个窗口执行一次process方法
1. ProcessWindowsFunction源码// IN: input,数据流中窗口任务的输入数据类型// OUT: output,窗口任务进行计算之后的输出数据类型 。// KEY:数据中键 key 的类型 。// W:窗口的类型,是 Window 的子类型 。一般情况下我们定义时间窗口,W就是 TimeWindow 。public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {private static final long serialVersionUID = 1L;public ProcessWindowFunction() {}// Process处理数据方法public abstract void process(KEY var1, ProcessWindowFunction<IN, OUT, KEY, W>.Context var2, Iterable<IN> var3, Collector<OUT> var4) throws Exception;// 清除窗口数据方法public void clear(ProcessWindowFunction<IN, OUT, KEY, W>.Context context) throws Exception {}// context上下文包含的内容public abstract class Context implements Serializable {public Context() {}public abstract W window();public abstract long currentProcessingTime();public abstract long currentWatermark();public abstract KeyedStateStore windowState();public abstract KeyedStateStore globalState();public abstract <X> void output(OutputTag<X> var1, X var2);}}
推荐阅读
- 生成式AI时代的著作权之困
- 一个更好的视频码头
- 软件架构设计:B/S层次架构之MVC/MVP/MVVM
- 玉雕|玉之俏色,令人心醉!
- 5G、6G之间还隔着5.5G?听听华为专家怎么说
- |北洋造34年中上百万的和普通版有何区别之处
- 王昭君|网红王昭君减肥逆袭之路,一句“我要渣男后悔”,一年减掉306斤
- 鬼吹灯之南海归墟|即将播出!《鬼吹灯之南海归墟》传出新消息,粉丝却喜忧参半
- 人生之路|高中一毕业就参加NBA选秀?美媒列出10名最伟大的高中生球员
- TVB|TVB前知名女星罕见露面,在国外低调结婚,错爱有妇之夫致抑郁
