数据湖之iceberg系列(四)iceberg-spark编程
1 創(chuàng)建maven項目 添加依賴
<properties>
? ? ? ? <maven.compiler.source>1.8</maven.compiler.source>
? ? ? ? <maven.compiler.target>1.8</maven.compiler.target>
? ? ? ? <scala.version>2.12.12</scala.version>
? ? ? ? <spark.version>3.0.0</spark.version>
? ? ? ? <hadoop.version>3.1.1</hadoop.version>
? ? ? ? <encoding>UTF-8</encoding>
? ? </properties>
?
? ? <dependencies>
? ? ? ? <!-- 導(dǎo)入scala的依賴 -->
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.scala-lang</groupId>
? ? ? ? ? ? <artifactId>scala-library</artifactId>
? ? ? ? ? ? <version>${scala.version}</version>
? ? ? ? </dependency>
?
? ? ? ? <!-- 導(dǎo)入spark的依賴 -->
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.spark</groupId>
? ? ? ? ? ? <artifactId>spark-core_2.12</artifactId>
? ? ? ? ? ? <version>${spark.version}</version>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.spark</groupId>
? ? ? ? ? ? <artifactId>spark-sql_2.12</artifactId>
? ? ? ? ? ? <version>${spark.version}</version>
? ? ? ? </dependency>
?
? ? ? ? <!--JDBC驅(qū)動包-->
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>mysql</groupId>
? ? ? ? ? ? <artifactId>mysql-connector-java</artifactId>
? ? ? ? ? ? <version>5.1.48</version>
? ? ? ? </dependency>
?
? ? ? ? <!--hive-->
? ? <!-- ? ?<dependency>
? ? ? ? ? ? <groupId>org.apache.spark</groupId>
? ? ? ? ? ? <artifactId>spark-hive_2.12</artifactId>
? ? ? ? ? ? <version>${spark.version}</version>
? ? ? ? </dependency>-->
?
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>com.alibaba</groupId>
? ? ? ? ? ? <artifactId>fastjson</artifactId>
? ? ? ? ? ? <version>1.2.62</version>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>c3p0</groupId>
? ? ? ? ? ? <artifactId>c3p0</artifactId>
? ? ? ? ? ? <version>0.9.1.2</version>
? ? ? ? </dependency>
?
?
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.iceberg</groupId>
? ? ? ? ? ? <artifactId>iceberg-core</artifactId>
? ? ? ? ? ? <version>0.10.0</version>
? ? ? ? </dependency>
?
? ? ? ? <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark3-runtime -->
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.iceberg</groupId>
? ? ? ? ? ? <artifactId>iceberg-spark3-runtime</artifactId>
? ? ? ? ? ? <version>0.10.0</version>
? ? ? ? </dependency>
?
? ? ? ? <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.avro</groupId>
? ? ? ? ? ? <artifactId>avro</artifactId>
? ? ? ? ? ? <version>1.9.0</version>
? ? ? ? </dependency>
? ? </dependencies>
?
? ? <build>
? ? ? ? <plugins>
? ? ? ? ? ? <!-- 指定編譯java的插件 -->
? ? ? ? ? ? <plugin>
? ? ? ? ? ? ? ? <groupId>org.apache.maven.plugins</groupId>
? ? ? ? ? ? ? ? <artifactId>maven-compiler-plugin</artifactId>
? ? ? ? ? ? ? ? <version>3.5.1</version>
? ? ? ? ? ? </plugin>
? ? ? ? ? ? <!-- 指定編譯scala的插件 -->
? ? ? ? ? ? <plugin>
? ? ? ? ? ? ? ? <groupId>net.alchim31.maven</groupId>
? ? ? ? ? ? ? ? <artifactId>scala-maven-plugin</artifactId>
? ? ? ? ? ? ? ? <version>3.2.2</version>
? ? ? ? ? ? ? ? <executions>
? ? ? ? ? ? ? ? ? ? <execution>
? ? ? ? ? ? ? ? ? ? ? ? <goals>
? ? ? ? ? ? ? ? ? ? ? ? ? ? <goal>compile</goal>
? ? ? ? ? ? ? ? ? ? ? ? ? ? <goal>testCompile</goal>
? ? ? ? ? ? ? ? ? ? ? ? </goals>
? ? ? ? ? ? ? ? ? ? ? ? <configuration>
? ? ? ? ? ? ? ? ? ? ? ? ? ? <args>
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? <arg>-dependencyfile</arg>
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? <arg>${project.build.directory}/.scala_dependencies</arg>
? ? ? ? ? ? ? ? ? ? ? ? ? ? </args>
? ? ? ? ? ? ? ? ? ? ? ? </configuration>
? ? ? ? ? ? ? ? ? ? </execution>
? ? ? ? ? ? ? ? </executions>
? ? ? ? ? ? </plugin>
?
? ? ? ? </plugins>
? ? </build>
2 加載已經(jīng)存在的表數(shù)據(jù)
加載Hadoop表中的數(shù)據(jù)?
?/**
? ? ?* 1 添加個iceberg操作有關(guān)的依賴
? ? ?* 2 ?獲取spark對象
? ? ?* 3 ?定義對應(yīng)的(catalog)數(shù)據(jù)源
? ? ?* type: 支持 hadoop ?會將目錄和數(shù)據(jù)源信息存儲在HDFS上
? ? ?*/
? ? val spark = SparkSession.builder()
? ? ? .config("spark.sql.catalog.hadoop_prod.type", "hadoop") // 設(shè)置數(shù)據(jù)源類別為hadoop
? ? ? .config("spark.sql.catalog.hadoop_prod", classOf[SparkCatalog].getName)
? ? ? // 指定Hadoop數(shù)據(jù)源的根目錄
? ? ? .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://linux01:8020//doit/iceberg/warehouse/") // 設(shè)置數(shù)據(jù)源位置
? ? ? .appName(this.getClass.getSimpleName)
? ? ? .master("local[*]")
? ? ? .getOrCreate()
? ? ? ?// 加載指定的表 , 并展示所有的數(shù)據(jù) , 這個表是已經(jīng)存在的
? ? ? ?spark.read.format("iceberg").table("hadoop_prod.default.tb_test1").show()
? ? spark.close()
3 ?創(chuàng)建一張表
private def createTable = {
? ? val spark = SparkSession.builder()
? ? ? // 指定Hadoop數(shù)據(jù)源的根目錄
? ? ? .appName(this.getClass.getSimpleName)
? ? ? .master("local[*]")
? ? ? .getOrCreate()
? ? val conf = new Configuration
? ? val warehousePath = "hdfs://linux01:8020//doit/iceberg/warehouse/"
? ? // 使用hadoop的catalog
? ? val catalog = new HadoopCatalog(conf, warehousePath)
? ? import org.apache.iceberg.catalog.TableIdentifier
? ? // 參數(shù)一 ?數(shù)據(jù)庫 ? 參數(shù)二 表
? ? val name: TableIdentifier = TableIdentifier.of("logging", "tb_user")
? ? val schema = new Schema(
? ? ? Types.NestedField.required(1, "id", Types.StringType.get()),
? ? ? Types.NestedField.required(2, "name", Types.StringType.get()),
? ? ? Types.NestedField.required(3, "age", Types.IntegerType.get()),
? ? );
? ? // 創(chuàng)建表
? ? catalog.createTable(name, schema, null)
?
? ? spark.close()
? }
?根據(jù)加載的數(shù)據(jù)創(chuàng)建表
? ?val spark = SparkSession.builder()
? ? ? .config("spark.sql.catalog.hadoop_prod.type", "hadoop") // 設(shè)置數(shù)據(jù)源類別為hadoop
? ? ? .config("spark.sql.catalog.hadoop_prod", classOf[SparkCatalog].getName)
? ? ? // 指定Hadoop數(shù)據(jù)源的根目錄
? ? ? .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://linux01:8020//doit/iceberg/warehouse/") // 設(shè)置數(shù)據(jù)源位置
? ? ? .appName(this.getClass.getSimpleName)
? ? ? .master("local[*]")
? ? ? .getOrCreate()
? ? // 讀取數(shù)據(jù) 生成DF
? ? val schema = new StructType()
? ? ? .add("id", DataTypes.IntegerType, false)
? ? ? .add("name", DataTypes.StringType, false)
? ? ? .add("age", DataTypes.IntegerType, false)
? ? val df: DataFrame = spark.read.schema(schema).csv("hdfs://linux01:8020/log/")
? ? // 第一次插入數(shù)據(jù)使用 replace() ?以后再插入數(shù)據(jù)使用 append
? ? df.writeTo("hadoop_prod.logging.tb_user3").create()
? ? spark.close()
創(chuàng)建分區(qū)表和指定數(shù)據(jù)存儲格式?
val spark = SparkSession.builder()
? ? ? .config("spark.sql.catalog.hadoop_prod.type", "hadoop") // 設(shè)置數(shù)據(jù)源類別為hadoop
? ? ? .config("spark.sql.catalog.hadoop_prod", classOf[SparkCatalog].getName)
? ? ? // 指定Hadoop數(shù)據(jù)源的根目錄
? ? ? .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://linux01:8020//doit/iceberg/warehouse/") // 設(shè)置數(shù)據(jù)源位置
? ? ? .appName(this.getClass.getSimpleName)
? ? ? .master("local[*]")
? ? ? .getOrCreate()
? ? // 讀取數(shù)據(jù) 生成DF
? ? val schema = new StructType()
? ? ? .add("id", DataTypes.IntegerType, false)
? ? ? .add("name", DataTypes.StringType, false)
? ? ? .add("age", DataTypes.IntegerType, false)
? ? val df: DataFrame = spark.read.schema(schema).csv("hdfs://linux01:8020/log/")
? ? // 第一次插入數(shù)據(jù)使用 replace() ?以后再插入數(shù)據(jù)使用 append
? ? ?import spark.implicits._
? ? // 創(chuàng)建一張表 ,指定數(shù)據(jù)的存儲格式和分區(qū)
? ? df.writeTo("hadoop_prod.logging.tb_user5")
? ? ? .tableProperty("write.format.default", "orc")
? ? ? .partitionedBy('name)
? ? ? .create
? ? spark.close()
4 列舉出所有名稱空間下的表
? private def listTables = {
? ? val conf = new Configuration
? ? val warehousePath = "hdfs://linux01:8020//doit/iceberg/warehouse/"
? ? // 使用hadoop的catalog
? ? val catalog = new HadoopCatalog(conf, warehousePath)
? ? val namespace: Namespace = Namespace.of("logging")
? ? val identifiers: util.List[TableIdentifier] = catalog.listTables(namespace)
? ? val value: util.Iterator[TableIdentifier] = identifiers.iterator()
? ? // 遍歷所有的表
? ? while (value.hasNext) {
? ? ? println(value.next().name())
? ? }
? }
5 創(chuàng)建名稱空間和列出系統(tǒng)中所有的數(shù)據(jù)庫?
private def createNameSpaceAndList = {
? ? val conf = new Configuration
? ? val warehousePath = "hdfs://linux01:8020//doit/iceberg/warehouse/"
? ? // 創(chuàng)建一個catalog
? ? val catalog = new HadoopCatalog(conf, warehousePath)
? ? // 創(chuàng)建一個名稱空間 ?也就是數(shù)據(jù)庫
? ? catalog.createNamespace(Namespace.of("db1"))
? ? //列舉出當(dāng)前所有的數(shù)據(jù)庫
? ? val namespaces: util.List[Namespace] = catalog.listNamespaces()
? ? // 遍歷所有的數(shù)據(jù)庫打印結(jié)果
? ? val iters: util.Iterator[Namespace] = namespaces.iterator()
? ? while (iters.hasNext) {
? ? ? val namespace: Namespace = iters.next()
? ? ? println(namespace.toString)
? ? }
? }
6 讀取靜態(tài)數(shù)據(jù)向iceberg表中插入數(shù)據(jù)
?private def insertIntoDataToHadoopCatalogTable = {
? ? val spark = SparkSession.builder()
? ? ? .config("spark.sql.catalog.hadoop_prod.type", "hadoop") // 設(shè)置數(shù)據(jù)源類別為hadoop
? ? ? .config("spark.sql.catalog.hadoop_prod", classOf[SparkCatalog].getName)
? ? ? // 指定Hadoop數(shù)據(jù)源的根目錄
? ? ? .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://linux01:8020//doit/iceberg/warehouse/") // 設(shè)置數(shù)據(jù)源位置
? ? ? .appName(this.getClass.getSimpleName)
? ? ? .master("local[*]")
? ? ? .getOrCreate()
? ? // 讀取數(shù)據(jù) 生成DF
? ? val schema = new StructType()
? ? ? .add("id", DataTypes.IntegerType, false)
? ? ? .add("name", DataTypes.StringType, false)
? ? ? .add("age", DataTypes.IntegerType, false)
? ? val df: DataFrame = spark.read.schema(schema).csv("hdfs://linux01:8020/log/")
? ? // 第一次插入數(shù)據(jù)使用 replace() ?以后再插入數(shù)據(jù)使用 append
? ? df.writeTo("hadoop_prod.logging.tb_user2").append()
? ? spark.close()
? }
7 讀取hadoop類型的iceberg表數(shù)據(jù)
?private def readHadoopCatalogData = {
? ? val spark = SparkSession.builder()
? ? ? .config("spark.sql.catalog.hadoop_prod.type", "hadoop") // 設(shè)置數(shù)據(jù)源類別為hadoop
? ? ? .config("spark.sql.catalog.hadoop_prod", classOf[SparkCatalog].getName)
? ? ? // 指定Hadoop數(shù)據(jù)源的根目錄
? ? ? .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://linux01:8020//doit/iceberg/warehouse/") // 設(shè)置數(shù)據(jù)源位置
? ? ? .appName(this.getClass.getSimpleName)
? ? ? .master("local[*]")
? ? ? .getOrCreate()
? ? // 加載hadoop指定表目錄下的表數(shù)據(jù)
? ? val df: DataFrame = spark.read.format("iceberg").load("hdfs://linux01:8020//doit/iceberg/warehouse/logging/tb_user2")
? ? // 打印表結(jié)構(gòu)
? ? df.printSchema()
? ? // 展示數(shù)據(jù)
? ? df.show()
? ? // 使用SQL方式查詢數(shù)據(jù)?
? ? df.createTempView("tb_user2")
? ? spark.sql("select * from tb_user2").show()
? ? spark.close()
? }
?
————————————————
版權(quán)聲明:本文為CSDN博主「白眼黑刺猬」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_37933018/article/details/110483423
總結(jié)
以上是生活随笔為你收集整理的数据湖之iceberg系列(四)iceberg-spark编程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据湖之iceberg系列(一)iceb
- 下一篇: 数据湖之iceberg系列(五)-Spa