flink如何设置以每天零点到第二天零点为区间的window进行计算
生活随笔
收集整理的这篇文章主要介绍了
flink如何设置以每天零点到第二天零点为区间的window进行计算
小编觉得挺不错的,现在分享给大家,帮大家做个参考.
环境
flink1.11.2
JAVA
stream API
timewindow
背景
公司之前的指标是以分钟为单位的滚动窗口进行检查,然后在查询系统里查询的时候,对该天所有的分钟数据进行聚合统计。
当前需要在flink中添加以天为单位的Job进行额外指标检查。指标出来之后和发现数据口径不一致,flink中默认是timeWindow按天进行滚动统计的数据是每天八点到第二天八点的数据。
导致统计指标的含义对不上,没有参考意义和进行不同数据间的join。
解决方案
使用window配置自定义的窗口分隔TumblingEventTimeWindows对象(因为现在处理数据基本都使用的flink eventTime作为数据时间进行处理,所以例子中需要数据流的时间用的是eventtime, 使用processtime的话可以使用TumblingProcessTimeWindows处理,讲道理应该配置都一样)
话不多说直接上代码吧。
默认情况8点->8点的时间统计的代码:
// 原始数据流 DataStream<RawDataEvent> gyhUserRegisterStream = StreamTransformCommon.preprocessingLogData(rawDataStreamMap.get("gyhUserRegister"));// 进行数据清洗统计的逻辑 DataStream<Object> targetData = rawWebLogData.filter(x -> x.userId != null).map(StreamTransformCommon::renameAppInfoName).filter(x -> x != null).keyBy("page").timeWindow(Time.days(1)) // 默认情况下 以天为单位的滚动窗口.aggregate(new StreamTransformCommon.CountAgg(), new StreamTransformCommon.WindowResultFuction());每天0点->0点的时间窗口统计代码(实际上可以举一反三搞出任意想要的时间的规则):
// 原始数据流 DataStream<RawDataEvent> gyhUserRegisterStream = StreamTransformCommon.preprocessingLogData(rawDataStreamMap.get("gyhUserRegister"));// 进行数据清洗统计的逻辑 DataStream<Object> targetData = rawWebLogData.filter(x -> x.userId != null).map(StreamTransformCommon::renameAppInfoName).filter(x -> x != null).keyBy("page").window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(16))) // 改改参数,就可以调整到自己想要的时间窗口统计规则.aggregate(new StreamTransformCommon.CountAgg(), new StreamTransformCommon.WindowResultFuction());结果
大家可以在操作windowFunction的时候打印一下apply方法参数中的TimeWindow对象的起止时间验证一下。我这边屡试不爽,问题解决了记录一下这个过程。
总结
以上是生活随笔为你收集整理的flink如何设置以每天零点到第二天零点为区间的window进行计算的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 最常用、最好用的vue服务端渲染框架
- 下一篇: 为精简版VS2008添加DEBUG版运行