日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Apache Iceberg 快速入门

發布時間:2024/1/23 编程问答 66 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache Iceberg 快速入门 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

導言

本文主要介紹如何快速的通過Spark訪問 Iceberg table。


如果想及時了解Spark、Hadoop或者HBase相關的文章,歡迎關注微信公眾號:iteblog_hadoop

Spark通過DataSource和DataFrame API訪問Iceberg table,或者進行Catalog相關的操作。由于Spark Data Source V2 API還在持續的演進和修改中,所以Iceberg在不同的Spark版本中的使用方式有所不同。

版本對比

功能Spark 2.4Spark 3.0
基于DataFrame
- 讀數據支持支持
- 讀元數據支持支持
- 追加(append)支持支持
- 覆蓋(Overwrite)支持支持
- V2 source專屬操作,如create, overwrite不支持支持
基于Spark SQL
- SELECT通過DataFrame的temporary view支持
- DDL不支持(僅能通過Iceberg API)支持(通過Catalog)
- DML不支持支持

Spark 2.4 環境的使用

配置

Hive MetaStore

Iceberg 內部支持 Hive 和 Hadoop 兩種 catalog:

Catalog類型Metadata JSON管理Namespace
Hive catalogHive MetaStore1級,即DB
Hadoop catalog文件系統上的某個文件多級,對應多級目錄

后文以Hive catalog為主做介紹。Hive catalog需要Hive MetaStore的支持。注意其有多種配置方式,其中內嵌的Derby數據庫僅僅用于實驗和學習,不能用于生產環境。

Spark

<SPARK_HOME>/conf/spark-defaults.conf需要加入如下配置,使Iceberg能夠訪問Hive MetaStore:

spark.hadoop.hive.metastore.uris?????????? thrift://<HiveMetaStore>:9083

spark.hadoop.hive.metastore.warehouse.dir? hdfs://<NameNode>:8020/path

部署

如何使用社區正式發布的版本:

spark-shell --packages org.apache.iceberg:iceberg-spark-runtime:0.7.0-incubating

如何本地打包,并把Iceberg放入Spark的classpath:

git clone https://github.com/apache/incubator-iceberg.git

cd incubator-iceberg

# master branch supports Spark 2.4.4

./gradlew assemble

spark-shell --jars <iceberg-git-working-directory>/spark-runtime/build/libs/iceberg-spark-runtime-<version>.jar

讀Iceberg table

通過DataFrame

Spark 2.4只能讀寫已經存在的Iceberg table。在后續的操作前,需要先通過Iceberg API來創建table。具體如下:

import org.apache.iceberg.catalog.TableIdentifier;

val name = TableIdentifier.of("default", "person");

import org.apache.iceberg.Schema;

import org.apache.iceberg.types.Types;

val schema = new Schema(

??????Types.NestedField.required(1, "id", Types.IntegerType.get()),

??????Types.NestedField.required(2, "name", Types.StringType.get()),

??????Types.NestedField.required(3, "age", Types.IntegerType.get())

????);

import org.apache.iceberg.PartitionSpec;

val spec = PartitionSpec.unpartitioned

import org.apache.iceberg.hive.HiveCatalog;

val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration);

val table = catalog.createTable(name, schema, spec);

讀取是通過DataFrameReader并指定iceberg作為format來訪問Iceberg table,隨后Iceberg內部的邏輯會根據path來判斷訪問的是Hive catalog下的table,還是用文件系統的路徑表示的Hadoop table。

// Table managed by Hive catalog

spark.read

?????.format("iceberg")

?????.load("db.table")

// Hadoop table, identified by a path

spark.read

?????.format("iceberg")

?????.load("hdfs://<NameNode>:8020/<path_to_table>")

Iceberg會判斷path中是否含有"/"。如果是,則認為是一個用路徑表示Hadoop table;否則,會去Hive catalog中尋找。

利用time travel回溯某一個snapshot的數據

在讀取時,通過option指定as-of-timestamp或者snapshot-id來訪問之前某一個snapshot中的數據:

// Time travel to October 26, 1986 at 01:21:00

spark.read

?????.format("iceberg")

?????.option("as-of-timestamp", "499162860000")

?????.load("db.table")

// Time travel to snapshot with ID 10963874102873L

spark.read

?????.format("iceberg")

?????.option("snapshot-id", 10963874102873L)

?????.load("db.table")

snapshot-id的獲取方法,可以參考后文中訪問元數據中snapshot的部分,或者直接查看元數據文件的內容。

在DataFrame基礎上使用SQL SELECT
在DataFrame的基礎上,創建local temporary view后,也可以通過SQL SELECT來讀取Iceberg table的內容:

val df = spark.read

??????????????.format("iceberg")

??????????????.load("db.table")

df.createOrReplaceTempView("view")

spark.sql("""SELECT * FROM view""")

?????.show()

寫Iceberg table

Spark 2.4可以通過DataFrameWriter并指定iceberg作為format來寫入Iceberg table,并支持append和overwrite兩種模式:

// Append

df.write

??.format("iceberg")

??.mode("append")

??.save("db.table")

// Overwrite

df.write

??.format("iceberg")

??.mode("overwrite")

??.save("db.table")

有如下幾點需要注意:

  • Overwrit的行為dynamic overwrite,即當某個partition中含有輸入DataFrame中的行的時候,該partition才會被新數據完全覆蓋;其他partition則保持不變。而Spark 2.4中原生數據源(如parquet)的默認行為是static overwrite;
  • 操作粒度是文件級別,并不是行級別;
  • mode必須顯式指定,沒有默認行為。
  • 訪問Iceberg table的元數據

    Iceberg支持通過DataFrameReader訪問table的元數據,如snapshot,manifest等。對于Hive table,可以在原table name后面加.history、.snapshots等表示要訪問元數據;對于用路徑來表示的Hadoop table,需要在原路徑后面加#history等。例如:

    // Read snapshot history of db.table

    spark.read

    ?????.format("iceberg")

    ?????.load("db.table.history")

    結果

    +-------------------------+---------------------+---------------------+---------------------+

    | made_current_at???????? | snapshot_id???????? | parent_id?????????? | is_current_ancestor |

    +-------------------------+---------------------+---------------------+---------------------+

    | 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL??????????????? | true??????????????? |

    | 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true??????????????? |

    | 2019-02-09 16:24:30.13? | 296410040247533544? | 5179299526185056830 | false?????????????? |

    | 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true??????????????? |

    | 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true??????????????? |

    | 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true??????????????? |

    +-------------------------+---------------------+---------------------+---------------------+

    又如:

    // Read snapshot list of db.table

    spark.read

    ?????.format("iceberg")

    ?????.load("db.table.snapshots")

    // Read manifest files of db.table

    spark.read

    ?????.format("iceberg")

    ?????.load("db.table.manifests")

    // Read data file list of db.tabe

    spark.read

    ?????.format("iceberg")

    ?????.load("db.table.files")

    可以進一步將history和snapshot按照snapshot id做join,來查找snapshot id對應的application id:

    spark.read

    ?????.format("iceberg")

    ?????.load("db.table.history")

    ?????.createOrReplaceTempView("history")

    spark.read

    ?????.format("iceberg")

    ?????.load("db.table.snapshots")

    ?????.createOrReplaceTempView("snapshots")

    SELECT

    ????h.made_current_at,

    ????s.operation,

    ????h.snapshot_id,

    ????h.is_current_ancestor,

    ????s.summary['spark.app.id']

    FROM history h

    JOIN snapshots s

    ??ON h.snapshot_id = s.snapshot_id

    ORDER BY made_current_at

    結果如下:

    -------------------------+-----------+----------------+---------------------+----------------------------------+

    | made_current_at???????? | operation | snapshot_id??? | is_current_ancestor | summary[spark.app.id]??????????? |

    +-------------------------+-----------+----------------+---------------------+----------------------------------+

    | 2019-02-08 03:29:51.215 | append??? | 57897183625154 | true??????????????? | application_1520379288616_155055 |

    | 2019-02-09 16:24:30.13? | delete??? | 29641004024753 | false?????????????? | application_1520379288616_151109 |

    | 2019-02-09 16:32:47.336 | append??? | 57897183625154 | true??????????????? | application_1520379288616_155055 |

    | 2019-02-08 03:47:55.948 | overwrite | 51792995261850 | true??????????????? | application_1520379288616_152431 |

    +-------------------------+-----------+----------------+---------------------+----------------------------------+

    Spark 3.0 環境的使用

    Iceberg在Spark 3.0中,作為V2 Data Source,除了上述Spark 2.4所有的訪問能力外,還可以通過V2 Data Source專屬的DataFrame API訪問;同時,受益于external catalog的支持,Spark SQL的DDL功能也可以操作Iceberg table,并且DML語句支持也更加豐富。

    配置external catalog

    在<SPARK_HOME>/conf/spark-defaults.conf加入如下配置:

    spark.sql.catalog.catalog-name=com.example.YourCatalogClass

    通過V2 Data Source專屬DataFrame API訪問

    df.writeTo("catalog-name.db.table")

    ??.overwritePartitions()

    通過Spark SQL訪問

    相較于Spark 2.4,Spark 3.0可以省去DataFrameReader和創建local temporary view的步驟,直接通過Spark SQL進行操作:

    -- Create table

    CREATE TABLE catalog-name.db.tabe

    ????(id INT, data STRING)

    ????USING iceberg

    ????PARTITIONED BY (id)

    -- Insert

    INSERT INTO catalog-name.db.table

    ????VALUES (1, 'a'), (2, 'b'), (3, 'c')

    -- Delete

    DELETE FROM catalog-name.db.table

    ????WHERE id <> 1

    -- Update

    UPDATE catalog-name.db.table

    ????SET data = 'C' WHERE id = 3

    -- Create table as select

    CREATE TABLE catalog-name.db.table

    ????USING iceberg

    ????AS SELECT id, data

    ???????FROM catalog-name.db.table1

    ???????WHERE id <= 2

    我們作為社區中spark-3分支的維護者,正在持續推進新功能的開發和合入,讓更多的人受益。

    總結

    本文作為Iceberg的快速入門,介紹了如何通過Spark訪問Iceberg table,以及不同Spark版本的支持情況:

    • Spark 2.4可以通過DataFrame讀取或修改已經存在的Iceberg table中的數據,但建表、刪表等DDL操作只能通過Iceberg API完成;
    • Spark 3.0訪問Iceberg table的能力是Spark 2.4的超集,可以通過Spark SQL配合catalog,進行SELECT、DDL和DML等更多的操作。

    隨著Iceberg自身功能的完善(如向量化讀取,merge on read等),以及上下游對接和生態的豐富,Iceberg作為優秀的表格式抽象,在大數據領域必然會有更好的發展。

    本文原文:https://mp.weixin.qq.com/s/vvsnHrbzxJ3Gno1XtzHO7g

    總結

    以上是生活随笔為你收集整理的Apache Iceberg 快速入门的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。