【Apache Spark 】第 6 章Spark SQL 和数据集
?🔎大家好,我是Sonhhxg_柒,希望你看完之后,能對你有所幫助,不足請指正!共同學習交流🔎
📝個人主頁-Sonhhxg_柒的博客_CSDN博客?📃
🎁歡迎各位→點贊👍 + 收藏?? + 留言📝?
📣系列專欄 - 機器學習【ML】?自然語言處理【NLP】? 深度學習【DL】
?
?🖍foreword
?說明?本人講解主要包括Python、機器學習(ML)、深度學習(DL)、自然語言處理(NLP)等內容。
如果你對這個系列感興趣的話,可以關注訂閱喲👋
文章目錄
Java 和 Scala 的單一 API
用于數據集的 Scala 案例類和 JavaBean
使用數據集
創建示例數據
轉換樣本數據
高階函數和函數式編程
將 DataFrame 轉換為數據集
數據集和數據幀的內存管理
數據集編碼器
Spark 的內部格式與 Java 對象格式
序列化和反序列化 (SerDe)
使用數據集的成本
降低成本的策略
概括
在第4章和第5章中,我們介紹了 Spark SQL 和 DataFrame API。我們研究了如何連接到內置和外部數據源,了解了 Spark SQL 引擎,并探討了諸如 SQL 和 DataFrame 之間的互操作性、創建和管理視圖和表以及高級 DataFrame 和 SQL 轉換等主題。
盡管我們在第 3 章中簡要介紹了 Dataset API?,但我們略讀了 Datasets(強類型分布式集合)在 Spark 中的創建、存儲、序列化和反序列化的主要方面。
在本章中,我們將深入了解數據集:我們將探索在 Java 和 Scala 中使用數據集,Spark 如何管理內存以適應作為高級 API 的一部分的數據集結構,以及與使用數據集相關的成本。
Java 和 Scala 的單一 API
您可能還記得第 3 章(圖 3-1和表 3-6),數據集為強類型對象提供了統一且單一的 API。在 Spark 支持的語言中,只有 Scala 和 Java 是強類型的;因此,Python 和 R 僅支持無類型的 DataFrame API。
數據集是特定領域的類型化對象,可以使用函數式編程或您熟悉的 DataFrame API 中的 DSL 運算符并行操作。
由于這個單一的 API,Java 開發人員不再冒險落后。例如,將來對 Scala 的groupBy()、flatMap()、map()或filter()API 的任何接口或行為更改對于 Java 也將是相同的,因為它是兩個實現通用的單一接口。
用于數據集的 Scala 案例類和 JavaBean
如果您還記得第 3 章(表 3-2),Spark 具有內部數據類型,例如StringType、BinaryType、IntegerType、BooleanType和MapType,它用于在 Spark 操作期間無縫映射到 Scala 和 Java 中的語言特定數據類型。這種映射是通過編碼器完成的,我們將在本章后面討論。
為了在 Scala 中創建類型化對象Dataset[T],T您需要一個定義該對象的案例類。使用第 3 章(表 3-1)中的示例數據,假設我們有一個 JSON 文件,其中包含數百萬關于博客作者以以下格式撰寫的關于 Apache Spark 的條目:
{id: 1, first: "Jules", last: "Damji", url: "https://tinyurl.1", date: "1/4/2016", hits: 4535, campaigns: {"twitter", "LinkedIn"}}, ... {id: 87, first: "Brooke", last: "Wenig", url: "https://tinyurl.2", date: "5/5/2018", hits: 8908, campaigns: {"twitter", "LinkedIn"}}要創建分布式Dataset[Bloggers],我們必須首先定義一個 Scala 案例類,該類定義包含 Scala 對象的每個單獨字段。此案例類用作類型化對象的藍圖或模式Bloggers:
// In Scala case class Bloggers(id:Int, first:String, last:String, url:String, date:String, hits: Int, campaigns:Array[String])我們現在可以從數據源中讀取文件:
val bloggers = "../data/bloggers.json" val bloggersDS = spark.read.format("json").option("path", bloggers).load().as[Bloggers]生成的分布式數據集合中的每一行都是類型Bloggers。
同樣,您可以在 Java 中創建一個 JavaBean 類型的類Bloggers,然后使用編碼器創建一個Dataset<Bloggers>:
// In Java import org.apache.spark.sql.Encoders; import java.io.Serializable;public class Bloggers implements Serializable {private int id;private String first;private String last;private String url;private String date;private int hits;private Array[String] campaigns;// JavaBean getters and setters int getID() { return id; } void setID(int i) { id = i; } String getFirst() { return first; } void setFirst(String f) { first = f; } String getLast() { return last; } void setLast(String l) { last = l; } String getURL() { return url; } void setURL (String u) { url = u; } String getDate() { return date; } Void setDate(String d) { date = d; } int getHits() { return hits; } void setHits(int h) { hits = h; }Array[String] getCampaigns() { return campaigns; } void setCampaigns(Array[String] c) { campaigns = c; } }// Create Encoder Encoder<Bloggers> BloggerEncoder = Encoders.bean(Bloggers.class); String bloggers = "../bloggers.json" Dataset<Bloggers>bloggersDS = spark.read.format("json").option("path", bloggers).load().as(BloggerEncoder);如您所見,在 Scala 和 Java 中創建數據集需要一些先見之明,因為您必須知道正在讀取的行的所有單個列名稱和類型。與 DataFrame 不同,您可以選擇讓 Spark 推斷架構,Dataset API 要求您提前定義數據類型,并且您的案例類或 JavaBean 類與您的架構匹配。
筆記
Scala 案例類或 Java 類定義中的字段名稱必須與數據源中的順序匹配。數據中每一行的列名會自動映射到類中對應的名稱,并自動保留類型。
如果字段名稱與您的輸入數據匹配,您可以使用現有的 Scala 案例類或 JavaBean 類。使用 Dataset API與使用 DataFrame一樣簡單、簡潔和聲明性。對于大多數數據集的轉換,您可以使用在前幾章中學習過的相同的關系運算符。
讓我們檢查使用示例數據集的一些方面.
使用數據集
創建示例數據集的一種簡單而動態的方法是使用實SparkSession??例。在這個場景中,為了說明的目的,我們動態地創建一個包含三個字段的 Scala 對象:(uid用戶的唯一 ID)、uname(隨機生成的用戶名字符串)和usage(服務器或服務使用的分鐘數)。
創建示例數據
首先,讓我們生成一些示例數據:
// In Scala import scala.util.Random._ // Our case class for the Dataset case class Usage(uid:Int, uname:String, usage: Int) val r = new scala.util.Random(42) // Create 1000 instances of scala Usage class // This generates data on the fly val data = for (i <- 0 to 1000) yield (Usage(i, "user-" + r.alphanumeric.take(5).mkString(""),r.nextInt(1000))) // Create a Dataset of Usage typed data val dsUsage = spark.createDataset(data) dsUsage.show(10)+---+----------+-----+ |uid| uname|usage| +---+----------+-----+ | 0|user-Gpi2C| 525| | 1|user-DgXDi| 502| | 2|user-M66yO| 170| | 3|user-xTOn6| 913| | 4|user-3xGSz| 246| | 5|user-2aWRN| 727| | 6|user-EzZY1| 65| | 7|user-ZlZMZ| 935| | 8|user-VjxeG| 756| | 9|user-iqf1P| 3| +---+----------+-----+ only showing top 10 rows在 Java 中這個想法是相似的,但我們必須使用顯式Encoders(在 Scala 中,Spark 隱式處理):
// In Java import org.apache.spark.sql.Encoders; import org.apache.commons.lang3.RandomStringUtils; import java.io.Serializable; import java.util.Random; import java.util.ArrayList; import java.util.List;// Create a Java class as a Bean public class Usage implements Serializable {int uid; // user idString uname; // usernameint usage; // usagepublic Usage(int uid, String uname, int usage) {this.uid = uid;this.uname = uname;this.usage = usage;}// JavaBean getters and setters public int getUid() { return this.uid; }public void setUid(int uid) { this.uid = uid; }public String getUname() { return this.uname; }public void setUname(String uname) { this.uname = uname; }public int getUsage() { return this.usage; }public void setUsage(int usage) { this.usage = usage; }public Usage() {}public String toString() {return "uid: '" + this.uid + "', uame: '" + this.uname + "', usage: '" + this.usage + "'";} }// Create an explicit Encoder Encoder<Usage> usageEncoder = Encoders.bean(Usage.class); Random rand = new Random(); rand.setSeed(42); List<Usage> data = new ArrayList<Usage>()// Create 1000 instances of Java Usage class for (int i = 0; i < 1000; i++) {data.add(new Usage(i, "user" + RandomStringUtils.randomAlphanumeric(5),rand.nextInt(1000));// Create a Dataset of Usage typed data Dataset<Usage> dsUsage = spark.createDataset(data, usageEncoder);筆記
Scala 和 Java 生成的 Dataset 會有所不同,因為隨機種子算法可能不同。因此,您的 Scala 和 Java 的查詢結果會有所不同。
現在我們已經生成了數據集,dsUsage讓我們執行一些我們在前幾章中完成的常見轉換。
轉換樣本數據
回想一下,數據集是特定領域對象的強類型集合。這些對象可以使用函數或關系操作并行轉換。這些轉換的示例包括map()、reduce()、filter()、select()和aggregate()。作為高階函數的示例,這些方法可以將 lambda、閉包或函數作為參數并返回結果。因此,它們非常適合函數式編程。
Scala 是一種函數式編程語言,最近 lambda、函數式參數和閉包也被添加到 Java 中。讓我們在 Spark 中嘗試幾個高階函數,并將函數式編程結構與我們之前創建的示例數據一起使用。
高階函數和函數式編程
舉個簡單的例子,讓我們使用filter()返回我們dsUsage數據集中所有使用時間超過 900 分鐘的用戶。一種方法是使用函數表達式作為filter()方法的參數:
// In Scala import org.apache.spark.sql.functions._ dsUsage.filter(d => d.usage > 900).orderBy(desc("usage")).show(5, false)另一種方法是定義一個函數并將該函數作為參數提供給filter():
def filterWithUsage(u: Usage) = u.usage > 900 dsUsage.filter(filterWithUsage(_)).orderBy(desc("usage")).show(5)+---+----------+-----+ |uid| uname|usage| +---+----------+-----+ |561|user-5n2xY| 999| |113|user-nnAXr| 999| |605|user-NL6c4| 999| |634|user-L0wci| 999| |805|user-LX27o| 996| +---+----------+-----+ only showing top 5 rows在第一種情況下,我們使用 lambda 表達式,{d.usage > 900}作為filter()方法的參數,而在第二種情況下,我們定義了一個 Scala 函數,def filterWithUsage(u: Usage) = u.usage > 900。在這兩種情況下,該filter()方法都會遍歷Usage分布式數據集中對象的每一行,并應用表達式或執行函數,Usage為表達式或函數的值為 的行返回類型為的新數據集true。(有關方法簽名的詳細信息,請參閱Scala 文檔。)
在 Java 中, to 的參數filter()類型為FilterFunction<T>。這可以匿名內聯或使用命名函數定義。在本例中,我們將按名稱定義函數并將其分配給變量f。應用此函數filter()將返回一個新數據集,其中包含我們過濾條件為的所有行true:
// In Java // Define a Java filter function FilterFunction<Usage> f = new FilterFunction<Usage>() {public boolean call(Usage u) {return (u.usage > 900);} };// Use filter with our function and order the results in descending order dsUsage.filter(f).orderBy(col("usage").desc()).show(5);+---+----------+-----+ |uid|uname |usage| +---+----------+-----+ |67 |user-qCGvZ|997 | |878|user-J2HUU|994 | |668|user-pz2Lk|992 | |750|user-0zWqR|991 | |242|user-g0kF6|989 | +---+----------+-----+ only showing top 5 rows并非所有 lambda 或函數參數都必須計算為Boolean值;他們也可以返回計算值。考慮這個使用高階函數的例子map(),我們的目標是找出每個用戶的使用成本,其usage價值超過某個閾值,這樣我們就可以為這些用戶提供每分鐘的特價。
// In Scala // Use an if-then-else lambda expression and compute a value dsUsage.map(u => {if (u.usage > 750) u.usage * .15 else u.usage * .50 }).show(5, false) // Define a function to compute the usage def computeCostUsage(usage: Int): Double = {if (usage > 750) usage * 0.15 else usage * 0.50 } // Use the function as an argument to map() dsUsage.map(u => {computeCostUsage(u.usage)}).show(5, false) +------+ |value | +------+ |262.5 | |251.0 | |85.0 | |136.95| |123.0 | +------+ only showing top 5 rows要map()在 Java 中使用,您必須定義一個MapFunction<T>.?這可以是匿名類或擴展的已定義類MapFunction<T>。對于這個例子,我們內聯使用它——也就是說,在方法調用本身中:
// In Java // Define an inline MapFunction dsUsage.map((MapFunction<Usage, Double>) u -> {if (u.usage > 750)return u.usage * 0.15;elsereturn u.usage * 0.50; }, Encoders.DOUBLE()).show(5); // We need to explicitly specify the Encoder +------+ |value | +------+ |65.0 | |114.45| |124.0 | |132.6 | |145.5 | +------+ only showing top 5 rows盡管我們已經計算了使用成本的值,但我們不知道計算值與哪些用戶相關聯。我們如何獲得這些信息?
步驟很簡單:
創建一個 Scala 案例類或 JavaBean 類UsageCost,帶有一個名為 的附加字段或列cost。
定義一個函數來計算并在方法cost中使用它。map()
這是 Scala 中的樣子:
// In Scala // Create a new case class with an additional field, cost case class UsageCost(uid: Int, uname:String, usage: Int, cost: Double)// Compute the usage cost with Usage as a parameter // Return a new object, UsageCost def computeUserCostUsage(u: Usage): UsageCost = {val v = if (u.usage > 750) u.usage * 0.15 else u.usage * 0.50UsageCost(u.uid, u.uname, u.usage, v) }// Use map() on our original Dataset dsUsage.map(u => {computeUserCostUsage(u)}).show(5)+---+----------+-----+------+ |uid| uname|usage| cost| +---+----------+-----+------+ | 0|user-Gpi2C| 525| 262.5| | 1|user-DgXDi| 502| 251.0| | 2|user-M66yO| 170| 85.0| | 3|user-xTOn6| 913|136.95| | 4|user-3xGSz| 246| 123.0| +---+----------+-----+------+ only showing top 5 rows現在我們有了一個轉換后的數據集,其中包含一個由轉換cost中的函數計算的新列,map()以及所有其他列。
同樣,在 Java 中,如果我們想要與每個用戶關聯的成本,我們需要定義一個 JavaBean 類UsageCost和MapFunction<T>.?有關完整的 JavaBean 示例,請參閱本書的GitHub 存儲庫;為簡潔起見,我們將僅在MapFunction<T>此處顯示內聯:
// In Java // Get the Encoder for the JavaBean class Encoder<UsageCost> usageCostEncoder = Encoders.bean(UsageCost.class);// Apply map() function to our data dsUsage.map( (MapFunction<Usage, UsageCost>) u -> {double v = 0.0;if (u.usage > 750) v = u.usage * 0.15; else v = u.usage * 0.50;return new UsageCost(u.uid, u.uname,u.usage, v); },usageCostEncoder).show(5);+------+---+----------+-----+ | cost|uid| uname|usage| +------+---+----------+-----+ | 65.0| 0|user-xSyzf| 130| |114.45| 1|user-iOI72| 763| | 124.0| 2|user-QHRUk| 248| | 132.6| 3|user-8GTjo| 884| | 145.5| 4|user-U4cU1| 970| +------+---+----------+-----+ only showing top 5 rows關于使用高階函數和數據集,有幾點需要注意:
-
我們使用類型化的 JVM 對象作為函數的參數。
-
我們使用點表示法(來自面向對象的編程)來訪問類型化 JVM 對象中的各個字段,使其更易于閱讀。
-
我們的一些函數和 lambda 簽名可以是類型安全的,確保編譯時錯誤檢測并指示 Spark 處理哪些數據類型、執行哪些操作等。
-
我們的代碼具有可讀性、表達性和簡潔性,在 lambda 表達式中使用 Java 或 Scala 語言特性。
-
Spark 在 Java 和 Scala 中都提供了與高階函數構造等效的map()和filter()沒有的高階函數構造,因此您不必將函數式編程與 Datasets 或 DataFrames 一起使用。相反,您可以簡單地使用條件 DSL 運算符或 SQL 表達式:例如,dsUsage.filter("usage > 900")或dsUsage($"usage" > 900).?(有關這方面的更多信息,請參閱“使用數據集的成本”。)
-
對于數據集,我們使用編碼器,這是一種在 JVM 和 Spark 的數據類型內部二進制格式之間有效轉換數據的機制(更多信息請參見“數據集編碼器”)。
筆記
高階函數和函數式編程并不是 Spark 數據集獨有的;您也可以將它們與 DataFrame 一起使用。回想一下,DataFrame 是一個Dataset[Row],其中Row是一個通用的無類型 JVM 對象,可以保存不同類型的字段。方法簽名采用對 進行操作的表達式或函數Row,這意味著每個Row的數據類型都可以作為表達式或函數的輸入值。
將 DataFrame 轉換為數據集
對于查詢和構造的強類型檢查,您可以將 DataFrames 轉換為 Datasets。要將現有 DataFrame 轉換df為 Dataset 類型SomeCaseClass,只需使用df.as[SomeCaseClass]符號。我們之前看到了一個這樣的例子:
// In Scala val bloggersDS = spark.read.format("json").option("path", "/data/bloggers/bloggers.json").load().as[Bloggers]spark.read.format("json")返回 a?DataFrame<Row>,它在 Scala 中是Dataset[Row].?Using.as[Bloggers]指示 Spark 使用本章后面討論的編碼器,將對象從 Spark 的內部內存表示序列化/反序列化為 JVMBloggers對象.
數據集和數據幀的內存管理
Spark 是一種密集型內存分布式大數據引擎,因此其內存的有效利用對其執行速度至關重要。1縱觀其發布歷史,Spark 對內存的使用發生了顯著變化:
-
Spark 1.0 使用基于 RDD 的 Java 對象進行內存存儲、序列化和反序列化,這在資源方面很昂貴且速度很慢。此外,存儲是在 Java 堆上分配的,因此對于大型數據集,您只能受 JVM 垃圾收集 (GC) 的支配。
-
Spark 1.x 引入了Project Tungsten。它的一個突出特點是一種新的內部基于行的格式,使用偏移量和指針在堆外內存中布局數據集和數據幀。Spark 使用一種稱為編碼器的高效機制在 JVM 與其內部 Tungsten 格式之間進行序列化和反序列化。在堆外分配內存意味著 Spark 較少受到 GC 的阻礙。
-
Spark 2.x 引入了第二代 Tungsten 引擎,具有全階段代碼生成和向量化的基于列的內存布局。基于現代編譯器的思想和技術,這個新版本還利用現代 CPU 和緩存架構,通過“單指令多數據”(SIMD) 方法實現快速并行數據訪問。
數據集編碼器
編碼器將堆外內存中的數據從 Spark 的內部 Tungsten 格式轉換為 JVM Java 對象。換句話說,它們將數據集對象從 Spark 的內部格式序列化和反序列化為 JVM 對象,包括原始數據類型。例如,anEncoder[T]將從 Spark 的內部 Tungsten 格式轉換為Dataset[T].
Spark 內置支持為基本類型(例如,字符串、整數、長整數)、Scala 案例類和 JavaBeans 自動生成編碼器。與 Java 和 Kryo 的序列化和反序列化相比,Spark 編碼器的速度要快得多。
在我們之前的 Java 示例中,我們顯式地創建了一個編碼器:
Encoder<UsageCost> usageCostEncoder = Encoders.bean(UsageCost.class);然而,對于 Scala,Spark 會自動為這些高效的轉換器生成字節碼。讓我們來看看 Spark 內部基于 Tungsten 行的格式。
Spark 的內部格式與 Java 對象格式
Java 對象有很大的開銷——標頭信息、哈希碼、Unicode 信息等。即使是簡單的 Java 字符串(例如“abcd”)也需要 48 個字節的存儲空間,而不是您可能期望的 4 個字節。例如,想象一下創建MyClass(Int, String, String)對象的開銷。
Spark 不是為 Datasets 或 DataFrames 創建基于 JVM 的對象,而是分配堆外 Java 內存來布置它們的數據,并使用編碼器將數據從內存表示轉換為 JVM 對象。例如,圖 6-1顯示了 JVM 對象如何在MyClass(Int, String, String)內部存儲。
圖 6-1。JVM 對象存儲在由 Spark 管理的連續堆外 Java 內存中
當數據以這種連續方式存儲并通過指針算法和偏移量訪問時,編碼器可以快速序列化或反序列化該數據。這意味著什么?
序列化和反序列化 (SerDe)
分布式計算中的一個并不新鮮的概念,其中數據經常通過網絡在集群中的計算機節點之間傳輸,序列化和反序列化是發送方將類型化對象編碼(序列化)為二進制表示或格式并解碼的過程(反序列化)從二進制格式到接收器各自的數據類型對象。
例如,如果圖 6-1MyClass中的 JVM 對象必須在Spark 集群中的節點之間共享,則發送方會將其序列化為字節數組,而接收方會將其反序列化回類型為 的 JVM 對象。MyClass
JVM 有自己的內置 Java 序列化器和反序列化器,但效率低下,因為(正如我們在上一節中看到的)JVM 在堆內存中創建的 Java 對象是臃腫的。因此,該過程是緩慢的。
這就是數據集編碼器來救援的地方,原因如下:
-
Spark 的內部 Tungsten 二進制格式(參見圖6-1和6-2)將對象存儲在 Java 堆內存之外,而且它很緊湊,因此這些對象占用的空間更少。
-
編碼器可以通過使用帶有內存地址和偏移量的簡單指針算法遍歷內存來快速序列化(圖 6-2)。
-
在接收端,編碼器可以快速將二進制表示反序列化為 Spark 的內部表示。編碼器不受 JVM 垃圾收集暫停的阻礙。
圖 6-2。Spark 內部基于 Tungsten 行的格式
然而,正如我們接下來要討論的那樣,生活中大多數美好的事物都是有代價的。
使用數據集的成本
在第 3 章的“DataFrames 與 Datasets”中,我們概述了使用 Datasets 的一些好處——但這些好處是有代價的。如上一節所述,當數據集被傳遞給高階函數時,例如,或接受 lambdas 和函數參數的函數,從 Spark 的內部 Tungsten 格式反序列化到 JVM 對象會產生相關成本。filter()map()flatMap()
與在 Spark 中引入編碼器之前使用的其他序列化器相比,此成本很小且可以忍受。但是,在更大的數據集和許多查詢中,此成本會累積并可能影響性能。
降低成本的策略
減輕過度序列化和反序列化的一種策略是在查詢中使用DSL 表達式,并避免過度使用 lambda 作為匿名函數作為高階函數的參數。因為 lambda 在運行時之前對 Catalyst 優化器是匿名且不透明的,所以當您使用它們時,它無法有效地識別您在做什么(您沒有告訴 Spark要做什么),因此無法優化您的查詢(請參閱“Catalyst Optimizer”在第 3 章中)。
第二種策略是以最小化序列化和反序列化的方式將查詢鏈接在一起。將查詢鏈接在一起是 Spark 中的常見做法。
讓我們用一個簡單的例子來說明。假設我們有一個類型為 的數據集Person,其中Person定義為 Scala 案例類:
// In Scala Person(id: Integer, firstName: String, middleName: String, lastName: String, gender: String, birthDate: String, ssn: String, salary: String)我們想使用函數式編程向這個數據集發出一組查詢。
讓我們來看看我們編寫查詢效率低下的情況,以這種方式我們在不知不覺中產生了重復序列化和反序列化的成本:
import java.util.Calendar val earliestYear = Calendar.getInstance.get(Calendar.YEAR) - 40personDS// Everyone above 40: lambda-1.filter(x => x.birthDate.split("-")(0).toInt > earliestYear)// Everyone earning more than 80K.filter($"salary" > 80000)// Last name starts with J: lambda-2.filter(x => x.lastName.startsWith("J"))// First name starts with D.filter($"firstName".startsWith("D")).count()正如您在圖 6-3中所看到的,每次我們從 lambda 移動到 DSL( ) 時,都會產生序列化和反序列化JVM 對象filter($"salary" > 8000)的成本。Person
圖 6-3。使用 lambdas 和 DSL 鏈接查詢的低效方式
相比之下,以下查詢僅使用 DSL,不使用 lambda。因此,它的效率要高得多——整個組合和鏈式查詢不需要序列化/反序列化:
personDS.filter(year($"birthDate") > earliestYear) // Everyone above 40.filter($"salary" > 80000) // Everyone earning more than 80K.filter($"lastName".startsWith("J")) // Last name starts with J.filter($"firstName".startsWith("D")) // First name starts with D.count()概括
在本章中,我們詳細介紹了如何在 Java 和 Scala 中使用數據集。我們探索了 Spark 如何管理內存以將 Dataset 構造作為其統一和高級 API 的一部分,并且我們考慮了與使用 Datasets 相關的一些成本以及如何降低這些成本。我們還向您展示了如何在 Spark 中使用 Java 和 Scala 的函數式編程結構。
最后,我們深入了解了編碼器如何從 Spark 的內部 Tungsten 二進制格式序列化和反序列化為 JVM 對象。
總結
以上是生活随笔為你收集整理的【Apache Spark 】第 6 章Spark SQL 和数据集的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 0x80070057复制从服务器复制文件
- 下一篇: PostgreSQL 之 学籍管理示例