日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark sql hbase java_Spark 读写 HBase 的两种方式(RDD、DataFrame)

發布時間:2024/7/23 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark sql hbase java_Spark 读写 HBase 的两种方式(RDD、DataFrame) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

使用 saveAsHadoopDataset 寫入數據

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}

import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

import org.apache.hadoop.hbase.mapred.TableOutputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapred.JobConf

//import org.apache.hadoop.mapreduce.Job

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.SparkSession

/**

* Created by blockchain on 18-9-9 下午3:45 in Beijing.

*/

object SparkHBaseRDD {

def main(args: Array[String]) {

// 屏蔽不必要的日志顯示在終端上

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val spark = SparkSession.builder().appName("SparkHBase").getOrCreate()

val sc = spark.sparkContext

val tablename = "SparkHBase"

val hbaseConf = HBaseConfiguration.create()

hbaseConf.set("hbase.zookeeper.quorum","localhost") //設置zooKeeper集群地址,也可以通過將hbase-site.xml導入classpath,但是建議在程序里這樣設置

hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //設置zookeeper連接端口,默認2181

hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

//初始化job,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的

val jobConf = new JobConf(hbaseConf)

jobConf.setOutputFormat(classOf[TableOutputFormat])

val indataRDD = sc.makeRDD(Array("2,jack,16", "1,Lucy,15", "5,mike,17", "3,Lily,14"))

val rdd = indataRDD.map(_.split(',')).map{ arr=>

/*一個Put對象就是一行記錄,在構造方法中指定主鍵

* 所有插入的數據必須用org.apache.hadoop.hbase.util.Bytes.toBytes方法轉換

* Put.addColumn 方法接收三個參數:列族,列名,數據*/

val put = new Put(Bytes.toBytes(arr(0)))

put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))

put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("age"),Bytes.toBytes(arr(2)))

(new ImmutableBytesWritable, put)

}

rdd.saveAsHadoopDataset(jobConf)

spark.stop()

}

}

使用 newAPIHadoopRDD 讀取數據

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}

import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

import org.apache.hadoop.hbase.mapred.TableOutputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapred.JobConf

//import org.apache.hadoop.mapreduce.Job

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.SparkSession

/**

* Created by blockchain on 18-9-9 下午3:45 in Beijing.

*/

object SparkHBaseRDD {

def main(args: Array[String]) {

// 屏蔽不必要的日志顯示在終端上

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val spark = SparkSession.builder().appName("SparkHBase").getOrCreate()

val sc = spark.sparkContext

val tablename = "SparkHBase"

val hbaseConf = HBaseConfiguration.create()

hbaseConf.set("hbase.zookeeper.quorum","localhost") //設置zooKeeper集群地址,也可以通過將hbase-site.xml導入classpath,但是建議在程序里這樣設置

hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //設置zookeeper連接端口,默認2181

hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename)

// 如果表不存在則創建表

val admin = new HBaseAdmin(hbaseConf)

if (!admin.isTableAvailable(tablename)) {

val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))

admin.createTable(tableDesc)

}

//讀取數據并轉化成rdd TableInputFormat 是 org.apache.hadoop.hbase.mapreduce 包下的

val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],

classOf[ImmutableBytesWritable],

classOf[Result])

hBaseRDD.foreach{ case (_ ,result) =>

//獲取行鍵

val key = Bytes.toString(result.getRow)

//通過列族和列名獲取列

val name = Bytes.toString(result.getValue("cf1".getBytes,"name".getBytes))

val age = Bytes.toString(result.getValue("cf1".getBytes,"age".getBytes))

println("Row key:"+key+" Name:"+name+" Age:"+age)

}

admin.close()

spark.stop()

}

}

Spark DataFrame 通過 Phoenix 讀寫 HBase

需要添加的依賴如下:

org.apache.phoenix

phoenix-core

${phoenix.version}

org.apache.phoenix

phoenix-spark

${phoenix.version}

下面老規矩,直接上代碼。

package com.ai.spark

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.{SaveMode, SparkSession}

/**

* Created by blockchain on 18-9-9 下午8:33 in Beijing.

*/

object SparkHBaseDataFrame {

def main(args: Array[String]) {

// 屏蔽不必要的日志顯示在終端上

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val spark = SparkSession.builder().appName("SparkHBaseDataFrame").getOrCreate()

val url = s"jdbc:phoenix:localhost:2181"

val dbtable = "PHOENIXTEST"

//spark 讀取 phoenix 返回 DataFrame 的 第一種方式

val rdf = spark.read

.format("jdbc")

.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")

.option("url", url)

.option("dbtable", dbtable)

.load()

rdf.printSchema()

//spark 讀取 phoenix 返回 DataFrame 的 第二種方式

val df = spark.read

.format("org.apache.phoenix.spark")

.options(Map("table" -> dbtable, "zkUrl" -> url))

.load()

df.printSchema()

//spark DataFrame 寫入 phoenix,需要先建好表

df.write

.format("org.apache.phoenix.spark")

.mode(SaveMode.Overwrite)

.options(Map("table" -> "PHOENIXTESTCOPY", "zkUrl" -> url))

.save()

spark.stop()

}

}

參考鏈接:

總結

以上是生活随笔為你收集整理的spark sql hbase java_Spark 读写 HBase 的两种方式(RDD、DataFrame)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 越南a级片 | 日韩欧美日韩 | 亚洲精品69 | 黄色大片一级 | 少妇天堂网 | 草草影院第一页 | 全黄毛片| 欧美a级在线观看 | 一级黄色免费片 | 高潮av| 中文字幕在线播放第一页 | 美女131爽爽爽做爰视频 | 久久久18禁一区二区三区精品 | 青青导航 | 国产网站免费在线观看 | 一级色网站 | 欧美三p | 成人免费在线观看av | 欧美性爱精品在线 | 欧美一二三四五区 | 国产免费久久 | 欧美精品一二区 | www.久久久久久久 | 日韩成人免费在线观看 | 亚洲v日韩v综合v精品v | 亚洲精品国 | 免费播放片大片 | www.黄色大片| 国产理论视频在线观看 | 91毛片在线观看 | 天天摸日日摸狠狠添 | 国产精品蜜臀 | 欧美激情精品久久久久久变态 | 在线观看亚洲区 | 欧美成人图区 | 在线免费不卡视频 | 亚洲乱码国产乱码精品精 | 激情视频在线播放 | 手机av资源 | 亚洲三级黄色片 | 天堂视频在线观看免费 | 自拍偷拍第二页 | 性做爰裸体按摩视频 | 麻豆视频在线免费观看 | 高清视频免费在线观看 | 波多野结衣在线看 | 粗大挺进潘金莲身体在线播放 | 人妻 校园 激情 另类 | 日本韩国视频 | 蜜臀av午夜精品 | 超碰免费在线观看 | 久久亚洲天堂 | 国产精品影音先锋 | 96亚洲精品久久久蜜桃 | 一道本一区二区 | 国产精品传媒视频 | 极品美女av| 五十路六十路七十路熟婆 | 深夜福利免费在线观看 | 吃奶在线观看 | 日韩精品视频在线 | 男女爽爽爽| 国产成人亚洲精品自产在线 | 丝袜美女av | 国产在线观看免费视频软件 | 奇米777第四色 | 国产又黄又爽视频 | 成年人在线免费 | 中文字幕一区二区在线视频 | 少妇一级淫免费放 | 亚洲熟妇av日韩熟妇在线 | 超碰网址 | 亚洲第三十七页 | 久久高清无码视频 | 亚洲成人偷拍 | 在线中文字幕亚洲 | 日韩综合 | 亚洲视频 欧美视频 | 国产高清在线观看视频 | 久操视频在线 | 精品一区欧美 | 成人免费影片 | 91玉足脚交嫩脚丫在线播放 | 337p亚洲欧洲色噜噜噜 | 一级特黄录像免费看 | 国产成人无码精品久久久久 | 亚洲无吗一区二区三区 | 日韩免费视频网站 | 不卡的av电影 | 亚洲人成7777 | 少妇太紧太爽又黄又硬又爽 | 草逼视频网站 | 你懂的亚洲 | 午夜伦理在线观看 | 色拍拍视频 | 熟睡人妻被讨厌的公侵犯 | 一区二区美女视频 | 九九久久网 | 一级激情视频 |