hive窗口函数_Hive sql窗口函数源码分析
在了解了窗口函數(shù)實(shí)現(xiàn)原理 spark、hive中窗口函數(shù)實(shí)現(xiàn)原理復(fù)盤(pán)?和?sparksql比hivesql優(yōu)化的點(diǎn)(窗口函數(shù))之后,今天又?jǐn)]了一遍hive sql 中窗口函數(shù)的源碼實(shí)現(xiàn),寫(xiě)個(gè)筆記記錄一下。
簡(jiǎn)單來(lái)說(shuō),窗口查詢有兩個(gè)步驟:將記錄分割成多個(gè)分區(qū);然后在各個(gè)分區(qū)上調(diào)用窗口函數(shù)。
傳統(tǒng)的 UDAF 函數(shù)只能為每個(gè)分區(qū)返回一條記錄,而我們需要的是不僅僅輸入數(shù)據(jù)是一張表,輸出數(shù)據(jù)也是一張表(table-in, table-out),因此 Hive 社區(qū)引入了分區(qū)表函數(shù) Partitioned Table Function (PTF)。
1、代碼流轉(zhuǎn)圖
PTF 運(yùn)行在分區(qū)之上、能夠處理分區(qū)中的記錄并輸出多行結(jié)果的函數(shù)。
hive會(huì)把QueryBlock,翻譯為執(zhí)行操作樹(shù)OperatorTree,其中每個(gè)operator都會(huì)有三個(gè)重要的方法:
initializeOp() ?--初始化算子
process() ? ?--執(zhí)行每一行數(shù)據(jù)
forward() ? --把處理好的每一行數(shù)據(jù)發(fā)送到下個(gè)Operator
當(dāng)遇到窗口函數(shù)時(shí),會(huì)生成PTFOperator,PTFOperator 依賴PTFInvocation讀取已經(jīng)排好序的數(shù)據(jù),創(chuàng)建相應(yīng)的輸入分區(qū):PTFPartition inputPart;
WindowTableFunction 負(fù)責(zé)管理窗口幀、調(diào)用窗口函數(shù)(UDAF)、并將結(jié)果寫(xiě)入輸出分區(qū): PTFPartition outputPart。
2、其它細(xì)節(jié)
PTFOperator.process(Object row, int tag)-->PTFInvocation.processRow(row)
void processRow(Object row) throws HiveException { if ( isStreaming() ) { handleOutputRows(tabFn.processRow(row)); } else { inputPart.append(row); //主要操作就是把數(shù)據(jù) append到 ptfpartition中,這里的partition與map-reduce中的分區(qū)不同,map-reduce分區(qū)是按照key的hash分,而這里是要把相同的key要放在同一個(gè)ptfpartition,方便后續(xù)的windowfunction操作 }}真正對(duì)數(shù)據(jù)的操作是當(dāng)相同的key完全放入同一個(gè)ptfpartition之后,時(shí)機(jī)就是finishPartition:
void finishPartition() throws HiveException { if ( isStreaming() ) { handleOutputRows(tabFn.finishPartition()); } else { if ( tabFn.canIterateOutput() ) { outputPartRowsItr = inputPart == null ? null : tabFn.iterator(inputPart.iterator()); } else { outputPart = inputPart == null ? null : tabFn.execute(inputPart); //這里TableFunctionEvaluator outputPartRowsItr = outputPart == null ? null : outputPart.iterator(); } if ( next != null ) { if (!next.isStreaming() && !isOutputIterator() ) { next.inputPart = outputPart; } else { if ( outputPartRowsItr != null ) { while(outputPartRowsItr.hasNext() ) { next.processRow(outputPartRowsItr.next()); } } } } } if ( next != null ) { next.finishPartition(); } else { if (!isStreaming() ) { if ( outputPartRowsItr != null ) { while(outputPartRowsItr.hasNext() ) { forward(outputPartRowsItr.next(), outputObjInspector); } } } }}還有一個(gè)雷區(qū),PTFPartition append():
public void append(Object o) throws HiveException { if ( elems.rowCount() == Integer.MAX_VALUE ) { //當(dāng)一個(gè)ptfpartition加入的條數(shù)等于Integer.MAX_VALUE時(shí)會(huì)拋異常 throw new HiveException(String.format("Cannot add more than %d elements to a PTFPartition", Integer.MAX_VALUE)); } @SuppressWarnings("unchecked") List<Object> l = (List<Object>) ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE); elems.addRow(l);}需要把相同key的數(shù)據(jù)完全放入一個(gè)ptfPartition進(jìn)行操作,這時(shí)對(duì)加入的的條數(shù)做了限制,不能>=Integer.MAX_VALUE(21億),這塊需要注意。
我是小蘿卜算子 在成為最厲害最厲害最厲害的道路上 很高興認(rèn)識(shí)你 ~~ enjoy ~~ |
總結(jié)
以上是生活随笔為你收集整理的hive窗口函数_Hive sql窗口函数源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: python装好了怎么启动车_【填空题】
- 下一篇: 什么理财产品最赚钱 先问问自己能承