欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 运维知识 > windows >内容正文

windows

flink如何设置以每天零点到第二天零点为区间的window进行计算

发布时间:2023/12/20 windows 51 豆豆
生活随笔 收集整理的这篇文章主要介绍了 flink如何设置以每天零点到第二天零点为区间的window进行计算 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

环境

flink1.11.2
JAVA
stream API

timewindow

背景

公司之前的指标是以分钟为单位的滚动窗口进行检查,然后在查询系统里查询的时候,对该天所有的分钟数据进行聚合统计。

 当前需要在flink中添加以天为单位的Job进行额外指标检查。指标出来之后和发现数据口径不一致,flink中默认是timeWindow按天进行滚动统计的数据是每天八点到第二天八点的数据

导致统计指标的含义对不上,没有参考意义和进行不同数据间的join。

解决方案

使用window配置自定义的窗口分隔TumblingEventTimeWindows对象(因为现在处理数据基本都使用的flink  eventTime作为数据时间进行处理,所以例子中需要数据流的时间用的是eventtime, 使用processtime的话可以使用TumblingProcessTimeWindows处理,讲道理应该配置都一样)

话不多说直接上代码吧。

默认情况8点->8点的时间统计的代码:

// 原始数据流 DataStream<RawDataEvent> gyhUserRegisterStream = StreamTransformCommon.preprocessingLogData(rawDataStreamMap.get("gyhUserRegister"));// 进行数据清洗统计的逻辑 DataStream<Object> targetData = rawWebLogData.filter(x -> x.userId != null).map(StreamTransformCommon::renameAppInfoName).filter(x -> x != null).keyBy("page").timeWindow(Time.days(1)) // 默认情况下 以天为单位的滚动窗口.aggregate(new StreamTransformCommon.CountAgg(), new StreamTransformCommon.WindowResultFuction());

每天0点->0点的时间窗口统计代码(实际上可以举一反三搞出任意想要的时间的规则):

// 原始数据流 DataStream<RawDataEvent> gyhUserRegisterStream = StreamTransformCommon.preprocessingLogData(rawDataStreamMap.get("gyhUserRegister"));// 进行数据清洗统计的逻辑 DataStream<Object> targetData = rawWebLogData.filter(x -> x.userId != null).map(StreamTransformCommon::renameAppInfoName).filter(x -> x != null).keyBy("page").window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(16))) // 改改参数,就可以调整到自己想要的时间窗口统计规则.aggregate(new StreamTransformCommon.CountAgg(), new StreamTransformCommon.WindowResultFuction());

结果

大家可以在操作windowFunction的时候打印一下apply方法参数中的TimeWindow对象的起止时间验证一下。我这边屡试不爽,问题解决了记录一下这个过程。

总结

以上是生活随笔为你收集整理的flink如何设置以每天零点到第二天零点为区间的window进行计算的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。