Spark 读写 HBase 的两种方式 — Two Ways to Read and Write HBase from Spark (RDD and DataFrame)
Writing data with saveAsHadoopDataset
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
//import org.apache.hadoop.mapreduce.Job
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
/**
  * Created by blockchain on 18-9-9 at 3:45 PM in Beijing.
  */
object SparkHBaseRDD {
  def main(args: Array[String]) {
    // Suppress noisy Spark logging on the console
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val spark = SparkSession.builder().appName("SparkHBase").getOrCreate()
    val sc = spark.sparkContext

    val tablename = "SparkHBase"

    val hbaseConf = HBaseConfiguration.create()
    // ZooKeeper quorum; you can also put hbase-site.xml on the classpath,
    // but setting it explicitly in code is recommended
    hbaseConf.set("hbase.zookeeper.quorum", "localhost")
    // ZooKeeper client port, 2181 by default
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    // Initialize the job; note that this TableOutputFormat is the one from org.apache.hadoop.hbase.mapred
    val jobConf = new JobConf(hbaseConf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])

    val indataRDD = sc.makeRDD(Array("2,jack,16", "1,Lucy,15", "5,mike,17", "3,Lily,14"))
    val rdd = indataRDD.map(_.split(',')).map { arr =>
      /* A Put object represents one row; its constructor takes the row key.
       * Every value must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
       * Put.addColumn takes three arguments: column family, column qualifier, value. */
      val put = new Put(Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(arr(2)))
      (new ImmutableBytesWritable, put)
    }
    rdd.saveAsHadoopDataset(jobConf)

    spark.stop()
  }
}
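Note that saveAsHadoopDataset does not create the target table, so SparkHBase with column family cf1 must already exist before the job runs. Below is a minimal sketch of creating it through the same (older) HBase Admin API used in this post; the table and family names simply match the write example above:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin

object CreateSparkHBaseTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val admin = new HBaseAdmin(conf)
    val tableName = "SparkHBase"
    if (!admin.tableExists(tableName)) {
      // One column family "cf1", matching the Put calls in the write example
      val desc = new HTableDescriptor(TableName.valueOf(tableName))
      desc.addFamily(new HColumnDescriptor("cf1"))
      admin.createTable(desc)
    }
    admin.close()
  }
}

On newer HBase versions the same thing is done with Admin and TableDescriptorBuilder instead of the deprecated HBaseAdmin/HTableDescriptor classes.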
Reading data with newAPIHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
//import org.apache.hadoop.mapreduce.Job
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
/**
  * Created by blockchain on 18-9-9 at 3:45 PM in Beijing.
  */
object SparkHBaseRDD {
  def main(args: Array[String]) {
    // Suppress noisy Spark logging on the console
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val spark = SparkSession.builder().appName("SparkHBase").getOrCreate()
    val sc = spark.sparkContext

    val tablename = "SparkHBase"

    val hbaseConf = HBaseConfiguration.create()
    // ZooKeeper quorum; you can also put hbase-site.xml on the classpath,
    // but setting it explicitly in code is recommended
    hbaseConf.set("hbase.zookeeper.quorum", "localhost")
    // ZooKeeper client port, 2181 by default
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename)

    // Create the table if it does not exist yet
    val admin = new HBaseAdmin(hbaseConf)
    if (!admin.isTableAvailable(tablename)) {
      val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
      // A table needs at least one column family; use the same "cf1" as the write example
      tableDesc.addFamily(new HColumnDescriptor("cf1"))
      admin.createTable(tableDesc)
    }

    // Read the table into an RDD; this TableInputFormat is the one from org.apache.hadoop.hbase.mapreduce
    val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    hBaseRDD.foreach { case (_, result) =>
      // Row key
      val key = Bytes.toString(result.getRow)
      // Fetch a cell by column family and qualifier
      val name = Bytes.toString(result.getValue("cf1".getBytes, "name".getBytes))
      val age = Bytes.toString(result.getValue("cf1".getBytes, "age".getBytes))
      println("Row key:" + key + " Name:" + name + " Age:" + age)
    }

    admin.close()
    spark.stop()
  }
}
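If you would rather end up with a DataFrame than print each row, the (ImmutableBytesWritable, Result) pairs can be mapped into a case class and converted with toDF. Here is a rough sketch building on hBaseRDD and the spark session from the example above; the Person case class and its field names are only illustrative, and it should be defined at the top level rather than inside main so Spark can derive an encoder for it:

// Illustrative row type; define it outside main
case class Person(rowKey: String, name: String, age: String)

import spark.implicits._

val personDF = hBaseRDD.map { case (_, result) =>
  Person(
    Bytes.toString(result.getRow),
    Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name"))),
    Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("age")))
  )
}.toDF()

personDF.show()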
Reading and writing HBase with Spark DataFrames via Phoenix
The required dependencies are:
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>${phoenix.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>${phoenix.version}</version>
</dependency>
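For sbt users, roughly the equivalent of the Maven coordinates above; the phoenixVersion value here is only a placeholder and should match the Phoenix release deployed on your cluster:

// build.sbt (sketch)
val phoenixVersion = "4.14.1-HBase-1.4"  // placeholder, match your HBase/Phoenix release

libraryDependencies ++= Seq(
  "org.apache.phoenix" % "phoenix-core"  % phoenixVersion,
  "org.apache.phoenix" % "phoenix-spark" % phoenixVersion
)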
As usual, straight to the code.
package com.ai.spark
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
  * Created by blockchain on 18-9-9 at 8:33 PM in Beijing.
  */
object SparkHBaseDataFrame {
  def main(args: Array[String]) {
    // Suppress noisy Spark logging on the console
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val spark = SparkSession.builder().appName("SparkHBaseDataFrame").getOrCreate()

    val url = "jdbc:phoenix:localhost:2181"
    val dbtable = "PHOENIXTEST"

    // First way to read from Phoenix into a DataFrame: the generic JDBC data source
    val rdf = spark.read
      .format("jdbc")
      .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
      .option("url", url)
      .option("dbtable", dbtable)
      .load()
    rdf.printSchema()

    // Second way to read from Phoenix into a DataFrame: the phoenix-spark data source
    val df = spark.read
      .format("org.apache.phoenix.spark")
      .options(Map("table" -> dbtable, "zkUrl" -> url))
      .load()
    df.printSchema()

    // Write a DataFrame to Phoenix; the target table must already exist
    df.write
      .format("org.apache.phoenix.spark")
      .mode(SaveMode.Overwrite)
      .options(Map("table" -> "PHOENIXTESTCOPY", "zkUrl" -> url))
      .save()

    spark.stop()
  }
}
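The write path above assumes PHOENIXTESTCOPY already exists with a schema compatible with the DataFrame being written. One way to create both test tables is plain JDBC against the same Phoenix URL; the sketch below is only illustrative, and the column names must be adjusted to whatever your job actually reads and writes:

import java.sql.DriverManager

object CreatePhoenixTables {
  def main(args: Array[String]): Unit = {
    // Same Phoenix JDBC URL as in SparkHBaseDataFrame
    val conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181")
    val stmt = conn.createStatement()
    // Illustrative schema: adjust the columns to your data
    stmt.executeUpdate(
      "CREATE TABLE IF NOT EXISTS PHOENIXTEST (PK VARCHAR PRIMARY KEY, COL1 VARCHAR)")
    stmt.executeUpdate(
      "CREATE TABLE IF NOT EXISTS PHOENIXTESTCOPY (PK VARCHAR PRIMARY KEY, COL1 VARCHAR)")
    stmt.close()
    conn.close()
  }
}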