日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

Spark UDF变长参数的二三事儿

發(fā)布時間:2025/3/20 59 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark UDF变长参数的二三事儿 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

在復雜業(yè)務邏輯中,我們經(jīng)常會用到Spark的UDF,當一個UDF需要傳入多列的內(nèi)容并進行處理時,UDF的傳參該怎么做呢? 下面通過變長參數(shù)引出,逐一介紹三種可行方法以及一些不可行的嘗試...

引子

變長參數(shù)對于我們來說并不陌生,在Java里我們這么寫

  • public?void?varArgs(String...?args)?
  • 在Scala里我們這么寫

  • def?varArgs(cols:?String*):?String?
  • 而在Spark里,很多時候我們有自己的業(yè)務邏輯,現(xiàn)成的functions滿足不了我們的需求,而當我們需要處理同一行的多個列,將其經(jīng)過我們自己的邏輯合并為一個列時,變長參數(shù)及其變種實現(xiàn)可以給我們提供幫助。

    但是在Spark UDF里我們是 無法使用變長參數(shù)傳值 的,但之所以本文以變長參數(shù)開頭,是因為需求起于它,而通過對它進行變換,我們可以使用變長參數(shù)或Seq類型來接收參數(shù)。

    下面通過Spark-Shell來做演示,以下三種方法都可以做到多列傳參,分別是

    • 變長參數(shù)(接受array類型)
    • Seq類型參數(shù)(接受array類型)
    • Row類型參數(shù)(接受struct類型)

    變長參數(shù)類型的UDF

    定義UDF方法

  • def?myConcatVarargs(sep:?String,?cols:?String*):?String?=?cols.filter(_?!=?null).mkString(sep)?
  • 注冊UDF函數(shù)

    由于變長參數(shù)只能通過方法定義,所以這里使用部分應用函數(shù)來轉(zhuǎn)換

  • val?myConcatVarargsUDF?=?udf(myConcatVarargs?_)?
  • 可以看到該UDF的定義如下

  • UserDefinedFunction(<function2>,StringType,List(StringType,?ArrayType(StringType,true)))?
  • 也即變長參數(shù)轉(zhuǎn)換為了ArrayType,而且函數(shù)是只包括兩個參數(shù),所以變長參數(shù)列表由此也可看出無法使用的。

    變長參數(shù)列表傳值

    我們構(gòu)造一個DataFrame如下

  • val?df?=?sc.parallelize(Array(("aa",?"bb",?"cc"),("dd","ee","ff"))).toDF("A",?"B",?"C")?
  • 然后直接傳入多個String類型的列到myConcatVarargsUDF

  • df.select(myConcatVarargsUDF(lit("-"),?col("A"),?col("B"),?col("C"))).show?
  • 結(jié)果出現(xiàn)如下報錯

  • java.lang.ClassCastException:?anonfun$1?cannot?be?cast?to?scala.Function4?
  • 由此可以看出,使用變長參數(shù)列表的方式Spark是不支持的,它會被識別為四個參數(shù)的函數(shù),而UDF確是被定義為兩個參數(shù)而不是四個參數(shù)的函數(shù)!

    變換:使用array()轉(zhuǎn)換做第二個參數(shù)

    我們使用Spark提供的array() function來轉(zhuǎn)換參數(shù)為Array類型

  • df.select(myConcatVarargsUDF(lit("-"),?array(col("A"),?col("B"),?col("C")))).show?
  • 結(jié)果如下

  • +-------------------+?
  • |UDF(-,array(A,B,C))|?
  • +-------------------+?
  • |???????????aa-bb-cc|?
  • |???????????dd-ee-ff|?
  • +-------------------+?
  • 由此可以看出,使用變長參數(shù)構(gòu)造的UDF方法,可以通過構(gòu)造Array的方式傳參,來達到多列合并的目的。

    使用Seq類型參數(shù)的UDF

    上面提到,變長參數(shù)最后被轉(zhuǎn)為ArrayType,那不禁要想我們?yōu)槁锊皇褂肁rray或List類型呢?

    實際上在UDF里,類型并不是我們可以隨意定義的,比如使用List和Array就是不行的,我們自己定義的類型也是不行的,因為這涉及到數(shù)據(jù)的序列化和反序列化。

    以Array/List為示例的錯誤

    下面以Array類型為示例

    定義函數(shù)

  • val?myConcatArray?=?(cols:?Array[String],?sep:?String)?=>?cols.filter(_?!=?null).mkString(sep)?
  • 注冊UDF

  • val?myConcatArrayUDF?=?udf(myConcatArray)?
  • 可以看到給出的UDF簽名是

  • UserDefinedFunction(<function2>,StringType,List())?
  • 應用UDF

  • df.select(myConcatArrayUDF(array(col("A"),?col("B"),?col("C")),?lit("-"))).show?
  • 會發(fā)現(xiàn)報錯

  • scala.collection.mutable.WrappedArray$ofRef?cannot?be?cast?to?[Ljava.lang.String?
  • 同樣List作為參數(shù)類型也會報錯,因為反序列化的時候無法構(gòu)建對象,所以List和Array是無法直接作為UDF的參數(shù)類型的

    以Seq做參數(shù)類型

    定義調(diào)用如下

  • val?myConcatSeq?=?(cols:?Seq[Any],?sep:?String)?=>?cols.filter(_?!=?null).mkString(sep)??
  • val?myConcatSeqUDF?=?udf(myConcatSeq)??
  • df.select(myConcatSeqUDF(array(col("A"),?col("B"),?col("C")),?lit("-"))).show?
  • 結(jié)果如下

  • +-------------------+?
  • |UDF(array(A,B,C),-)|?
  • +-------------------+?
  • |???????????aa-bb-cc|?
  • |???????????dd-ee-ff|?
  • +-------------------+?
  • 使用Row類型參數(shù)的UDF

    我們可以使用Spark functions里struct方法構(gòu)造結(jié)構(gòu)體類型傳參,然后用Row類型接UDF的參數(shù),以達到多列傳值的目的。

  • def?myConcatRow:?((Row,?String)?=>?String)?=?(row,?sep)?=>?row.toSeq.filter(_?!=?null).mkString(sep)??
  • val?myConcatRowUDF?=?udf(myConcatRow)??
  • df.select(myConcatRowUDF(struct(col("A"),?col("B"),?col("C")),?lit("-"))).show?
  • 可以看到UDF的簽名如下

  • UserDefinedFunction(<function2>,StringType,List())?
  • 結(jié)果如下

  • +--------------------+?
  • |UDF(struct(A,B,C),-)|?
  • +--------------------+?
  • |????????????aa-bb-cc|?
  • |????????????dd-ee-ff|?
  • +--------------------+?
  • 使用Row類型還可以使用模式提取,用起來會更方便

  • row?match?{?
  • ??case?Row(aa:String,?bb:Int)?=>?
  • }?
  • 最后

    對于上面三種方法,變長參數(shù)和Seq類型參數(shù)都需要array的函數(shù)包裝為ArrayType,而使用Row類型的話,則需要struct函數(shù)構(gòu)建結(jié)構(gòu)體類型,其實都是為了數(shù)據(jù)的序列化和反序列化。三種方法中,Row的方式更靈活可靠,而且支持不同類型并且可以明確使用模式提取,用起來相當方便。

    而由此我們也可以看出,UDF不支持List和Array類型的參數(shù),同時 自定義參數(shù)類型 如果沒有混合Spark的特質(zhì)實現(xiàn)序列化和反序列化,那么在UDF里也是 無法用作參數(shù)類型 的。當然,Seq類型是可以 的,可以接多列的數(shù)組傳值。

    此外,我們也可以使用柯里化來達到多列傳參的目的,只是不同參數(shù)個數(shù)需要定義不同的UDF了。 ?


    本文作者:佚名

    來源:51CTO

    總結(jié)

    以上是生活随笔為你收集整理的Spark UDF变长参数的二三事儿的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。