日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

如何使用MaxCompute Spark读写阿里云Hbase

發布時間:2024/8/23 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 如何使用MaxCompute Spark读写阿里云Hbase 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

背景

Spark on MaxCompute可以訪問位于阿里云VPC內的實例(例如ECS、HBase、RDS),默認MaxCompute底層網絡和外網是隔離的,Spark on MaxCompute提供了一種方案通過配置spark.hadoop.odps.cupid.vpc.domain.list來訪問阿里云的vpc網絡環境的Hbase。Hbase標準版和增強版的配置不同,本文通過訪問阿里云的標準版和增強版的Hbase簡單的描述需要加的配置。

Hbase標準版

環境準備
Hbase的網絡環境是存在vpc下的,所以我們首先要添加安全組開放端口2181、10600、16020.同時Hbase有白名單限制我們需要把對應的MaxCompute的IP加入到Hbase的白名單。
設置對應vpc的安全組

找到對應的vpc id然后添加安全組設置端口


添加Hbase的白名單


在hbase的白名單添加

100.104.0.0/16

創建Hbase表

create 'test','cf'

編寫Spark程序
需要的Hbase依賴

<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-mapreduce</artifactId><version>2.0.2</version></dependency><dependency><groupId>com.aliyun.hbase</groupId><artifactId>alihbase-client</artifactId><version>2.0.5</version></dependency>

編寫代碼

object App {def main(args: Array[String]) {val spark = SparkSession.builder().appName("HbaseTest").config("spark.sql.catalogImplementation", "odps").config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api").config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api").getOrCreate()val sc = spark.sparkContextval config = HBaseConfiguration.create()val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181"config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);val jobConf = new JobConf(config)jobConf.setOutputFormat(classOf[TableOutputFormat])jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")try{import spark._spark.sql("select '7', 88 ").rdd.map(row => {val name= row(0).asInstanceOf[String]val id = row(1).asInstanceOf[Integer]val put = new Put(Bytes.toBytes(id))put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))(new ImmutableBytesWritable, put)}).saveAsHadoopDataset(jobConf)} finally {sc.stop()}} }

提交到DataWorks
由于大于50m通過odps客戶端提交

add jar SparkHbase-1.0-SNAPSHOT -f;

進入數據開發新建spark節點


添加配置
需要配置spark.hadoop.odps.cupid.vpc.domain.list
這里的hbase域名需要hbase所有的機器,少一臺可能會造成網絡不通

{"regionId":"cn-beijing","vpcs":[{"vpcId":"vpc-2zeaeq21mb1dmkqh0exox","zones":[{"urls":[{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":2181},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":16000},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":16020},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":2181},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":16000},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":16020},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":2181},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":16000},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":16020},{"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com","port":16020},{"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com","port":16020},{"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com","port":16020}]}]}] }

Hbase增強版

環境準備
Hbase增強版的端口是30020、10600、16020.同時Hbase有白名單限制我們需要把對應的MaxCompute的IP加入到Hbase的白名單。
設置對應vpc的安全組
找到對應的vpc id然后添加安全組設置端口

添加Hbase的白名單

100.104.0.0/16

創建Hbase表?

create 'test','cf'

編寫Spark程序
需要的Hbase依賴,引用的包必須是阿里云增強版的依賴

<dependency><groupId>com.aliyun.hbase</groupId><artifactId>alihbase-client</artifactId><version>2.0.8</version></dependency>

編寫代碼

object McToHbase {def main(args: Array[String]) {val spark = SparkSession.builder().appName("spark_sql_ddl").config("spark.sql.catalogImplementation", "odps").config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api").config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api").getOrCreate()val sc = spark.sparkContexttry{spark.sql("select '7', 'long'").rdd.foreachPartition { iter =>val config = HBaseConfiguration.create()// 集群的連接地址(VPC內網地址)在控制臺頁面的數據庫連接界面獲得config.set("hbase.zookeeper.quorum", ":30020");import spark._// xml_template.comment.hbaseue.username_password.defaultconfig.set("hbase.client.username", "");config.set("hbase.client.password", "");val tableName = TableName.valueOf( "test")val conn = ConnectionFactory.createConnection(config)val table = conn.getTable(tableName);val puts = new util.ArrayList[Put]()iter.foreach(row => {val id = row(0).asInstanceOf[String]val name = row(1).asInstanceOf[String]val put = new Put(Bytes.toBytes(id))put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))puts.add(put)table.put(puts)})}} finally {sc.stop()}} }

注意
hbase clinet會報org.apache.spark.SparkException: Task not serializable
原因是spark會把序列化對象以將其發送給其他的worker
解決方案

- 使類可序列化 - 僅在map中傳遞的lambda函數中聲明實例。 - 將NotSerializable對象設置為靜態對象,并在每臺計算機上創建一次。 - 調用rdd.forEachPartition并在其中創建Serializable對象,如下所示:rdd.forEachPartition(iter-> {NotSerializable notSerializable = new NotSerializable();<br />// ...現在處理iter});


提交到DataWorks
由于大于50m通過odps客戶端提交

add jar SparkHbase-1.0-SNAPSHOT -f;

進入數據開發新建spark節點


添加配置
需要配置spark.hadoop.odps.cupid.vpc.domain.list
注意
1.這個里需要添加增強版java api訪問地址,這里必須采用ip的形式。ip通過直接ping該地址獲取,這里的ip是172.16.0.10添加端口16000


2.這里的hbase域名需要hbase所有的機器,少一臺可能會造成網絡不通

{"regionId":"cn-beijing","vpcs":[{"vpcId":"vpc-2zeaeq21mb1dmkqh0exox","zones":[{"urls":[{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":30020},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":16000},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":16020},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":30020},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":16000},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":16020},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":30020},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":16000},{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":16020},{"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com","port":16020},{"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com","port":16020},{"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com","port":16020},{"domain":"172.16.0.10","port":16000}]}]}] }

大家如果對MaxCompute有更多咨詢或者建議,歡迎掃碼加入 MaxCompute開發者社區釘釘群,或點擊鏈接?申請加入。

原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。

總結

以上是生活随笔為你收集整理的如何使用MaxCompute Spark读写阿里云Hbase的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。