做网站百度新闻源多店铺商城系统开发
做网站百度新闻源,多店铺商城系统开发,长沙科技网站设计哪家专业,怎么优化自己的网站在了解了窗口函数实现原理 spark、hive中窗口函数实现原理复盘 和 sparksql比hivesql优化的点(窗口函数)之后#xff0c;今天又撸了一遍hive sql 中窗口函数的源码实现#xff0c;写个笔记记录一下。简单来说#xff0c;窗口查询有两个步骤#xff1a;将记录分割成多个分区… 在了解了窗口函数实现原理 spark、hive中窗口函数实现原理复盘 和 sparksql比hivesql优化的点(窗口函数)之后今天又撸了一遍hive sql 中窗口函数的源码实现写个笔记记录一下。简单来说窗口查询有两个步骤将记录分割成多个分区然后在各个分区上调用窗口函数。传统的 UDAF 函数只能为每个分区返回一条记录而我们需要的是不仅仅输入数据是一张表输出数据也是一张表(table-in, table-out)因此 Hive 社区引入了分区表函数 Partitioned Table Function (PTF)。1、代码流转图PTF 运行在分区之上、能够处理分区中的记录并输出多行结果的函数。hive会把QueryBlock翻译为执行操作树OperatorTree其中每个operator都会有三个重要的方法initializeOp() --初始化算子process() --执行每一行数据forward() --把处理好的每一行数据发送到下个Operator当遇到窗口函数时会生成PTFOperatorPTFOperator 依赖PTFInvocation读取已经排好序的数据创建相应的输入分区PTFPartition inputPart;WindowTableFunction 负责管理窗口帧、调用窗口函数(UDAF)、并将结果写入输出分区: PTFPartition outputPart。2、其它细节PTFOperator.process(Object row, int tag)--PTFInvocation.processRow(row)void processRow(Object row) throws HiveException { if ( isStreaming() ) { handleOutputRows(tabFn.processRow(row)); } else { inputPart.append(row); //主要操作就是把数据 append到 ptfpartition中这里的partition与map-reduce中的分区不同map-reduce分区是按照key的hash分而这里是要把相同的key要放在同一个ptfpartition方便后续的windowfunction操作 }}真正对数据的操作是当相同的key完全放入同一个ptfpartition之后时机就是finishPartitionvoid finishPartition() throws HiveException { if ( isStreaming() ) { handleOutputRows(tabFn.finishPartition()); } else { if ( tabFn.canIterateOutput() ) { outputPartRowsItr inputPart null ? null : tabFn.iterator(inputPart.iterator()); } else { outputPart inputPart null ? null : tabFn.execute(inputPart); //这里TableFunctionEvaluator outputPartRowsItr outputPart null ? null : outputPart.iterator(); } if ( next ! null ) { if (!next.isStreaming() !isOutputIterator() ) { next.inputPart outputPart; } else { if ( outputPartRowsItr ! null ) { while(outputPartRowsItr.hasNext() ) { next.processRow(outputPartRowsItr.next()); } } } } } if ( next ! null ) { next.finishPartition(); } else { if (!isStreaming() ) { if ( outputPartRowsItr ! null ) { while(outputPartRowsItr.hasNext() ) { forward(outputPartRowsItr.next(), outputObjInspector); } } } }}还有一个雷区PTFPartition append()public void append(Object o) throws HiveException { if ( elems.rowCount() Integer.MAX_VALUE ) { //当一个ptfpartition加入的条数等于Integer.MAX_VALUE时会抛异常 throw new HiveException(String.format(Cannot add more than %d elements to a PTFPartition, Integer.MAX_VALUE)); } SuppressWarnings(unchecked) ListObject l (ListObject) ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE); elems.addRow(l);}需要把相同key的数据完全放入一个ptfPartition进行操作这时对加入的的条数做了限制不能Integer.MAX_VALUE(21亿)这块需要注意。我是小萝卜算子在成为最厉害最厉害最厉害的道路上很高兴认识你~~ enjoy ~~
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/87584.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!