Several Ways to Read and Write HBase (Part 3): Flink

Published: 2023/12/18

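1. Overview of ways to connect to HBase

There are four main approaches:

  • Reading and writing HBase with the plain Java API;
  • Reading and writing HBase from Spark;
  • Reading and writing HBase from Flink;
  • Reading and writing HBase through Phoenix.

The first is the native API that HBase itself provides: fairly low-level, but efficient. The second and third are the Spark and Flink integrations with HBase. The last goes through the JDBC interface of the third-party Phoenix layer, which can also be called from Spark and Flink.

Note:

This article uses HBase 2.1.2, Flink 1.7.2, and Scala 2.12.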

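2. Reading and writing HBase with Flink Streaming and Flink DataSet

There are two ways to read HBase data in Flink:

• Extend RichSourceFunction and override its methods (Flink Streaming)
• Implement a custom TableInputFormat (Flink Streaming and Flink DataSet)

There are likewise two ways to write data to HBase from Flink:

• Extend RichSinkFunction and override its methods (Flink Streaming)
• Implement the OutputFormat interface (Flink Streaming and Flink DataSet)

Note:

① Flink Streaming supports both approaches in each direction. Flink DataSet batch jobs can read only through a TableInputFormat implementation and write only through an OutputFormat implementation.

② TableInputFormat ships in flink-hbase_2.12-1.7.2, and that jar is built against HBase 1.4.3. Because this project uses HBase 2.1.2, TableInputFormat has to be rewritten.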

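2.1 Two ways to read HBase from Flink

Note: before reading from HBase, you can first run the Flink DataSet batch write from section 2.2.2 (implementing the OutputFormat interface) so that the HBase test table contains data.

2.1.1 Extending RichSourceFunction and overriding its methods: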

    package cn.swordfall.hbaseOnFlink

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
    import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Scan, Table}
    import org.apache.hadoop.hbase.util.Bytes

    import scala.collection.JavaConverters._

    /**
      * @Author: Yang JianQiu
      * @Date: 2019/2/28 18:05
      *
      * Uses HBase as a data source: reads rows from HBase and emits them as a stream.
      *
      * Reading from HBase, approach 1: extend RichSourceFunction and override its methods.
      */
    class HBaseReader extends RichSourceFunction[(String, String)] {

      private var conn: Connection = null
      private var table: Table = null
      private var scan: Scan = null

      /**
        * Opens the HBase client connection.
        * @param parameters
        */
      override def open(parameters: Configuration): Unit = {
        val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()

        config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
        config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)

        val tableName: TableName = TableName.valueOf("test")
        val cf1: String = "cf1"
        conn = ConnectionFactory.createConnection(config)
        table = conn.getTable(tableName)
        scan = new Scan()
        scan.withStartRow(Bytes.toBytes("100"))
        scan.withStopRow(Bytes.toBytes("107"))
        scan.addFamily(Bytes.toBytes(cf1))
      }

      /**
        * run comes from the Java interface SourceFunction. IntelliJ IDEA's Ctrl + O does not
        * list it conveniently; type "override" directly and the IDE will suggest it.
        * @param sourceContext
        */
      override def run(sourceContext: SourceContext[(String, String)]): Unit = {
        val rs = table.getScanner(scan)
        try {
          val iterator = rs.iterator()
          while (iterator.hasNext) {
            val result = iterator.next()
            val rowKey = Bytes.toString(result.getRow)
            // Join all cell values of the row with "_"
            val valueString = result.listCells().asScala
              .map((cell: Cell) => Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
              .mkString("_")
            sourceContext.collect((rowKey, valueString))
          }
        } finally {
          // Release the scanner once the scan is done
          rs.close()
        }
      }

      /**
        * Must be implemented. Left empty here: the scan is bounded, so run() returns on its own.
        */
      override def cancel(): Unit = {}

      /**
        * Closes the table and the HBase connection.
        */
      override def close(): Unit = {
        try {
          if (table != null) {
            table.close()
          }
          if (conn != null) {
            conn.close()
          }
        } catch {
          case e: Exception => println(e.getMessage)
        }
      }
    }
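The cancel() method in HBaseReader is empty, which is workable only because the scan is bounded and run() returns once the scanner is exhausted. For longer scans, a common pattern is a volatile flag that cancel() clears and the run() loop checks. The sketch below shows that pattern in plain Java with no Flink or HBase dependency; CancellableScanLoop and its in-memory row iterator are hypothetical stand-ins for HBaseReader and the ResultScanner, not part of the original project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a source whose run() loop can be cancelled. */
class CancellableScanLoop {
    // volatile: cancel() is typically called from a different thread than run()
    private volatile boolean running = true;

    /** Emits rows until the iterator is exhausted or cancel() has been called. */
    void run(Iterator<String> rows, Consumer<String> out) {
        while (running && rows.hasNext()) {
            out.accept(rows.next());
        }
    }

    /** Asks the loop in run() to stop before it emits its next row. */
    void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        CancellableScanLoop loop = new CancellableScanLoop();
        List<String> emitted = new ArrayList<>();
        Iterator<String> rows = Arrays.asList("100", "101", "102", "103").iterator();
        // Cancel from inside the consumer once two rows have been emitted
        loop.run(rows, row -> {
            emitted.add(row);
            if (emitted.size() == 2) {
                loop.cancel();
            }
        });
        System.out.println(emitted); // prints [100, 101]
    }
}
```

In a real source, the flag would be checked in the while loop of run() next to iterator.hasNext, and cancel() would set it to false.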

Calling the HBaseReader class, which extends RichSourceFunction, from a Flink Streaming job:

    /**
      * Reading from HBase, approach 1: extend RichSourceFunction and override its methods.
      */
    def readFromHBaseWithRichSourceFunction(): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.enableCheckpointing(5000)
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
      val dataStream: DataStream[(String, String)] = env.addSource(new HBaseReader)
      dataStream.map(x => println(x._1 + " " + x._2))
      env.execute()
    }
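HBaseReader scans from row key "100" to row key "107". In HBase, Scan.withStartRow(byte[]) is inclusive and Scan.withStopRow(byte[]) is exclusive, and row keys are ordered as bytes rather than as numbers. The sketch below imitates those two rules with a sorted in-memory map and no HBase dependency; ScanRangeDemo and its sample rows are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class ScanRangeDemo {
    /** Returns the keys in [startRow, stopRow), the default bounds of an HBase Scan. */
    static List<String> scanKeys(TreeMap<String, String> table, String startRow, String stopRow) {
        // subMap(from, to) is from-inclusive and to-exclusive
        return new ArrayList<>(table.subMap(startRow, stopRow).keySet());
    }

    public static void main(String[] args) {
        // ASCII strings sort the same way as their bytes, so a TreeMap mimics row-key order
        TreeMap<String, String> table = new TreeMap<>();
        for (String key : new String[] {"100", "103", "106", "107", "20"}) {
            table.put(key, "value-" + key);
        }
        System.out.println(scanKeys(table, "100", "107")); // prints [100, 103, 106]
        System.out.println(table.lastKey());               // prints 20, which sorts after "107"
    }
}
```

Row "107" is excluded because the stop row is exclusive, and "20" falls outside the range because it compares greater than "107" byte by byte.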

2.1.2 Implementing a custom TableInputFormat:

Because of the version mismatch, three files from flink-hbase_2.12-1.7.2 have to be rewritten: TableInputSplit, AbstractTableInputFormat, and TableInputFormat.

TableInputSplit rewritten as CustomTableInputSplit:

    package cn.swordfall.hbaseOnFlink.flink172_hbase212;

    import org.apache.flink.core.io.LocatableInputSplit;

    /**
     * @Author: Yang JianQiu
     * @Date: 2019/3/19 11:50
     */
    public class CustomTableInputSplit extends LocatableInputSplit {
        private static final long serialVersionUID = 1L;

        /** The name of the table to retrieve data from. */
        private final byte[] tableName;

        /** The start row of the split. */
        private final byte[] startRow;

        /** The end row of the split. */
        private final byte[] endRow;

        /**
         * Creates a new table input split.
         *
         * @param splitNumber
         *        the number of the input split
         * @param hostnames
         *        the names of the hosts storing the data the input split refers to
         * @param tableName
         *        the name of the table to retrieve data from
         * @param startRow
         *        the start row of the split
         * @param endRow
         *        the end row of the split
         */
        CustomTableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName,
                              final byte[] startRow, final byte[] endRow) {
            super(splitNumber, hostnames);

            this.tableName = tableName;
            this.startRow = startRow;
            this.endRow = endRow;
        }

        /**
         * Returns the table name.
         *
         * @return The table name.
         */
        public byte[] getTableName() {
            return this.tableName;
        }

        /**
         * Returns the start row.
         *
         * @return The start row.
         */
        public byte[] getStartRow() {
            return this.startRow;
        }

        /**
         * Returns the end row.
         *
         * @return The end row.
         */
        public byte[] getEndRow() {
            return this.endRow;
        }
    }

AbstractTableInputFormat rewritten as CustomAbstractTableInputFormat:

    package cn.swordfall.hbaseOnFlink.flink172_hbase212;

    import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
    import org.apache.flink.api.common.io.RichInputFormat;
    import org.apache.flink.api.common.io.statistics.BaseStatistics;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.io.InputSplitAssigner;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.Pair;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    /**
     * @Author: Yang JianQiu
     * @Date: 2019/3/19 11:16
     *
     * The flink-hbase_2.12_1.7.2 jar references HBase 1.4.3, while this project uses HBase 2.1.2,
     * so the versions do not match. AbstractTableInputFormat from that jar is therefore rewritten:
     * it calls HBase 1.4.3 APIs, some of which the newer HBase 2.1.2 has dropped.
     */
    public abstract class CustomAbstractTableInputFormat<T> extends RichInputFormat<T, CustomTableInputSplit> {

        // Log under this class rather than the original AbstractTableInputFormat
        protected static final Logger LOG = LoggerFactory.getLogger(CustomAbstractTableInputFormat.class);

        // helper variable to decide whether the input is exhausted or not
        protected boolean endReached = false;

        protected transient HTable table = null;
        protected transient Scan scan = null;

        /** HBase iterator wrapper. */
        protected ResultScanner resultScanner = null;

        protected byte[] currentRow;
        protected long scannedRows;

        /**
         * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
         *
         * @return The appropriate instance of Scan for this use case.
         */
        protected abstract Scan getScanner();

        /**
         * What table is to be read.
         *
         * <p>Per instance of a TableInputFormat derivative only a single table name is possible.
         *
         * @return The name of the table
         */
        protected abstract String getTableName();

        /**
         * HBase returns an instance of {@link Result}.
         *
         * <p>This method maps the returned {@link Result} instance into the output type {@link T}.
         *
         * @param r The Result instance from HBase that needs to be converted
         * @return The appropriate instance of {@link T} that contains the data of Result.
         */
        protected abstract T mapResultToOutType(Result r);

        /**
         * Creates a {@link Scan} object and opens the {@link HTable} connection.
         *
         * <p>These are opened here because they are needed in the createInputSplits
         * which is called before the openInputFormat method.
         *
         * <p>The connection is opened in this method and closed in {@link #closeInputFormat()}.
         *
         * @param parameters The configuration that is to be used
         * @see Configuration
         */
        @Override
        public abstract void configure(Configuration parameters);

        @Override
        public void open(CustomTableInputSplit split) throws IOException {
            if (table == null) {
                throw new IOException("The HBase table has not been opened! " +
                        "This needs to be done in configure().");
            }
            if (scan == null) {
                throw new IOException("Scan has not been initialized! " +
                        "This needs to be done in configure().");
            }
            if (split == null) {
                throw new IOException("Input split is null!");
            }

            logSplitInfo("opening", split);

            // set scan range
            currentRow = split.getStartRow();
            // HBase 1.x API, replaced below:
            // scan.setStartRow(currentRow); scan.setStopRow(split.getEndRow());
            scan.withStartRow(currentRow);
            scan.withStopRow(split.getEndRow());

            resultScanner = table.getScanner(scan);
            endReached = false;
            scannedRows = 0;
        }

        @Override
        public T nextRecord(T reuse) throws IOException {
            if (resultScanner == null) {
                throw new IOException("No table result scanner provided!");
            }
            try {
                Result res = resultScanner.next();
                if (res != null) {
                    scannedRows++;
                    currentRow = res.getRow();
                    return mapResultToOutType(res);
                }
            } catch (Exception e) {
                resultScanner.close();
                // workaround for timeout on scan
                LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
                // HBase 1.x API, replaced below: scan.setStartRow(currentRow);
                scan.withStartRow(currentRow);
                resultScanner = table.getScanner(scan);
                Result res = resultScanner.next();
                if (res != null) {
                    scannedRows++;
                    currentRow = res.getRow();
                    return mapResultToOutType(res);
                }
            }

            endReached = true;
            return null;
        }

        private void logSplitInfo(String action, CustomTableInputSplit split) {
            int splitId = split.getSplitNumber();
            String splitStart = Bytes.toString(split.getStartRow());
            String splitEnd = Bytes.toString(split.getEndRow());
            String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
            String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
            String[] hostnames = split.getHostnames();
            LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
        }

        @Override
        public boolean reachedEnd() throws IOException {
            return endReached;
        }

        @Override
        public void close() throws IOException {
            LOG.info("Closing split (scanned {} rows)", scannedRows);
            currentRow = null;
            try {
                if (resultScanner != null) {
                    resultScanner.close();
                }
            } finally {
                resultScanner = null;
            }
        }

        @Override
        public void closeInputFormat() throws IOException {
            try {
                if (table != null) {
                    table.close();
                }
            } finally {
                table = null;
            }
        }

        @Override
        public CustomTableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
            if (table == null) {
                throw new IOException("The HBase table has not been opened! " +
                        "This needs to be done in configure().");
            }
            if (scan == null) {
                throw new IOException("Scan has not been initialized! " +
                        "This needs to be done in configure().");
            }

            // Get the starting and ending row keys for every region in the currently open table
            final Pair<byte[][], byte[][]> keys = table.getRegionLocator().getStartEndKeys();
            if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
                throw new IOException("Expecting at least one region.");
            }
            final byte[] startRow = scan.getStartRow();
            final byte[] stopRow = scan.getStopRow();
            final boolean scanWithNoLowerBound = startRow.length == 0;
            final boolean scanWithNoUpperBound = stopRow.length == 0;

            final List<CustomTableInputSplit> splits = new ArrayList<CustomTableInputSplit>(minNumSplits);
            for (int i = 0; i < keys.getFirst().length; i++) {
                final byte[] startKey = keys.getFirst()[i];
                final byte[] endKey = keys.getSecond()[i];
                final String regionLocation = table.getRegionLocator().getRegionLocation(startKey, false).getHostnamePort();
                // Test if the given region is to be included in the InputSplit while splitting the regions of a table
                if (!includeRegionInScan(startKey, endKey)) {
                    continue;
                }
                // Find the region on which the given row is being served
                final String[] hosts = new String[]{regionLocation};

                // Determine if regions contains keys used by the scan
                boolean isLastRegion = endKey.length == 0;
                if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
                        (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {

                    final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
                    final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
                            && !isLastRegion ? endKey : stopRow;
                    int id = splits.size();
                    final CustomTableInputSplit split = new CustomTableInputSplit(id, hosts, table.getName().getName(), splitStart, splitStop);
                    splits.add(split);
                }
            }
            LOG.info("Created " + splits.size() + " splits");
            for (CustomTableInputSplit split : splits) {
                logSplitInfo("created", split);
            }
            return splits.toArray(new CustomTableInputSplit[splits.size()]);
        }

        /**
         * Test if the given region is to be included in the scan while splitting the regions of a table.
         *
         * @param startKey Start key of the region
         * @param endKey   End key of the region
         * @return true, if this region needs to be included as part of the input (default).
         */
        protected boolean includeRegionInScan(final byte[] startKey, final byte[] endKey) {
            return true;
        }

        @Override
        public InputSplitAssigner getInputSplitAssigner(CustomTableInputSplit[] inputSplits) {
            return new LocatableInputSplitAssigner(inputSplits);
        }

        @Override
        public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
            return null;
        }
    }
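The core of createInputSplits is the clipping of each region's key range to the scan's start and stop rows. The sketch below reproduces only that clipping logic in plain Java so it can be run without a cluster. SplitRangeDemo, computeSplits, and the sample region boundaries are hypothetical, and the hand-written compare method stands in for Bytes.compareTo, assuming unsigned lexicographic byte order.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SplitRangeDemo {
    /** Unsigned lexicographic comparison of byte arrays (stand-in for Bytes.compareTo). */
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    /**
     * Clips each region [startKeys[i], endKeys[i]) to the scan range [startRow, stopRow).
     * An empty array means "unbounded". Returns one {start, stop} pair per overlapping region.
     */
    static List<byte[][]> computeSplits(byte[][] startKeys, byte[][] endKeys, byte[] startRow, byte[] stopRow) {
        boolean noLower = startRow.length == 0;
        boolean noUpper = stopRow.length == 0;
        List<byte[][]> splits = new ArrayList<>();
        for (int i = 0; i < startKeys.length; i++) {
            byte[] startKey = startKeys[i];
            byte[] endKey = endKeys[i];
            boolean isLastRegion = endKey.length == 0;
            // Same overlap test as in createInputSplits
            boolean overlaps = (noLower || isLastRegion || compare(startRow, endKey) < 0)
                    && (noUpper || compare(stopRow, startKey) > 0);
            if (!overlaps) {
                continue;
            }
            byte[] splitStart = (noLower || compare(startKey, startRow) >= 0) ? startKey : startRow;
            byte[] splitStop = ((noUpper || compare(endKey, stopRow) <= 0) && !isLastRegion) ? endKey : stopRow;
            splits.add(new byte[][] {splitStart, splitStop});
        }
        return splits;
    }

    static byte[] b(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static String s(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Three regions: (-inf, 100), [100, 200), [200, +inf); scan range [050, 150)
        byte[][] starts = {b(""), b("100"), b("200")};
        byte[][] ends = {b("100"), b("200"), b("")};
        for (byte[][] split : computeSplits(starts, ends, b("050"), b("150"))) {
            System.out.println("[" + s(split[0]) + ", " + s(split[1]) + ")");
        }
        // prints [050, 100) and then [100, 150)
    }
}
```

The third region is skipped because the scan's stop row "150" sorts before its start key "200".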

TableInputFormat rewritten as CustomTableInputFormat:

    package cn.swordfall.hbaseOnFlink.flink172_hbase212;

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.configuration.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;

    /**
     * @Author: Yang JianQiu
     * @Date: 2019/3/19 11:15
     *
     * The flink-hbase_2.12_1.7.2 jar references HBase 1.4.3, while this project uses HBase 2.1.2,
     * so the versions do not match and TableInputFormat from that jar has to be rewritten.
     */
    public abstract class CustomTableInputFormat<T extends Tuple> extends CustomAbstractTableInputFormat<T> {

        private static final long serialVersionUID = 1L;

        /**
         * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
         * @return The appropriate instance of Scan for this usecase.
         */
        @Override
        protected abstract Scan getScanner();

        /**
         * What table is to be read.
         * Per instance of a TableInputFormat derivative only a single tablename is possible.
         * @return The name of the table
         */
        @Override
        protected abstract String getTableName();

        /**
         * The output from HBase is always an instance of {@link Result}.
         * This method is to copy the data in the Result instance into the required {@link Tuple}
         * @param r The Result instance from HBase that needs to be converted
         * @return The appropriate instance of {@link Tuple} that contains the needed information.
         */
        protected abstract T mapResultToTuple(Result r);

        /**
         * Creates a {@link Scan} object and opens the {@link HTable} connection.
         * These are opened here because they are needed in the createInputSplits
         * which is called before the openInputFormat method.
         * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
         *
         * @param parameters The configuration that is to be used
         * @see Configuration
         */
        @Override
        public void configure(Configuration parameters) {
            table = createTable();
            if (table != null) {
                scan = getScanner();
            }
        }

        /**
         * Create an {@link HTable} instance and set it into this format.
         *
         * Returns null in this rewrite: subclasses such as HBaseInputFormat override
         * configure() and open the table themselves.
         */
        private HTable createTable() {
            LOG.info("Initializing HBaseConfiguration");
            // use files found in the classpath
            org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();

            try {
                return null;
            } catch (Exception e) {
                LOG.error("Error instantiating a new HTable instance", e);
            }
            return null;
        }

        @Override
        protected T mapResultToOutType(Result r) {
            return mapResultToTuple(r);
        }
    }

Extending the custom CustomTableInputFormat to connect to HBase and read from it:

    package cn.swordfall.hbaseOnFlink

    import java.io.IOException

    import cn.swordfall.hbaseOnFlink.flink172_hbase212.CustomTableInputFormat
    import org.apache.flink.api.java.tuple.Tuple2
    import org.apache.flink.configuration.Configuration
    import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.util.Bytes

    import scala.collection.JavaConverters._

    /**
      * @Author: Yang JianQiu
      * @Date: 2019/3/1 1:14
      *
      * Reading from HBase, approach 2: implement TableInputFormat.
      */
    class HBaseInputFormat extends CustomTableInputFormat[Tuple2[String, String]] {

      // Result tuple, reused for every row
      val tuple2 = new Tuple2[String, String]

      /**
        * Opens the HBase connection.
        * @param parameters
        */
      override def configure(parameters: Configuration): Unit = {
        val tableName: TableName = TableName.valueOf("test")
        val cf1 = "cf1"
        var conn: Connection = null
        val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()

        config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
        config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)

        try {
          conn = ConnectionFactory.createConnection(config)
          table = conn.getTable(tableName).asInstanceOf[HTable]
          scan = new Scan()
          scan.withStartRow(Bytes.toBytes("001"))
          scan.withStopRow(Bytes.toBytes("201"))
          scan.addFamily(Bytes.toBytes(cf1))
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }

      /**
        * Converts one HBase Result into the output tuple.
        * @param result
        * @return
        */
      override def mapResultToTuple(result: Result): Tuple2[String, String] = {
        val rowKey = Bytes.toString(result.getRow)
        // Join all cell values of the row with "_"
        val value = result.listCells().asScala
          .map((cell: Cell) => Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
          .mkString("_")
        tuple2.setField(rowKey, 0)
        tuple2.setField(value, 1)
        tuple2
      }

      /**
        * tableName
        * @return
        */
      override def getTableName: String = "test"

      /**
        * Returns the Scan.
        * @return
        */
      override def getScanner: Scan = {
        scan
      }
    }

Calling HBaseInputFormat, which implements CustomTableInputFormat, from a Flink Streaming job:

    /**
      * Reading from HBase, approach 2: implement TableInputFormat.
      */
    def readFromHBaseWithTableInputFormat(): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.enableCheckpointing(5000)
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

      val dataStream = env.createInput(new HBaseInputFormat)
      dataStream.filter(_.f0.startsWith("10")).print()
      env.execute()
    }

The Flink DataSet batch version is:

    /**
      * Reading from HBase by implementing TableInputFormat.
      */
    def readFromHBaseWithTableInputFormat(): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment

      val dataSet = env.createInput(new HBaseInputFormat)
      dataSet.filter(_.f1.startsWith("20")).print()
    }

2.2 Two ways to write to HBase from Flink

For the Flink Streaming examples, the data to write comes from Kafka. You can start a single-node Kafka and use kafka-console-producer.sh to write the following records to the topic "test":

    100,hello,20
    101,nice,24
    102,beautiful,26
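Each record is a comma-separated line of row key, name, and age. A sink that indexes straight into the split array fails on a malformed line, so it can help to validate the record before building a Put. The following is a minimal plain-Java sketch of that check; UserRecordParser and UserRecord are hypothetical names, not part of the original project.

```java
import java.util.Optional;

class UserRecordParser {
    /** Hypothetical holder for the three fields of one input line. */
    static final class UserRecord {
        final String rowKey;
        final String name;
        final String age;

        UserRecord(String rowKey, String name, String age) {
            this.rowKey = rowKey;
            this.name = name;
            this.age = age;
        }
    }

    /** Parses "rowKey,name,age"; returns empty unless the line has exactly three non-empty fields. */
    static Optional<UserRecord> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // limit -1 keeps trailing empty fields, so "100,hello," is still seen as three fields
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            return Optional.empty();
        }
        for (int i = 0; i < parts.length; i++) {
            parts[i] = parts[i].trim();
            if (parts[i].isEmpty()) {
                return Optional.empty();
            }
        }
        return Optional.of(new UserRecord(parts[0], parts[1], parts[2]));
    }

    public static void main(String[] args) {
        System.out.println(parse("100,hello,20")
                .map(r -> r.rowKey + " " + r.name + " " + r.age)
                .orElse("skipped")); // prints 100 hello 20
        System.out.println(parse("101,nice").isPresent()); // prints false
    }
}
```

A sink could call parse on each incoming line and skip or log the records for which it returns empty.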

2.2.1 Extending RichSinkFunction and overriding its methods:

    package cn.swordfall.hbaseOnFlink

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.util.Bytes

    /**
      * @Author: Yang JianQiu
      * @Date: 2019/3/1 1:34
      *
      * Writing to HBase, approach 1: extend RichSinkFunction and override its methods.
      *
      * Note: Flink processes records one at a time, so do not flush to HBase on every record.
      * Doing so puts heavy load on HBase and creates many threads, which can bring the cluster
      * down. Production jobs must control the flush frequency.
      *
      * Solution: keep a counter that is initialised in open and flush, for example, every 500
      * records, or collect records in a list and flush when its size reaches a threshold.
      */
    class HBaseWriter extends RichSinkFunction[String] {

      var conn: Connection = null
      var mutator: BufferedMutator = null
      var count = 0

      /**
        * Opens the HBase connection.
        * @param parameters
        */
      override def open(parameters: Configuration): Unit = {
        val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()
        config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
        config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
        conn = ConnectionFactory.createConnection(config)

        val tableName: TableName = TableName.valueOf("test")
        val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
        // 1 MB write buffer: once it fills up, the data is flushed to HBase automatically
        params.writeBufferSize(1024 * 1024)
        mutator = conn.getBufferedMutator(params)
        count = 0
      }

      /**
        * Writes one incoming record to HBase.
        * @param value
        * @param context
        */
      override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
        val cf1 = "cf1"
        val array: Array[String] = value.split(",")
        val put: Put = new Put(Bytes.toBytes(array(0)))
        put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
        put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
        mutator.mutate(put)
        // Flush once every 2000 records
        count = count + 1
        if (count >= 2000) {
          mutator.flush()
          count = 0
        }
      }

      /**
        * Closes the mutator and the connection.
        */
      override def close(): Unit = {
        // Closing the mutator flushes any records still in the buffer
        if (mutator != null) mutator.close()
        if (conn != null) conn.close()
      }
    }

Calling the HBaseWriter class, which extends RichSinkFunction, from a Flink Streaming job:

    /**
      * Writing to HBase, approach 1: extend RichSinkFunction and override its methods.
      */
    def write2HBaseWithRichSinkFunction(): Unit = {
      val topic = "test"
      val props = new Properties
      props.put("bootstrap.servers", "192.168.187.201:9092")
      props.put("group.id", "kv_flink")
      props.put("enable.auto.commit", "true")
      props.put("auto.commit.interval.ms", "1000")
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      env.enableCheckpointing(5000)
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

      val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, props)
      val dataStream: DataStream[String] = env.addSource(myConsumer)
      // Write to HBase
      dataStream.addSink(new HBaseWriter)
      env.execute()
    }

2.2.2 Implementing the OutputFormat interface:

    package cn.swordfall.hbaseOnFlink

    import org.apache.flink.api.common.io.OutputFormat
    import org.apache.flink.configuration.Configuration
    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.util.Bytes

    /**
      * @Author: Yang JianQiu
      * @Date: 2019/3/1 1:40
      *
      * Writing to HBase, approach 2: implement the OutputFormat interface.
      */
    class HBaseOutputFormat extends OutputFormat[String] {

      val zkServer = "192.168.187.201"
      val port = "2181"
      var conn: Connection = null
      var mutator: BufferedMutator = null
      var count = 0

      /**
        * Configures the output format. This is always the first method called on a newly
        * instantiated output format.
        *
        * @param configuration
        */
      override def configure(configuration: Configuration): Unit = {
      }

      /**
        * Opens a parallel instance of the output format. The HBase connection, configuration,
        * and table setup are done here.
        *
        * @param i
        * @param i1
        */
      override def open(i: Int, i1: Int): Unit = {
        val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()
        config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
        config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
        conn = ConnectionFactory.createConnection(config)

        val tableName: TableName = TableName.valueOf("test")
        val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
        // 1 MB write buffer: once it fills up, the data is flushed to HBase automatically
        params.writeBufferSize(1024 * 1024)
        mutator = conn.getBufferedMutator(params)
        count = 0
      }

      /**
        * Writes one record to the target, here through the HBase write API.
        *
        * @param it
        */
      override def writeRecord(it: String): Unit = {
        val cf1 = "cf1"
        val array: Array[String] = it.split(",")
        val put: Put = new Put(Bytes.toBytes(array(0)))
        put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
        put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
        mutator.mutate(put)
        // Flush once every 4 records; close() flushes whatever is still buffered
        count = count + 1
        if (count >= 4) {
          mutator.flush()
          count = 0
        }
      }

      /**
        * Closes the mutator and the connection.
        */
      override def close(): Unit = {
        try {
          // Closing the mutator flushes any records still in the buffer
          if (mutator != null) mutator.close()
          if (conn != null) conn.close()
        } catch {
          case e: Exception => println(e.getMessage)
        }
      }
    }

Calling the HBaseOutputFormat class, which implements OutputFormat, from a Flink Streaming job:

    /**
      * Writing to HBase, approach 2: implement the OutputFormat interface.
      */
    def write2HBaseWithOutputFormat(): Unit = {
      val topic = "test"
      val props = new Properties
      props.put("bootstrap.servers", "192.168.187.201:9092")
      props.put("group.id", "kv_flink")
      props.put("enable.auto.commit", "true")
      props.put("auto.commit.interval.ms", "1000")
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      env.enableCheckpointing(5000)
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

      val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, props)
      val dataStream: DataStream[String] = env.addSource(myConsumer)
      dataStream.writeUsingOutputFormat(new HBaseOutputFormat)
      env.execute()
    }

The Flink DataSet batch version is:

    /**
      * Writing to HBase by implementing the OutputFormat interface.
      */
    def write2HBaseWithOutputFormat(): Unit = {
      val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

      // Define the data
      val dataSet: DataSet[String] = env.fromElements("103,zhangsan,20", "104,lisi,21", "105,wangwu,22", "106,zhaolilu,23")
      dataSet.output(new HBaseOutputFormat)
      // The job only runs when execute() is called; this is required for data sinks
      env.execute()
    }

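Note:

In batch mode, pay attention to the flush threshold in the writeRecord method of HBaseOutputFormat. If close() does not flush the BufferedMutator, records written after the last threshold-triggered flush stay in the client-side buffer, so a threshold larger than the total number of records in the batch means nothing reaches HBase. Flushing or closing the mutator in close() removes this constraint.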
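The buffering behaviour behind this note can be illustrated without HBase. The sketch below is a plain-Java stand-in, not the HBase client: BatchBuffer is a hypothetical class that flushes when a record count threshold is reached and once more on close(), so a final partial batch is not lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical count-based buffer that also flushes its remainder on close(). */
class BatchBuffer<T> {
    private final int threshold;
    private final Consumer<List<T>> flusher;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int threshold, Consumer<List<T>> flusher) {
        this.threshold = threshold;
        this.flusher = flusher;
    }

    /** Buffers one record and flushes once the threshold is reached. */
    void add(T record) {
        pending.add(record);
        if (pending.size() >= threshold) {
            flush();
        }
    }

    /** Hands a copy of the buffered records to the flusher and empties the buffer. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(pending));
        pending.clear();
    }

    /** Flushes whatever is left; without this step a partial batch would never be written. */
    void close() {
        flush();
    }

    public static void main(String[] args) {
        List<List<String>> written = new ArrayList<>();
        BatchBuffer<String> buffer = new BatchBuffer<>(4, written::add);
        for (String record : new String[] {"103,zhangsan,20", "104,lisi,21", "105,wangwu,22"}) {
            buffer.add(record);
        }
        System.out.println(written.size()); // prints 0: three records never reach the threshold of 4
        buffer.close();
        System.out.println(written.size()); // prints 1: close() wrote the remaining three records
    }
}
```

With HBase, the corresponding step would be to flush or close the BufferedMutator in the close() method of the sink or output format.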

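3. Summary

[Related articles]

Several Ways to Connect to HBase (Part 1): Java, for reading and writing HBase with the plain Java API

Several Ways to Connect to HBase (Part 2): Spark, for reading and writing HBase from Spark

GitHub repository:

https://github.com/SwordfallYeung/HBaseDemo (the Flink code for reading and writing HBase is available in both Java and Scala)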


Reposted from: https://www.cnblogs.com/swordfall/p/10527423.html
