欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

hive窗口函数_Hive sql窗口函数源码分析

发布时间:2025/3/8 27 豆豆
生活随笔 收集整理的这篇文章主要介绍了 hive窗口函数_Hive sql窗口函数源码分析 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

在了解了窗口函数实现原理 spark、hive中窗口函数实现原理复盘 和 sparksql比hivesql优化的点(窗口函数)之后,今天又撸了一遍hive sql 中窗口函数的源码实现,写个笔记记录一下。

简单来说,窗口查询有两个步骤:将记录分割成多个分区;然后在各个分区上调用窗口函数。

传统的 UDAF 函数只能为每个分区返回一条记录,而我们需要的是不仅仅输入数据是一张表,输出数据也是一张表(table-in, table-out),因此 Hive 社区引入了分区表函数 Partitioned Table Function (PTF)。

1、代码流转图

PTF 运行在分区之上、能够处理分区中的记录并输出多行结果的函数。

hive会把QueryBlock,翻译为执行操作树OperatorTree,其中每个operator都会有三个重要的方法:

  • initializeOp()  --初始化算子

  • process()    --执行每一行数据

  • forward()   --把处理好的每一行数据发送到下个Operator

当遇到窗口函数时,会生成PTFOperator,PTFOperator 依赖PTFInvocation读取已经排好序的数据,创建相应的输入分区:PTFPartition inputPart;

WindowTableFunction 负责管理窗口帧、调用窗口函数(UDAF)、并将结果写入输出分区: PTFPartition outputPart。

2、其它细节

PTFOperator.process(Object row, int tag)-->PTFInvocation.processRow(row)

void processRow(Object row) throws HiveException { if ( isStreaming() ) { handleOutputRows(tabFn.processRow(row)); } else { inputPart.append(row); //主要操作就是把数据 append到 ptfpartition中,这里的partition与map-reduce中的分区不同,map-reduce分区是按照key的hash分,而这里是要把相同的key要放在同一个ptfpartition,方便后续的windowfunction操作 }}

真正对数据的操作是当相同的key完全放入同一个ptfpartition之后,时机就是finishPartition:

void finishPartition() throws HiveException { if ( isStreaming() ) { handleOutputRows(tabFn.finishPartition()); } else { if ( tabFn.canIterateOutput() ) { outputPartRowsItr = inputPart == null ? null : tabFn.iterator(inputPart.iterator()); } else { outputPart = inputPart == null ? null : tabFn.execute(inputPart); //这里TableFunctionEvaluator outputPartRowsItr = outputPart == null ? null : outputPart.iterator(); } if ( next != null ) { if (!next.isStreaming() && !isOutputIterator() ) { next.inputPart = outputPart; } else { if ( outputPartRowsItr != null ) { while(outputPartRowsItr.hasNext() ) { next.processRow(outputPartRowsItr.next()); } } } } } if ( next != null ) { next.finishPartition(); } else { if (!isStreaming() ) { if ( outputPartRowsItr != null ) { while(outputPartRowsItr.hasNext() ) { forward(outputPartRowsItr.next(), outputObjInspector); } } } }}

还有一个雷区,PTFPartition append():

public void append(Object o) throws HiveException { if ( elems.rowCount() == Integer.MAX_VALUE ) { //当一个ptfpartition加入的条数等于Integer.MAX_VALUE时会抛异常 throw new HiveException(String.format("Cannot add more than %d elements to a PTFPartition", Integer.MAX_VALUE)); } @SuppressWarnings("unchecked") List<Object> l = (List<Object>) ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE); elems.addRow(l);}

需要把相同key的数据完全放入一个ptfPartition进行操作,这时对加入的的条数做了限制,不能>=Integer.MAX_VALUE(21亿),这块需要注意。

我是小萝卜算子

在成为最厉害最厉害最厉害的道路上

很高兴认识你

~~ enjoy ~~

总结

以上是生活随笔为你收集整理的hive窗口函数_Hive sql窗口函数源码分析的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。