
Spark _27_ Custom Functions: UDF and UDAF


UDF: user-defined function.

You can define a class that implements one of the UDFX interfaces (UDF1, UDF2, ..., chosen by the number of arguments the function takes).

Java API:

package com.udf;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @author George
 * @description UDF: user-defined function
 */
public class Udf {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("udf");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("George", "GeorgeDage", "kangkang"));
        JavaRDD<Row> map = rdd.map(new Function<String, Row>() {
            public Row call(String v1) throws Exception {
                return RowFactory.create(v1);
            }
        });

        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> df = sqlContext.createDataFrame(map, schema);
        df.show();
        /*
         * +----------+
         * |      name|
         * +----------+
         * |    George|
         * |GeorgeDage|
         * |  kangkang|
         * +----------+
         */
        df.registerTempTable("user");

        /*
         * Pick UDF1, UDF2, ... according to the number of arguments the function takes.
         */
        sqlContext.udf().register("StrLen", new UDF1<String, Integer>() {
            public Integer call(String s) throws Exception {
                return s.length();
            }
        }, DataTypes.IntegerType);

        sqlContext.sql("select name,StrLen(name) as length from user").show();
        /*
         * +----------+------+
         * |      name|length|
         * +----------+------+
         * |    George|     6|
         * |GeorgeDage|    10|
         * |  kangkang|     8|
         * +----------+------+
         */

        sqlContext.udf().register("StrLen", new UDF2<String, Integer, Integer>() {
            public Integer call(String s, Integer integer) throws Exception {
                return s.length() + integer;
            }
        }, DataTypes.IntegerType);

        sqlContext.sql("select name,StrLen(name,10) as length from user").show();
        /*
         * +----------+------+
         * |      name|length|
         * +----------+------+
         * |    George|    16|
         * |GeorgeDage|    20|
         * |  kangkang|    18|
         * +----------+------+
         */
        sc.stop();
    }
}

Scala API:

package com.udf

import org.apache.spark.sql.SparkSession

/**
 * UDF: user-defined function
 */
object UdfScalaDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local").appName("udf").getOrCreate()
    val list = List[String]("George", "lucy", "kk", "lmdhk")
    import sparkSession.implicits._
    val frame = list.toDF("name")
    frame.createOrReplaceTempView("students")
    frame.show()
    /*
     * +------+
     * |  name|
     * +------+
     * |George|
     * |  lucy|
     * |    kk|
     * | lmdhk|
     * +------+
     */
    sparkSession.udf.register("STRLEN", (n: String) => {
      n.length
    })
    sparkSession.sql("select name,STRLEN(name) as length from students sort by length desc").show(100)
    /*
     * +------+------+
     * |  name|length|
     * +------+------+
     * |George|     6|
     * | lmdhk|     5|
     * |  lucy|     4|
     * |    kk|     2|
     * +------+------+
     */
    sparkSession.stop()
  }
}
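A UDF does not have to go through SQL registration; it can also be applied directly with the DataFrame API. Below is a minimal sketch, assuming the frame DataFrame from the example above (strLenUdf and the length column are just illustrative names, not part of the original code):

import org.apache.spark.sql.functions.{col, udf}

// Wrap an ordinary Scala function as a column expression; no SQL registration needed.
val strLenUdf = udf((n: String) => n.length)

// Apply it directly to the DataFrame built above.
frame.withColumn("length", strLenUdf(col("name"))).show()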


UDAF: user-defined aggregate function.

  • To implement a UDAF with your own class, extend the UserDefinedAggregateFunction class.

Java API:

package com.udf;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;

/**
 * @author George
 * @description UDAF: user-defined aggregate function.
 * To implement a UDAF with your own class, extend UserDefinedAggregateFunction.
 */
public class Udaf {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("udaf");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> parallelize = sc.parallelize(
                Arrays.asList("George", "kangkang", "GeorgeDage", "limu", "George", "GeorgeDage"));
        JavaRDD<Row> map = parallelize.map(new Function<String, Row>() {
            public Row call(String v1) throws Exception {
                return RowFactory.create(v1);
            }
        });

        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> frame = sqlContext.createDataFrame(map, schema);
        frame.show();
        /*
         * +----------+
         * |      name|
         * +----------+
         * |    George|
         * |  kangkang|
         * |GeorgeDage|
         * |      limu|
         * |    George|
         * |GeorgeDage|
         * +----------+
         */
        frame.registerTempTable("user");

        /*
         * Register a UDAF that counts how many times each value occurs.
         * Note: defining a separate class that extends UserDefinedAggregateFunction also works.
         */
        sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {

            /**
             * Schema of the input fields (names and types).
             */
            @Override
            public StructType inputSchema() {
                return DataTypes.createStructType(
                        Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true)));
            }

            /**
             * Type of the final return value.
             */
            @Override
            public DataType dataType() {
                return DataTypes.IntegerType;
            }

            @Override
            public boolean deterministic() {
                return true;
            }

            /**
             * Update: the values of a group are passed in one by one.
             * buffer.getInt(0) is the value from the previous partial aggregation.
             * This is similar to a map-side combiner; the big aggregation happens on the reduce side.
             * In other words: whenever a new value arrives, this defines how the per-group
             * partial aggregate is updated.
             */
            @Override
            public void update(MutableAggregationBuffer buffer, Row input) {
                buffer.update(0, buffer.getInt(0) + 1);
            }

            /**
             * Schema of the aggregation buffer.
             */
            @Override
            public StructType bufferSchema() {
                return DataTypes.createStructType(
                        Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, true)));
            }

            /**
             * Merge: update() may only see part of a group's data on one node, while the rest of
             * the group is processed on other nodes. merge() combines those partial results.
             * buffer1.getInt(0): the value from the previous global aggregation step.
             * buffer2.getInt(0): the partial result produced by update() that is being merged in.
             * In other words: the global merge performed after the distributed partial aggregations finish.
             */
            @Override
            public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
                buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
            }

            /**
             * Initialize the buffer: the starting value for each group before aggregation.
             */
            @Override
            public void initialize(MutableAggregationBuffer buffer) {
                buffer.update(0, 0);
            }

            /**
             * Return the final result of the UDAF; its type must match dataType().
             */
            @Override
            public Object evaluate(Row buffer) {
                return buffer.getInt(0);
            }
        });

        sqlContext.sql("select name ,StringCount(name) from user group by name").show();
        /*
         * +----------+------+
         * |      name|(name)|
         * +----------+------+
         * |      limu|     1|
         * |    George|     2|
         * |GeorgeDage|     2|
         * |  kangkang|     1|
         * +----------+------+
         */
        sc.stop();
    }
}

Scala API:

package com.udf

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyUDAF extends UserDefinedAggregateFunction {

  // Schema of the buffer used while aggregating
  def bufferSchema: StructType = {
    DataTypes.createStructType(Array(DataTypes.createStructField("aaa", IntegerType, true)))
  }

  // Type of the final return value
  def dataType: DataType = {
    DataTypes.IntegerType
  }

  def deterministic: Boolean = {
    true
  }

  // Return the final aggregated value; must match dataType
  def evaluate(buffer: Row): Any = {
    buffer.getAs[Int](0)
  }

  // Initialize the buffer for each group
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0
  }

  // Schema of the input data
  def inputSchema: StructType = {
    DataTypes.createStructType(Array(DataTypes.createStructField("input", StringType, true)))
  }

  // Merge the partial aggregates computed on different nodes
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }

  // Update the group's aggregate whenever a new value arrives
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) + 1
  }
}


package com.udf

import org.apache.spark.sql.SparkSession

object UdafScalaDemo {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("udaf").getOrCreate()
    val list = List[String]("George", "lucy", "kk", "lmdhk", "kk")
    import session.implicits._
    val frame = list.toDF("name")
    frame.createOrReplaceTempView("students")

    // Register the UDAF
    session.udf.register("NAMECOUNT", new MyUDAF())
    session.sql("select name,NAMECOUNT(name) as count from students group by name").show(100)
    /*
     * +------+-----+
     * |  name|count|
     * +------+-----+
     * |  lucy|    1|
     * |    kk|    2|
     * |George|    1|
     * | lmdhk|    1|
     * +------+-----+
     */
    session.stop()
  }
}

UDAF flow: initialize() sets up each group's buffer, update() folds in every input row (the map-side partial aggregation), merge() combines the partial buffers from different nodes, and evaluate() returns the final result.
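In Spark 3.0 and later, UserDefinedAggregateFunction is deprecated in favor of the typed Aggregator API registered through functions.udaf. Below is a minimal sketch of the same count-by-name aggregation under that API, assuming the session and the students view from the Scala example above; the object name NameCountAgg is only an illustrative choice.

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// Input type String, buffer type Int, output type Int: counts the rows in each group.
object NameCountAgg extends Aggregator[String, Int, Int] {
  def zero: Int = 0                            // initial buffer value for each group
  def reduce(b: Int, a: String): Int = b + 1   // fold in one input row
  def merge(b1: Int, b2: Int): Int = b1 + b2   // combine partial buffers from different partitions
  def finish(reduction: Int): Int = reduction  // final result returned to the query
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

// Registration and use, with the session and view from the example above:
// session.udf.register("NAMECOUNT", udaf(NameCountAgg))
// session.sql("select name,NAMECOUNT(name) as count from students group by name").show()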
