日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

map、mapPartitions、mapPartitionsWithIndex区别在哪里?

發布時間:2024/2/28 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 map、mapPartitions、mapPartitionsWithIndex区别在哪里? 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

map?

/*** Return a new RDD by applying a function to all elements of this RDD. 通過對這個RDD的所有元素應用一個函數來返回一個新的RDD。*/def map[U: ClassTag](f: T => U): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))}

簡單來說:

map()會一條記錄為單位進行操作

val arr = Array("Tom","Bob","Tony","Jerry") //把 4 條數據分到兩個分區中 val rdd = sc.parallelize(arr,2) /* * 模擬把 RDD 中的元素寫入數據庫的過程 */ rdd.map(x => { println("創建數據庫連接...") println("寫入數據庫...") println("關閉數據庫連接...") println() }).count() 結果: 創建數據庫連接... 寫入數據庫... 關閉數據庫連接... 創建數據庫連接... 寫入數據庫... 關閉數據庫連接... 創建數據庫連接... 寫入數據庫... 關閉數據庫連接... 創建數據庫連接... 寫入數據庫... 關閉數據庫連接...

mapPartitions?

/*** Return a new RDD by applying a function to each partition of this RDD.** `preservesPartitioning` indicates whether the input function preserves the partitioner, which* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.通過對這個RDD的每個分區應用一個函數來返回一個新的RDD。 preservesPartitioning '指示輸入函數是否保存分區器,即保存分區器應該是'fasle',除非這是一對 RDD和輸入函數不修改鍵。*/def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] = withScope {val cleanedF = sc.clean(f)new MapPartitionsRDD(this,(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),preservesPartitioning)}

簡單來說:

mapPartitions 以分區為單位進行操作

/* * 將 RDD 中的數據寫入到數據庫中,絕大部分使用 mapPartitions 算子來實現 */ rdd.mapPartitions(x => { println("創建數據庫") val list = new ListBuffer[String]() while(x.hasNext){ //寫入數據庫 list += x.next()+":寫入數據庫" } //執行 SQL 語句 批量插入 list.iterator })foreach(println) 結果: 創建數據庫 Tom:寫入數據庫 Bob:寫入數據庫 創建數據庫 Tony:寫入數據庫 Jerry:寫入數據庫

mapPartitionsWithIndex

/*** Return a new RDD by applying a function to each partition of this RDD, while tracking the index* of the original partition.** `preservesPartitioning` indicates whether the input function preserves the partitioner, which* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.通過對這個RDD的每個分區應用一個函數來返回一個新的RDD,同時跟蹤索引 原分區的。preservesPartitioning '指示輸入函數是否保存分區器,該分區器應該為' false ',除非 這是一對RDD,且輸入函數不修改鍵。*/def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] = withScope {val cleanedF = sc.clean(f)new MapPartitionsRDD(this,(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),preservesPartitioning)}

簡單來說:

將分區參數,附帶上。

val dataArr = Array("Tom01","Tom02","Tom03" ,"Tom04","Tom05","Tom06" ,"Tom07","Tom08","Tom09" ,"Tom10","Tom11","Tom12") val rdd = sc.parallelize(dataArr, 3); val result = rdd.mapPartitionsWithIndex((index,x) => { val list = ListBuffer[String]() while (x.hasNext) { list += "partition:"+ index + " content:" + x.next } list.iterator }) println("分區數量:" + result.partitions.size) val resultArr = result.collect() for(x <- resultArr){ println(x) } 結果: 分區數量:3 partition:0 content:Tom01 partition:0 content:Tom02 partition:0 content:Tom03 partition:0 content:Tom04 partition:1 content:Tom05 partition:1 content:Tom06 partition:1 content:Tom07 partition:1 content:Tom08 partition:2 content:Tom09 partition:2 content:Tom10 partition:2 content:Tom11 partition:2 content:Tom12

?

總結

以上是生活随笔為你收集整理的map、mapPartitions、mapPartitionsWithIndex区别在哪里?的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。