日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

hive窗口函数_Hive sql窗口函数源码分析

發布時間:2025/3/8 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 hive窗口函数_Hive sql窗口函数源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在了解了窗口函數實現原理 spark、hive中窗口函數實現原理復盤?和?sparksql比hivesql優化的點(窗口函數)之后,今天又擼了一遍hive sql 中窗口函數的源碼實現,寫個筆記記錄一下。

簡單來說,窗口查詢有兩個步驟:將記錄分割成多個分區;然后在各個分區上調用窗口函數。

傳統的 UDAF 函數只能為每個分區返回一條記錄,而我們需要的是不僅僅輸入數據是一張表,輸出數據也是一張表(table-in, table-out),因此 Hive 社區引入了分區表函數 Partitioned Table Function (PTF)。

1、代碼流轉圖

PTF 運行在分區之上、能夠處理分區中的記錄并輸出多行結果的函數。

hive會把QueryBlock,翻譯為執行操作樹OperatorTree,其中每個operator都會有三個重要的方法:

  • initializeOp() ?--初始化算子

  • process() ? ?--執行每一行數據

  • forward() ? --把處理好的每一行數據發送到下個Operator

當遇到窗口函數時,會生成PTFOperator,PTFOperator 依賴PTFInvocation讀取已經排好序的數據,創建相應的輸入分區:PTFPartition inputPart;

WindowTableFunction 負責管理窗口幀、調用窗口函數(UDAF)、并將結果寫入輸出分區: PTFPartition outputPart。

2、其它細節

PTFOperator.process(Object row, int tag)-->PTFInvocation.processRow(row)

void processRow(Object row) throws HiveException { if ( isStreaming() ) { handleOutputRows(tabFn.processRow(row)); } else { inputPart.append(row); //主要操作就是把數據 append到 ptfpartition中,這里的partition與map-reduce中的分區不同,map-reduce分區是按照key的hash分,而這里是要把相同的key要放在同一個ptfpartition,方便后續的windowfunction操作 }}

真正對數據的操作是當相同的key完全放入同一個ptfpartition之后,時機就是finishPartition:

void finishPartition() throws HiveException { if ( isStreaming() ) { handleOutputRows(tabFn.finishPartition()); } else { if ( tabFn.canIterateOutput() ) { outputPartRowsItr = inputPart == null ? null : tabFn.iterator(inputPart.iterator()); } else { outputPart = inputPart == null ? null : tabFn.execute(inputPart); //這里TableFunctionEvaluator outputPartRowsItr = outputPart == null ? null : outputPart.iterator(); } if ( next != null ) { if (!next.isStreaming() && !isOutputIterator() ) { next.inputPart = outputPart; } else { if ( outputPartRowsItr != null ) { while(outputPartRowsItr.hasNext() ) { next.processRow(outputPartRowsItr.next()); } } } } } if ( next != null ) { next.finishPartition(); } else { if (!isStreaming() ) { if ( outputPartRowsItr != null ) { while(outputPartRowsItr.hasNext() ) { forward(outputPartRowsItr.next(), outputObjInspector); } } } }}

還有一個雷區,PTFPartition append():

public void append(Object o) throws HiveException { if ( elems.rowCount() == Integer.MAX_VALUE ) { //當一個ptfpartition加入的條數等于Integer.MAX_VALUE時會拋異常 throw new HiveException(String.format("Cannot add more than %d elements to a PTFPartition", Integer.MAX_VALUE)); } @SuppressWarnings("unchecked") List<Object> l = (List<Object>) ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE); elems.addRow(l);}

需要把相同key的數據完全放入一個ptfPartition進行操作,這時對加入的的條數做了限制,不能>=Integer.MAX_VALUE(21億),這塊需要注意。

我是小蘿卜算子

在成為最厲害最厲害最厲害的道路上

很高興認識你

~~ enjoy ~~

總結

以上是生活随笔為你收集整理的hive窗口函数_Hive sql窗口函数源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。