日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

hive窗口函数_Hive sql窗口函数源码分析

發(fā)布時(shí)間:2025/3/8 14 豆豆
生活随笔 收集整理的這篇文章主要介紹了 hive窗口函数_Hive sql窗口函数源码分析 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

在了解了窗口函數(shù)實(shí)現(xiàn)原理 spark、hive中窗口函數(shù)實(shí)現(xiàn)原理復(fù)盤(pán)?和?sparksql比hivesql優(yōu)化的點(diǎn)(窗口函數(shù))之后,今天又?jǐn)]了一遍hive sql 中窗口函數(shù)的源碼實(shí)現(xiàn),寫(xiě)個(gè)筆記記錄一下。

簡(jiǎn)單來(lái)說(shuō),窗口查詢有兩個(gè)步驟:將記錄分割成多個(gè)分區(qū);然后在各個(gè)分區(qū)上調(diào)用窗口函數(shù)。

傳統(tǒng)的 UDAF 函數(shù)只能為每個(gè)分區(qū)返回一條記錄,而我們需要的是不僅僅輸入數(shù)據(jù)是一張表,輸出數(shù)據(jù)也是一張表(table-in, table-out),因此 Hive 社區(qū)引入了分區(qū)表函數(shù) Partitioned Table Function (PTF)。

1、代碼流轉(zhuǎn)圖

PTF 運(yùn)行在分區(qū)之上、能夠處理分區(qū)中的記錄并輸出多行結(jié)果的函數(shù)。

hive會(huì)把QueryBlock,翻譯為執(zhí)行操作樹(shù)OperatorTree,其中每個(gè)operator都會(huì)有三個(gè)重要的方法:

  • initializeOp() ?--初始化算子

  • process() ? ?--執(zhí)行每一行數(shù)據(jù)

  • forward() ? --把處理好的每一行數(shù)據(jù)發(fā)送到下個(gè)Operator

當(dāng)遇到窗口函數(shù)時(shí),會(huì)生成PTFOperator,PTFOperator 依賴PTFInvocation讀取已經(jīng)排好序的數(shù)據(jù),創(chuàng)建相應(yīng)的輸入分區(qū):PTFPartition inputPart;

WindowTableFunction 負(fù)責(zé)管理窗口幀、調(diào)用窗口函數(shù)(UDAF)、并將結(jié)果寫(xiě)入輸出分區(qū): PTFPartition outputPart。

2、其它細(xì)節(jié)

PTFOperator.process(Object row, int tag)-->PTFInvocation.processRow(row)

void processRow(Object row) throws HiveException { if ( isStreaming() ) { handleOutputRows(tabFn.processRow(row)); } else { inputPart.append(row); //主要操作就是把數(shù)據(jù) append到 ptfpartition中,這里的partition與map-reduce中的分區(qū)不同,map-reduce分區(qū)是按照key的hash分,而這里是要把相同的key要放在同一個(gè)ptfpartition,方便后續(xù)的windowfunction操作 }}

真正對(duì)數(shù)據(jù)的操作是當(dāng)相同的key完全放入同一個(gè)ptfpartition之后,時(shí)機(jī)就是finishPartition:

void finishPartition() throws HiveException { if ( isStreaming() ) { handleOutputRows(tabFn.finishPartition()); } else { if ( tabFn.canIterateOutput() ) { outputPartRowsItr = inputPart == null ? null : tabFn.iterator(inputPart.iterator()); } else { outputPart = inputPart == null ? null : tabFn.execute(inputPart); //這里TableFunctionEvaluator outputPartRowsItr = outputPart == null ? null : outputPart.iterator(); } if ( next != null ) { if (!next.isStreaming() && !isOutputIterator() ) { next.inputPart = outputPart; } else { if ( outputPartRowsItr != null ) { while(outputPartRowsItr.hasNext() ) { next.processRow(outputPartRowsItr.next()); } } } } } if ( next != null ) { next.finishPartition(); } else { if (!isStreaming() ) { if ( outputPartRowsItr != null ) { while(outputPartRowsItr.hasNext() ) { forward(outputPartRowsItr.next(), outputObjInspector); } } } }}

還有一個(gè)雷區(qū),PTFPartition append():

public void append(Object o) throws HiveException { if ( elems.rowCount() == Integer.MAX_VALUE ) { //當(dāng)一個(gè)ptfpartition加入的條數(shù)等于Integer.MAX_VALUE時(shí)會(huì)拋異常 throw new HiveException(String.format("Cannot add more than %d elements to a PTFPartition", Integer.MAX_VALUE)); } @SuppressWarnings("unchecked") List<Object> l = (List<Object>) ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE); elems.addRow(l);}

需要把相同key的數(shù)據(jù)完全放入一個(gè)ptfPartition進(jìn)行操作,這時(shí)對(duì)加入的的條數(shù)做了限制,不能>=Integer.MAX_VALUE(21億),這塊需要注意。

我是小蘿卜算子

在成為最厲害最厲害最厲害的道路上

很高興認(rèn)識(shí)你

~~ enjoy ~~

總結(jié)

以上是生活随笔為你收集整理的hive窗口函数_Hive sql窗口函数源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。