日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

【大数据】Presto开发自定义聚合函数

發布時間:2024/1/18 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【大数据】Presto开发自定义聚合函数 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Presto 在交互式查詢任務中擔當著重要的職責。隨著越來越多的人開始使用 SQL 在 Presto 上分析數據,我們發現需要將一些業務邏輯開發成類似 Hive 中的 UDF,提高 SQL 使用人員的效率,同時也保證 Hive 和 Presto 環境中的 UDF 統一。

1、Presto函數介紹

在此之前先簡單介紹下UDF和UDAF,UDF叫做用戶自定義函數,而UDAF叫做用戶自定義聚合函數,區別就在于UDF不會保存狀態,一行輸入一行輸出,而UDAF會涉及到狀態的保存,通過聚合多個節點的數據來轉換為最終的輸出結果。

在 Presto 中,函數大體分為三種:scalar,aggregation 和 window 類型。分別如下:

1)scalar標量函數,簡單來說就是 Java 中的一個靜態方法,本身沒有任何狀態(不保存數據,一行輸入一行輸出)。

2)aggregation累積狀態的函數,或聚集函數,如count,avg。如果只是單節點,單機狀態可以直接用一個變量存儲即可,但是presto是分布式計算引擎,狀態數據會在多個節點之間傳輸,因此狀態數據需要被序列化成 Presto 的內部格式才可以被傳輸。簡單來說Aggregation對應于多行輸入一行輸出。

3)window 窗口函數,窗口函數在查詢結果的行上進行計算,執行順序在HAVING子句之后,ORDER BY子句之前。在 Presto SQL 中,窗口函數的語法形式如下:

windowFunction(arg1,....argn) OVER([PARTITION BY<...>] [ORDER BY<...>] [RANGE|ROWS BETWEEN AND])

窗口函數語法由關鍵字OVER觸發,且包含三個子句:
PARTITION BY: 指定輸入行分區的規則,類似于聚合函數的GROUP BY子句,不同分區里的計算互不干擾(窗口函數的計算是并發進行的,并發數和partition數量一致),缺省時將所有數據行視為一個分區
ORDER BY: 決定了窗口函數處理輸入行的順序
RANGE|ROWS BETWEEN AND: 指定窗口邊界,不常用,缺省時的窗口為當前行所在的分區第一行到當前行。

2、自定義函數

官方文檔地址:https://prestodb.io/docs/current/develop/functions.html

2.1自定義Scalar函數的實現

2.1.1定義一個java類
1)用 @ScalarFunction 的 Annotation 標記實現業務邏輯的靜態方法。

2)用 @Description 描述函數的作用,這里的內容會在 SHOW FUNCTIONS 中顯示。

3)用@SqlType 標記函數的返回值類型,如返回字符串,因此是 StandardTypes.VARCHAR。

4)Java 方法的返回值必須使用 Presto 內部的序列化方式,因此字符串類型必須返回 Slice, 使用 Slices.utf8Slice 方法可以方便的將 String 類型轉換成 Slice 類型

public class ExampleStringFunction {@ScalarFunction("lowercaser")@Description("converts the string to alternating case")@SqlType(StandardTypes.VARCHAR)public static Slice lowercaser(@SqlType(StandardTypes.VARCHAR) Slice slice){String argument = slice.toStringUtf8();return Slices.utf8Slice(argument.toLowerCase());} }

2.2 自定義Aggregation函數

2.2.1實現原理步驟
Presto 把 Aggregation 函數分解成三個步驟執行:

1、input(state, data): 針對每條數據,執行 input 函數。這個過程是并行執行的,因此在每個有數據的節點都會執行,最終得到多個累積的狀態數據。

2、combine(state1, state2):將所有節點的狀態數據聚合起來,多次執行,直至所有狀態數據被聚合成一個最終狀態,也就是 Aggregation 函數的輸出結果。

3、output(final_state, out):最終輸出結果到一個 BlockBuilder。

2.2.2 具體代碼實現過程
1、定義一個 Java 類,使用 @AggregationFunction 標記為 Aggregation 函數

2、使用 @InputFunction、 @CombineFunction、@OutputFunction 分別標記計算函數、合并結果函數和最終輸出函數在 Plugin 處注冊 Aggregation 函數

3、一個繼承AccumulatorState的State接口,get和set方法

4、并使用 @AccumulatorStateMetadata 提供序列化(stateSerializerClass指定)和 Factory 類信息(stateFactoryClass指定)。自己寫一個序列化類和一個工廠類。(復雜數據類型需要:自定義類保存狀態、Map、List等)

簡單類型Aggregation

對于簡單數據類型的聚合函數編寫比較簡單,實現一個包含input、combine、output的aggregation和一個狀態設定接口State提供get、set方法即可,不用去關心序列化和狀態保存問題。
Aggregation:

@AggregationFunction("avg") public final class IntervalYearToMonthAverageAggregation {private IntervalYearToMonthAverageAggregation() {}@InputFunctionpublic static void input(LongAndDoubleState state, @SqlType(StandardTypes.INTERVAL_YEAR_TO_MONTH) long value){state.setLong(state.getLong() + 1);state.setDouble(state.getDouble() + value);}@CombineFunctionpublic static void combine(LongAndDoubleState state, LongAndDoubleState otherState){state.setLong(state.getLong() + otherState.getLong());state.setDouble(state.getDouble() + otherState.getDouble());}@OutputFunction(StandardTypes.INTERVAL_YEAR_TO_MONTH)public static void output(LongAndDoubleState state, BlockBuilder out){long count = state.getLong();if (count == 0) {out.appendNull();}else {double value = state.getDouble();INTERVAL_YEAR_MONTH.writeLong(out, round(value / count));}} }

LongAndDoubleState :寫一個接口實現繼承自AccumulatorState類,提供get、set方法即可。

public interface LongAndDoubleStateextends AccumulatorState {long getLong();void setLong(long value);double getDouble();void setDouble(double value); }

復雜類型Aggregation

對于復雜數據類型則需要提供序列化機制,你要序列化那些東西都是由你來制指定的。在AccumulatorState的接口上用注解指定@AccumulatorStateMetadata 提供序列化(stateSerializerClass指定)和 Factory 類信息(stateFactoryClass指定),自定義一個序列化器和序列化工廠類,實現類的序列化和反序列化。

Aggregation類: 這個類實現比較簡單,和簡單數據類型的實現一樣,input、combine、output。

@AggregationFunction("presto_collect") public class CollectListAggregation {@InputFunctionpublic static void input(@AggregationState CollectState state, @SqlType(StandardTypes.VARCHAR) Slice id,@SqlType(StandardTypes.VARCHAR) Slice key) {try {CollectListStats stats = state.get();if (stats == null) {stats = new CollectListStats();state.set(stats);}int inputId = Integer.parseInt(id.toStringUtf8());String inputKey = key.toStringUtf8();stats.addCollectList(inputId,inputKey, 1);} catch (Exception e) {throw new RuntimeException(e+" --------- input err");}}@CombineFunctionpublic static void combine(@AggregationState CollectState state, CollectState otherState) {try {CollectListStats collectListStats = state.get();CollectListStats oCollectListStats = otherState.get();if(collectListStats == null) {state.set(oCollectListStats);} else {collectListStats.mergeWith(oCollectListStats);}}catch (Exception e) {throw new RuntimeException(e+" --------- combine err");}}@OutputFunction(StandardTypes.*VARCHAR*)public static void output(@AggregationState CollectState state, BlockBuilder out) {try {CollectListStats stats = state.get();if (stats == null) {out.appendNull();return;}// 統計Slice result = stats.getCollectResult();if (result == null) {out.appendNull();} else {VarcharType.VARCHAR.writeSlice(out, result);}} catch (Exception e) {throw new RuntimeException(e+" -------- output err");}} }

狀態保存接口:

@AccumulatorStateMetadata(stateSerializerClass = CollectListStatsSerializer.class, stateFactoryClass = CollectListStatsFactory.class) public interface CollectState extends AccumulatorState {CollectListStats get();void set(CollectListStats value); }

存放數據的類:此類需要實現數據的序列化和反序列化,這是最關鍵和比較麻煩的地方,貼一個例子,關鍵在于需要自己控制存儲空間以及數據的順序,和讀取的時候按照一定順序讀取。對于字符要先存儲長度,然后是字節,讀取則先讀取字符長度,然后讀取這么長的數據,最后轉化為字符。

public class CollectListStats {private static final int INSTANCE_SIZE = ClassLayout.parseClass(CollectListStats.class).instanceSize();//<id,<key,value>>private Map<Integer,Map<String,Integer>> collectContainer = new HashMap<>();private long contentEstimatedSize = 0;private int keyByteLen = 0;private int keyListLen = 0;CollectListStats() {}CollectListStats(Slice serialized) {deserialize(serialized);}void addCollectList(Integer id, String key, int value) {if (collectContainer.containsKey(id)) {Map<String, Integer> tmpMap = collectContainer.get(id);if (tmpMap.containsKey(key)) {tmpMap.put(key, tmpMap.get(key)+value);}else{tmpMap.put(key,value);contentEstimatedSize += ( key.getBytes().length + SizeOf.SIZE_OF_INT*);keyByteLen += key.getBytes().length;keyListLen++;}} else {Map<String,Integer> tmpMap = new HashMap<String,Integer>();tmpMap.put(key, value);keyByteLen += key.getBytes().length;keyListLen++;collectContainer.put(id, tmpMap);contentEstimatedSize += SizeOf.SIZE_OF_INT;}}//[{id:1,{"aaa":3,"fadf":6},{}]Slice getCollectResult() {Slice jsonSlice = null;try {StringBuilder jsonStr = new StringBuilder();jsonStr.append("[");int collectLength = collectContainer.entrySet().size();for (Map.Entry<Integer, Map<String, Integer>> mapEntry : collectContainer.entrySet()) {Integer id = mapEntry.getKey();Map<String, Integer> vMap = mapEntry.getValue();jsonStr.append("{id:").append(id).append(",{");int vLength = vMap.entrySet().size();for (Map.Entry<String, Integer> vEntry : vMap.entrySet()) {String key = vEntry.getKey();Integer value = vEntry.getValue();jsonStr.append(key).append(":").append(value);vLength--;if (vLength != 0) {jsonStr.append(",");}}jsonStr.append("}");collectLength--;if (collectLength != 0) {jsonStr.append(",");}}jsonStr.append("]");jsonSlice = Slices.utf8Slice*(jsonStr.toString());} catch (Exception e) {throw new RuntimeException(e+" ---------- get CollectResult err");}return jsonSlice;}public void deserialize(Slice serialized) {try {SliceInput input = serialized.getInput();//外層map的長度int collectStatsEntrySize = input.readInt();for (int collectCnt = 0; collectCnt < collectStatsEntrySize; collectCnt++) {int id = input.readInt();int keyEntrySize = input.readInt();for (int idCnt = 0; idCnt < keyEntrySize; idCnt++) {int keyBytesLen = input.readInt();byte[] keyBytes = new byte[keyBytesLen];for (int byteIdx = 0; byteIdx < keyBytesLen; byteIdx++) {keyBytes[byteIdx] = input.readByte();}String key = new String(keyBytes);int value = input.readInt();addCollectList(id, key, value);}}} catch (Exception e) {throw new RuntimeException(e+" ----- deserialize err");}}public Slice serialize() {SliceOutput builder = null;int requiredBytes = //對應 SliceOutput builder append的內容所占用的空間SizeOf.SIZE_OF_INT*3 //id entry數目,id數值,key Entry數目\+ keyListLen * SizeOf.SIZE_OF_INT* //key bytes長度\+ keyByteLen* //key byte總長度\+ keyListLen * SizeOf.SIZE_OF_INT; //valuetry {// 序列化builder = Slices.*allocate*(requiredBytes).getOutput();for (Map.Entry<Integer,Map<String, Integer>> entry : collectContainer.entrySet()) {//id個數builder.appendInt(collectContainer.entrySet().size());//id 數值builder.appendInt(entry.getKey());Map<String, Integer> kMap = entry.getValue();builder.appendInt(kMap.entrySet().size());for (Map.Entry<String, Integer> vEntry : kMap.entrySet()) {byte[] keyBytes = vEntry.getKey().getBytes();builder.appendInt(keyBytes.length);builder.appendBytes(keyBytes);builder.appendInt(vEntry.getValue());}}return builder.getUnderlyingSlice();} catch (Exception e) {throw new RuntimeException(e+" ---- serialize err requiredBytes = " + requiredBytes + " keyByteLen= " + keyByteLen + " keyListLen = " + keyListLen);}}long estimatedInMemorySize() {return INSTANCE_SIZE + contentEstimatedSize;}void mergeWith(CollectListStats other) {if (other == null) {return;}for (Map.Entry<Integer,Map<String, Integer>> cEntry : other.collectContainer.entrySet()) {Integer id = cEntry.getKey();Map<String, Integer> kMap = cEntry.getValue();for (Map.Entry<String, Integer> kEntry : kMap.entrySet()) {addCollectList(id, kEntry.getKey(), kEntry.getValue());}}} }

上面的例子我是直接從別人那兒拿過來的(個人比較懶:https://www.cnblogs.com/lrxvx/p/12558902.html),實際方式也很簡單,就是實現序列化和反序列化方法以及一個管理存儲空間的方法。需要注意的是序列化和反序列化時候的順序一定要保證,Presto提供了許多屬性方式的選項如int、long、byte,對于String方式序列化,可以將String轉為byte再進行序列化,思路就是先序列化一個長度進去,再將字節內容序列化,反序列化的時候先讀length,再讀相應的字節內容轉為String就好了,而對象類型的屬性,本質上還是可以直接序列化屬性,反序列化時候重新創建對象,內容沒變。Presto的序列化方式比較高效,原因是因為我可以只序列化我想要的屬性就好了,缺點是擴展性不足。

序列化類:

public class CollectListStatsSerializer implements AccumulatorStateSerializer<CollectState> {@Overridepublic Type getSerializedType() {return VARBINARY;}@Overridepublic void serialize(CollectState state, BlockBuilder out) {if (state.get() == null) {out.appendNull();} else {VARBINARY.writeSlice(out, state.get().serialize());}}@Overridepublic void deserialize(Block block, int index, CollectState state) {state.set(new CollectListStats(VARBINARY.getSlice(block, index)));} }

序列化工廠類:

public class CollectListStatsFactory implements AccumulatorStateFactory<CollectState> {@Overridepublic CollectState createSingleState() {return new SingleState();}@Overridepublic Class<? extends CollectState> getSingleStateClass() {return SingleState.class;}@Overridepublic CollectState createGroupedState() {return new GroupState();}@Overridepublic Class<? extends CollectState> getGroupedStateClass() {return GroupState.class;}public static class GroupState implements GroupedAccumulatorState, CollectState {private static final int INSTANCE_SIZE = ClassLayout.parseClass(GroupedDigestAndPercentileState.class).instanceSize();private final ObjectBigArray<CollectListStats> collectStatsList = new ObjectBigArray<>();private long size;private long groupId;@Overridepublic void setGroupId(long groupId) {this.groupId = groupId;}@Overridepublic void ensureCapacity(long size) {collectStatsList.ensureCapacity(size);}@Overridepublic CollectListStats get() {return collectStatsList.get(groupId);}@Overridepublic void set(CollectListStats value) {CollectListStats previous = get();if (previous != null) {size -= previous.estimatedInMemorySize();}collectStatsList.set(groupId, value);size += value.estimatedInMemorySize();}@Overridepublic long getEstimatedSize() {return INSTANCE_SIZE +size + collectStatsList.sizeOf();}}public static class SingleState implements CollectState{private CollectListStats stats;@Overridepublic CollectListStats get() {return stats;}@Overridepublic void set(CollectListStats value) {stats = value;}@Overridepublic long getEstimatedSize() {if (stats == null) {return 0;}return stats.estimatedInMemorySize();}} }

驗證自定義函數

當我們開發好自定義函數后如何驗證呢,一種方式是使用Presto內置函數注冊機制進行單元測試,Presto 函數由MetadataManager中的FunctionRegistry進行管理,開發的函數要生效必須要先注冊到FunctionRegistry中。函數注冊是在 Presto 服務啟動過程中進行的,有以下兩種方式進行函數注冊。

FunctionListBuilder builder = new FunctionListBuilder().window(RowNumberFunction.class).aggregate(ApproximateCountDistinctAggregation.class).scalar(RepeatFunction.class).function(MAP_HASH_CODE)......

注冊好之后就可以編寫相應的單元測試代碼了。完整的Aggregation測試代碼如下:

import com.facebook.presto.common.type.Type; import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.metadata.FunctionListBuilder; import com.facebook.presto.metadata.MetadataManager; import com.facebook.presto.operator.aggregation.InternalAggregationFunction; import io.airlift.slice.Slice; import io.airlift.slice.Slices; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test;import static com.facebook.presto.block.BlockAssertions.createSlicesBlock; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.operator.aggregation.AggregationTestUtils.assertAggregation; import static com.facebook.presto.sql.analyzer.TypeSignatureProvider.fromTypes;public class TestAggregation{private static final FunctionAndTypeManager FUNCTION_AND_TYPE_MANAGER = MetadataManager.createTestMetadataManager().getFunctionAndTypeManager();private static InternalAggregationFunction getAggregation(Type... arguments){return FUNCTION_AND_TYPE_MANAGER.getAggregateFunctionImplementation(FUNCTION_AND_TYPE_MANAGER.lookupFunction("presto_collect", fromTypes(arguments)));}private static final InternalAggregationFunction COLLECTION_AGGREGATION = getAggregation(VARCHAR, VARCHAR); //和Aggregation中的類型對應,java類型的Slice對應Varchar@BeforeClasspublic void init(){FunctionListBuilder builder = new FunctionListBuilder().aggregate(CollectListAggregation.class);FUNCTION_AND_TYPE_MANAGER.registerBuiltInFunctions(builder.getFunctions());}@Testpublic void collectionAggregationTest(){String result="xxx"; //你期望的aggregation結果Slice str1= Slices.utf8Slice("x");Slice str2= Slices.utf8Slice("y");assertAggregation(COLLECTION_AGGREGATION,result,createSlicesBlock(str1, str2),createSlicesBlock(str1, str2));} }

標量函數單元測試

而對于標量函數scalar的測試略有不同,示例如下:

public class TestBitwiseFunctionsextends AbstractTestFunctions {@Testpublic void testBitCount(){assertFunction("bit_count(0, 64)", BIGINT, 0L); //bit_count為標量函數名,傳參,參數如果為String則用單引號,參數類型,期望結果} }

當然進行單元測試后,我們期望到真實的庫中去驗證,內置函數滿足不了使用需求時,就需要自行開發函數來拓展函數庫。開發者自行編寫的拓展函數一般通過插件的方式進行注冊。PluginManager在安裝插件時會調用插件的getFunctions()方法,將獲取到的函數集合通過MetadataManager的addFunctions方法進行注冊:

public class ExampleFunctionsPluginimplements Plugin {@Overridepublic Set<Class<?>> getFunctions(){return ImmutableSet.<Class<?>>builder().add(ExampleNullFunction.class).add(IsNullFunction.class).add(IsEqualOrNullFunction.class).add(ExampleStringFunction.class).add(ExampleAverageFunction.class).build();} }

Presto 函數的注冊機制,新增和修改函數后,必須要重啟服務才能生效,所以目前還不支持真正的用戶自定義函數。插件函數進行注冊之后,在resource下創建META-INF/services目錄,并創建文件名為com.facebook.presto.spi.Plugin的文件,并添加內容:

xxx.xxx.xxx.ExampleFunctionsPlugin

然后利用presto的插件打包,此時會在target目錄下生成zip文件,把xxx.zip解壓到${PRESTOHOME}/plugin,重啟presto服務即可進行驗證。

總的來說,Presto的UDF和UDAF開發總結為一張圖:

注意:各個版本的Presto源碼有所不同,遇到類不正確的對版本進行調整,上面是用的Presto版本為0.264,更多的參考Presto的官方源碼https://github.com/prestodb/presto,而對于Persto的分組聚合查詢流程可以參見:Presto中的分組聚合查詢流程

總結

以上是生活随笔為你收集整理的【大数据】Presto开发自定义聚合函数的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。