日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

HBase 1.x Coprocessor使用指南

發(fā)布時(shí)間:2024/1/23 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 HBase 1.x Coprocessor使用指南 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

HBase 1.x Coprocessor使用指南

@(HBASE)[hbase]

  • HBase 1x Coprocessor使用指南
  • 一概述
      • 1起因Why HBase Coprocessor
      • 2靈感來(lái)源 Source of Inspration
      • 3細(xì)節(jié)剖析Implementation
        • 1觀察者Observer
        • 2終端Endpoint
  • 二Observer
    • 一示例
      • 1準(zhǔn)備類文件
      • 2部署方式一
        • 1修改配置增加協(xié)處理器類
        • 2將包含上述類的jar包入到hbase的lib目錄中
        • 3重啟hbase
      • 3部署方式二
        • 1將包含上述類的jar包放到某個(gè)hdfs路徑
        • 2創(chuàng)建表時(shí)指定coprocessor
      • 4使用coproccessor
        • 1在hbase shell中使用coprocessor
        • 2使用java API使用coprocessor
      • 5問(wèn)題
  • 三Endpoint
    • 零業(yè)務(wù)常景描述
    • 一準(zhǔn)備proto文件
    • 二使用protoc生成類文件
    • 三實(shí)現(xiàn)真實(shí)的服務(wù)
      • 1類的結(jié)構(gòu)
      • 2 getService
      • 3 startCoprocessorEnvironment env
      • 4stopCoprocessorEnvironment env
      • 5 getCountAndSum
    • 四部署coprocessor
    • 五客戶端使用coprocessor
    • 六運(yùn)行程序

HBase在0.92版本之后,提供了協(xié)處理器功能。
在之前介紹過(guò),HBase提供了過(guò)濾器,以減少?gòu)姆?wù)器返回客戶端的數(shù)據(jù)。而協(xié)處理器用于將部分處理工作交由RegionServer處理,而不是全部返回client再處理。

舉個(gè)例子,HBase的安全機(jī)制就是通過(guò)協(xié)處理器實(shí)現(xiàn)的。當(dāng)用戶向HBase發(fā)出一個(gè)讀寫(xiě)請(qǐng)求時(shí),HBase會(huì)首先觸發(fā)這個(gè)協(xié)處理器,它會(huì)在讀寫(xiě)操作前確認(rèn)用戶是否有這個(gè)權(quán)限。

一、概述

(以下概述性的內(nèi)容摘自網(wǎng)絡(luò))。

1、起因(Why HBase Coprocessor)

HBase作為列族數(shù)據(jù)庫(kù)最經(jīng)常被人詬病的特性包括:無(wú)法輕易建立“二級(jí)索引”,難以執(zhí)行求和、計(jì)數(shù)、排序等操作。比如,在舊版本的(<0.92)Hbase中,統(tǒng)計(jì)數(shù)據(jù)表的總行數(shù),需要使用Counter方法,執(zhí)行一次MapReduce Job才能得到。雖然HBase在數(shù)據(jù)存儲(chǔ)層中集成了MapReduce,能夠有效用于數(shù)據(jù)表的分布式計(jì)算。然而在很多情況下,做一些簡(jiǎn)單的相加或者聚合計(jì)算的時(shí)候,如果直接將計(jì)算過(guò)程放置在server端,能夠減少通訊開(kāi)銷,從而獲得很好的性能提升。于是,HBase在0.92之后引入了協(xié)處理器(coprocessors),實(shí)現(xiàn)一些激動(dòng)人心的新特性:能夠輕易建立二次索引、復(fù)雜過(guò)濾器(謂詞下推)以及訪問(wèn)控制等。

2、靈感來(lái)源( Source of Inspration)

HBase協(xié)處理器的靈感來(lái)自于Jeff Dean 09年的演講( P66-67)。它根據(jù)該演講實(shí)現(xiàn)了類似于bigtable的協(xié)處理器,包括以下特性:

  • 每個(gè)表服務(wù)器的任意子表都可以運(yùn)行代碼
  • 客戶端的高層調(diào)用接口(客戶端能夠直接訪問(wèn)數(shù)據(jù)表的行地址,多行讀寫(xiě)會(huì)自動(dòng)分片成多個(gè)并行的RPC調(diào)用)
  • 提供一個(gè)非常靈活的、可用于建立分布式服務(wù)的數(shù)據(jù)模型
  • 能夠自動(dòng)化擴(kuò)展、負(fù)載均衡、應(yīng)用請(qǐng)求路由

HBase的協(xié)處理器靈感來(lái)自bigtable,但是實(shí)現(xiàn)細(xì)節(jié)不盡相同。HBase建立了一個(gè)框架,它為用戶提供類庫(kù)和運(yùn)行時(shí)環(huán)境,使得他們的代碼能夠在HBase region server和master上處理。

3、細(xì)節(jié)剖析(Implementation)

協(xié)處理器分兩種類型,系統(tǒng)協(xié)處理器可以全局導(dǎo)入region server上的所有數(shù)據(jù)表,表協(xié)處理器即是用戶可以指定一張表使用協(xié)處理器。協(xié)處理器框架為了更好支持其行為的靈活性,提供了兩個(gè)不同方面的插件。一個(gè)是觀察者(observer),類似于關(guān)系數(shù)據(jù)庫(kù)的觸發(fā)器。另一個(gè)是終端(endpoint),動(dòng)態(tài)的終端有點(diǎn)像存儲(chǔ)過(guò)程。

3.1觀察者(Observer)

觀察者的設(shè)計(jì)意圖是允許用戶通過(guò)插入代碼來(lái)重載協(xié)處理器框架的upcall方法,而具體的事件觸發(fā)的callback方法由HBase的核心代碼來(lái)執(zhí)行。協(xié)處理器框架處理所有的callback調(diào)用細(xì)節(jié),協(xié)處理器自身只需要插入添加或者改變的功能。

HBase 提供了三種觀察者接口:

  • RegionObserver:提供客戶端的數(shù)據(jù)操縱事件鉤子:Get、Put、Delete、Scan等。
  • WALObserver:提供WAL相關(guān)操作鉤子。
  • MasterObserver:提供DDL-類型的操作鉤子。如創(chuàng)建、刪除、修改數(shù)據(jù)表等。

這些接口可以同時(shí)使用在同一個(gè)地方,按照不同優(yōu)先級(jí)順序執(zhí)行.用戶可以任意基于協(xié)處理器實(shí)現(xiàn)復(fù)雜的HBase功能層。HBase有很多種事件可以觸發(fā)觀察者方法,這些事件與方法從HBase0.92版本起,都會(huì)集成在HBase API中。不過(guò)這些API可能會(huì)由于各種原因有所改動(dòng),不同版本的接口改動(dòng)比較大,具體參考Java Doc。

3.2終端(Endpoint)

終端是動(dòng)態(tài)RPC插件的接口,它的實(shí)現(xiàn)代碼被安裝在服務(wù)器端,從而能夠通過(guò)HBase RPC喚醒。客戶端類庫(kù)提供了非常方便的方法來(lái)調(diào)用這些動(dòng)態(tài)接口,它們可以在任意時(shí)候調(diào)用一個(gè)終端,它們的實(shí)現(xiàn)代碼會(huì)被目標(biāo)region遠(yuǎn)程執(zhí)行,結(jié)果會(huì)返回到終端。用戶可以結(jié)合使用這些強(qiáng)大的插件接口,為HBase添加全新的特性。
具體使用方法參考下面。

二、Observer

完整代碼請(qǐng)見(jiàn):https://github.com/lujinhong/lujinhong-commons/tree/master/lujinhong-commons-hbase/src/main/java/com/lujinhong/commons/hbase/coprocessor

(一)示例

1、準(zhǔn)備類文件

這個(gè)例子的coprocessor是一個(gè)RegionObserver,它判斷如果請(qǐng)求的rowkey是@@@GETTIME@@@,則返回系統(tǒng)當(dāng)前時(shí)間,然后不再請(qǐng)求region讀取實(shí)際的數(shù)據(jù)(e.bypass()),否則,有可能返回2行。
幅使用的是preGetOp()方法,因此所有的Get操作都會(huì)先經(jīng)過(guò)這個(gè)Coprocessor處理。

public class CoprocessorDemo extends BaseRegionObserver{public static final byte[] FIXED_ROW = Bytes.toBytes("@@@GETTIME@@@");@Overridepublic void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)throws IOException {if(Bytes.equals(get.getRow(), FIXED_ROW)){KeyValue kv = new KeyValue(get.getRow(),FIXED_ROW,FIXED_ROW,Bytes.toBytes(System.currentTimeMillis()) );results.add(kv);e.bypass();}}}

2、部署方式一

這種方法適用于集群管理人員使用,所部署的coprocessor會(huì)影響所有表,所有region。

(1)修改配置,增加協(xié)處理器類

<property><name>hbase.coprocessor.region.classes</name><value>org.apache.hadoop.hbase.security.token.TokenProvider,org.apache.hadoop.hbase.security.access.AccessController,com.lujinhong.commons.hbase.coprocessor.CoprocessorDemo</value></property>

這里也可以看出來(lái),安全相關(guān)的控制都使用協(xié)處理器完成的。

(2)將包含上述類的jar包入到hbase的lib目錄中

(3)重啟hbase

bin/rolling-restart.sh

3、部署方式二

這種方式適合開(kāi)發(fā)人員使用,只會(huì)影響特寫(xiě)的表或者region。這種方式有可能導(dǎo)致開(kāi)發(fā)人員濫用coprocessor,從而使得hbase集群負(fù)載過(guò)高,因此建議回收建表權(quán)限,只能由集群管理人員建表,并在建表時(shí)指定coprocessor。
這種方式無(wú)須重啟集群,從而達(dá)到熱加載的目的。

(1)將包含上述類的jar包放到某個(gè)hdfs路徑

如/hbase/userlib。當(dāng)然也可以直接放在本地目錄,但要保證每臺(tái)hbase服務(wù)器都有這個(gè)類。

(2)創(chuàng)建表時(shí),指定coprocessor

create 'ljhtest', 'f1' disable 'ljhtest' alter 'ljhtest', 'Coprocessor'=>'hdfs://testing/hbase/userlib/gdc-commons-hbase-0.1-SNAPSHOT.jar|com.lujinhong.commons.hbase.coprocessor.CoprocessorDemo|1073741825|arg1=1' enable 'ljhtest'

注意jar包的權(quán)限,如果hbase用戶不能讀取這個(gè)jar包,會(huì)導(dǎo)致enable時(shí)失敗。

說(shuō)明文檔如下:

hbase> alter 't1','coprocessor'=>'hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2'Since you can have multiple coprocessors configured for a table, a sequence number will be automatically appended to the attribute name to uniquely identify it.The coprocessor attribute must match the pattern below in order for the framework to understand how to load the coprocessor classes:[coprocessor jar file location] | class name | [priority] | [arguments]

也可以在java代碼中通過(guò)HTableDescription來(lái)指定coprocessor。

4、使用coproccessor

(1)在hbase shell中使用coprocessor

hbase(main):007:0* get 'ljhtest3','@@@GETTIME@@@' COLUMN CELL@@@GETTIME@@@:@@@GETTIME@@@ timestamp=9223372036854775807, value=\x00\x00\x01T\xCE0=\x9E 1 row(s) in 0.1610 seconds

就會(huì)返回當(dāng)前時(shí)間。注意如果使用方式一部署的話,請(qǐng)求所有表均會(huì)返回正確結(jié)果,而使用方式二部署的話只有請(qǐng)求指定的表才會(huì)返回當(dāng)前時(shí)間。

(2)使用java API使用coprocessor

一樣的,沒(méi)有什么特寫(xiě),正常讀取即可。

5、問(wèn)題

在運(yùn)行coprocessor時(shí),若不失效,則到region所在的regionserver中查看日志,比如有些包沒(méi)打包上去等。

三、Endpoint

完整代碼請(qǐng)見(jiàn):https://github.com/lujinhong/lujinhong-commons/tree/master/lujinhong-commons-hbase/src/main/java/com/lujinhong/commons/hbase/coprocessor
除了本例以外,還可以參考hbase源代碼中的RowCountEndpoint。
注意,hbase 0.98對(duì)實(shí)現(xiàn)endpoint的API作了很大的調(diào)整,《hbase權(quán)威指南》等書(shū)的API均不能再使用。

創(chuàng)建一個(gè)Endpoint的基本流程可以歸納為:
(1)創(chuàng)建一個(gè)通信協(xié)議:準(zhǔn)備一個(gè)proto文件,然后使用protoc工具來(lái)生成協(xié)議類文件。這個(gè)文件需要在服務(wù)端及客戶端存在。
(2)創(chuàng)建一個(gè)Service類,實(shí)現(xiàn)具體的業(yè)務(wù)邏輯
(3)創(chuàng)建表時(shí)指定使用這個(gè)EndPoint,或者是全局配置。
(4)創(chuàng)建一個(gè)Client類,調(diào)用這個(gè)RPC方法。

(零)業(yè)務(wù)常景描述

HBase表中有一個(gè)family, 2相column,分別為f:c1, f:c2。rowkey為某個(gè)用戶id(當(dāng)然經(jīng)過(guò)hash以后以避免熱點(diǎn)),2個(gè)列分別表示這個(gè)用戶在2款產(chǎn)品的在線時(shí)間,單位為秒。如:

id1 column=f1:c1, timestamp=1464323601847, value=500000id1 column=f1:c2, timestamp=1464323601883, value=600000id2 column=f1:c1, timestamp=1464323648768, value=500id2 column=f1:c2, timestamp=1464323648758, value=600000 id3 column=f1:c1, timestamp=1464323648775, value=700000id3 column=f1:c2, timestamp=1464323648783, value=700id4 column=f1:c1, timestamp=1464324774802, value=700000id4 column=f1:c2, timestamp=1464324774845, value=800000

下面就基于這些數(shù)據(jù)來(lái)計(jì)算。

要求計(jì)算:
(1)2個(gè)產(chǎn)品的真實(shí)用戶有多少,定義為在線時(shí)長(zhǎng)超過(guò)10分鐘的
(2)這2個(gè)產(chǎn)品的真實(shí)用戶平均在線時(shí)長(zhǎng)是多少

協(xié)處理器的處理邏輯為:
(1)請(qǐng)求為這2個(gè)列名,萬(wàn)一上線新產(chǎn)品時(shí)可以直接使用,也可示范如何定義一個(gè)request。
(2)計(jì)算2個(gè)產(chǎn)品的真實(shí)用戶count1, count2
(3)計(jì)算這2個(gè)產(chǎn)品用戶分別的在線總時(shí)長(zhǎng)sum1, sum2
(4)將各個(gè)region的sum和count分別加起來(lái)后,計(jì)算2個(gè)產(chǎn)品的平均值。

(一)準(zhǔn)備proto文件

請(qǐng)求參數(shù)為列的名稱,用“;”分隔,返回的是這2個(gè)產(chǎn)品的真實(shí)用戶數(shù)與在線總時(shí)長(zhǎng)。

option java_package = "com.lujinhong.coprocessor"; option java_outer_classname = "MultiColumnSumProtocol"; option java_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED;message CountRequest {required string columns = 1; }message CountResponse {required int64 count1 = 1 [default = 0];required int64 count2 = 2 [default = 0];required int64 sum1 = 3 [default = 0];required int64 sum2 = 4 [default = 0]; }service RowCountService {rpc getCountAndSum(CountRequest)returns (CountResponse); }

(二)使用protoc生成類文件

protoc --java_out=../java/ MultiColumnSum.proto

這個(gè)命令在使用上面的proto文件生成相應(yīng)的類文件,這個(gè)類文件有幾個(gè)地方需要注意:
1、生成了一個(gè)CountRequest內(nèi)部類,表示請(qǐng)求信息
2、生成了一個(gè)CountResponse內(nèi)部類,表示返回信息
3、生成了一個(gè) RowCountService內(nèi)部類,表示所提供的服務(wù),這個(gè)類還有一個(gè)內(nèi)部接口,這個(gè)接口定義了 getCountAndSum()這個(gè)方法。
我們下面需要做的就是實(shí)現(xiàn)這個(gè)接口的這個(gè)方法,提供真正的服務(wù)。

(三)實(shí)現(xiàn)真實(shí)的服務(wù)

1、類的結(jié)構(gòu)

提供真實(shí)服務(wù)的類繼承自上面自動(dòng)生成的Server類,同時(shí)需要實(shí)現(xiàn)Coprocessor和CoprocessorService2個(gè)接口:

public class MultiColumnSum extends MultiColumnSumProtocol.RowCountService implements Coprocessor, CoprocessorService

它需要實(shí)現(xiàn)以下4個(gè)方法,下面我們逐一討論一下:

@Override public Service getService() { return null; }@Override public void start(CoprocessorEnvironment env) throws IOException { }@Override public void stop(CoprocessorEnvironment env) throws IOException { }@Override public void getCountAndSum(RpcController controller, CountRequest request, RpcCallback<CountResponse> done) { }

2、 getService()

這個(gè)方法直接返回自身即可。

@Override public Service getService() { return this; }

3、 start(CoprocessorEnvironment env)

這個(gè)方法會(huì)在coprocessor啟動(dòng)時(shí)調(diào)用,這里判斷了是否在一個(gè)region內(nèi)被使用,而不是master,WAL等環(huán)境下被調(diào)用。

@Override public void start(CoprocessorEnvironment env) throws IOException {if (env instanceof RegionCoprocessorEnvironment) {this.env = (RegionCoprocessorEnvironment) env;} else {throw new CoprocessorException("Must be loaded on a table region!");} }

4、stop(CoprocessorEnvironment env)

這個(gè)方法會(huì)在coprocessor完成時(shí)被調(diào)用,可用于關(guān)閉資源等,這里為空。

@Overridepublic void stop(CoprocessorEnvironment env) throws IOException { }

5、 getCountAndSum(…)

這是整個(gè)類的核心方法,用于實(shí)現(xiàn)真正的業(yè)務(wù)邏輯。關(guān)鍵的步驟有:
(1)根據(jù)request創(chuàng)建一個(gè)Scanner,然后使用它創(chuàng)建一個(gè) InternalScanner,可以更高效的進(jìn)行scan
(2)對(duì)掃描出來(lái)的行進(jìn)行分析處理,將結(jié)果保存在幾個(gè)變量中。
(3)調(diào)用response的各個(gè)set()方法,設(shè)置返回的結(jié)果。
(4)使用 done.run(response); 返回結(jié)果到客戶端。
這個(gè)方法的完整代碼如下:

@Override public void getCountAndSum(RpcController controller, CountRequest request, RpcCallback<CountResponse> done) {long[] values = { 0, 0, 0, 0 }; String columns = request.getColumns(); if (columns == null || "".equals(columns)) throw new NullPointerException("you need specify the columns"); String[] columnArray = columns.split(";"); Scan scan = new Scan(); for (String column : columnArray) { scan.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(column));} MultiColumnSumProtocol.CountResponse response = null; InternalScanner scanner = null; try { scanner = env.getRegion().getScanner(scan); List<Cell> results = new ArrayList<Cell>(); boolean hasMore = false; do { hasMore = scanner.next(results); if (results.size() < 2) continue; Cell kv0 = results.get(0); long value1 = Long.parseLong(Bytes.toString(CellUtil.cloneValue(kv0))); Cell kv1 = results.get(1); long value2 = Long.parseLong(Bytes.toString(CellUtil.cloneValue(kv1))); if(value1 > 60000){values[0] += 1;values[2] += value1;}if(value2 > 60000){values[1] += 1;values[3] += value2;}results.clear(); } while (hasMore); // 生成response response = MultiColumnSumProtocol.CountResponse.newBuilder().setCount1(values[0]).setCount2(values[1]).setSum1(values[2]).setSum2(values[3]).build(); } catch (IOException e) { e.printStackTrace(); ResponseConverter.setControllerException(controller, e); } finally { if (scanner != null) { try { scanner.close(); } catch (IOException ignored) { } } } done.run(response); }

(四)部署coprocessor

將上述2個(gè)類進(jìn)行打包,然后按照上面Oberver部分介紹的部署方法來(lái)部署coprocessor。

(五)客戶端使用coprocessor

注意,如果很多代碼用到這個(gè)coprocessor,最好封裝成更方便調(diào)用的方式。

最核心的代碼是:

Map<byte[], ResponseInfo> map = table.coprocessorService(MultiColumnSumProtocol.RowCountService.class, null,null, new Batch.Call<MultiColumnSumProtocol.RowCountService, ResponseInfo>() {@Overridepublic ResponseInfo call(MultiColumnSumProtocol.RowCountService service) throws IOException {BlockingRpcCallback<MultiColumnSumProtocol.CountResponse> rpcCallback = new BlockingRpcCallback<>();service.getCountAndSum(null, request, rpcCallback);MultiColumnSumProtocol.CountResponse response = rpcCallback.get();ResponseInfo responseInfo = new ResponseInfo();responseInfo.count1 = response.getCount1();responseInfo.count2 = response.getCount2();responseInfo.sum1 = response.getSum1();responseInfo.sum2 = response.getSum2();return responseInfo;}});

將調(diào)用的結(jié)果返回保存在一個(gè)map中,每個(gè)region會(huì)產(chǎn)生一條數(shù)據(jù)。然后通過(guò)合并各個(gè)region的結(jié)果來(lái)得出最終的結(jié)果即可。

ResponseInfo result = new ResponseInfo(); for (ResponseInfo ri : map.values()) {result.count1 += ri.count1;result.count2 += ri.count2;result.sum1 += ri.sum1;result.sum2 += ri.sum2; }System.out.println("Produce 1 has " + result.count1 + " user, all online time is " + result.sum1 / 1000+ " minutes, average online time is " + result.sum1 / 1000 / result.count1 + "minutes.");System.out.println("Produce 2 has " + result.count2 + " user, all online time is " + result.sum2 / 1000+ " minutes, average online time is " + result.sum2 / 1000 / result.count2 + "minutes.");

(六)運(yùn)行程序

打包并上傳至集群,然后運(yùn)行程序。
注意只打包c(diǎn)lient類與protocol類,不要打包Service類。即部署到集群的jar包包括Service類和protocol類,而運(yùn)行任務(wù)的jar包包括client類與protocol類。

hadoop jar lujinhong-commons-hbase-0.1-SNAPSHOT.jar com.lujinhong.commons.hbase.coprocessor.MultiColumnSumClient

輸出結(jié)果為:

Produce 1 has 3 user, all online time is 1900 minutes, average online time is 633minutes. Produce 2 has 3 user, all online time is 2000 minutes, average online time is 666minutes.

總結(jié)

以上是生活随笔為你收集整理的HBase 1.x Coprocessor使用指南的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。