日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hbase 协处理器 RegionObserver

發布時間:2024/8/23 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hbase 协处理器 RegionObserver 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

參考鏈接1:https://www.cnblogs.com/ios123/p/6370724.html

參考鏈接2:http://www.zhyea.com/2017/04/13/using-hbase-coprocessor.html

RegionObserver

注:每次更新協處理器方法,最好加上版本更新,否則可能會出現更新失敗

  • 協處理器安裝-表級別安裝
disable 'wechat_article' alter 'wechat_article' , METHOD =>'table_att','coprocessor'=>'hdfs://test111:8020/coprocessor/hbase-coprocessor-0.0.6-SNAPSHOT.jar|com.izhonghong.hbase.coprocessor.WechatUserObserver|1001' enable 'wechat_article'
  • 卸載協處理器:
disable 'wechat_article' alter 'wechat_article', METHOD => 'table_att_unset', NAME => 'coprocessor$1' enable
  • 查看是否成功
hbase(main):002:0> desc 'wechat_article' Table wechat_article is ENABLED wechat_article, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://test111:8020/coprocessor/hbase-coprocessor-0.0.14-SNAPSHOT.jar|com.izhonghong.hbase.coproces sor.WechatUserObserver|1001'} COLUMN FAMILIES DESCRIPTION {NAME => 'fn', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER',COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'} 1 row(s) in 0.3030 seconds

代碼開發

代碼查看(RegionObserverDemo.java):http://note.youdao.com/noteshare?id=9e8b53139c00840d00308356dda0f203&sub=B241C0BD8E46423EBB48DDC285EC5BC2

  • prePut-插入前處理數據
插入數據前判斷download_type類型是否在200-300范圍中,如果在直接將用戶id插入到另外一個表中,并做關聯 @Overridepublic void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,final Put put, final WALEdit edit, final Durability durability)throws IOException {LOG.warn("###########################################");// 獲取列簇為FAMAILLY_NAME,列名為DOWNLOAD_TYPE的數據List<Cell> cells = put.get(Bytes.toBytes(FAMAILLY_NAME),Bytes.toBytes(DOWNLOAD_TYPE));//判斷列名為DOWNLOAD_TYPE是否包含數據,不包含直接return,退出處理if (cells == null || cells.size() == 0) {LOG.warn("download_type 字段不存在退出過濾");return;}// 列名為DOWNLOAD_TYPE已包含數據,進行處理byte[] aValue = null;for (Cell cell : cells) {try {//DOWNLOAD_TYPE轉換為數字aValue = CellUtil.cloneValue(cell);Integer valueOf = Integer.valueOf(Bytes.toString(aValue));if(valueOf>=200 &&valueOf<=300) {//如果DOWNLOAD_TYPE范圍在200-300之間,獲取用戶UID信息List<Cell> list = put.get(Bytes.toBytes(FAMAILLY_NAME),Bytes.toBytes(UID));//判斷用戶是否包含UIDif (list == null || list.size() == 0) {LOG.warn("用戶BIZ不存在,或者為空");return;}//獲取用戶UIDCell cell2 = list.get(0);LOG.warn("UID--->"+Bytes.toString(CellUtil.cloneValue(cell2)));//將用戶UID設置為rowkey,原數據的rowkey當作列名,download_type當作列值Put put2 = new Put(CellUtil.cloneValue(cell2));put2.addColumn(Bytes.toBytes(FAMAILLY_NAME), put.getRow(), aValue);//插入數據到table中table.put(put2);table.close();}else {LOG.warn("Download type is not in range.");}} catch (Exception e1) {LOG.error("異常------->>>>>> "+e1.getMessage());return ;}}}
  • preGetOp-處理返回結果只對get有效
@Overridepublic void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,final Get get, final List<Cell> results) throws IOException { //判斷查詢的rowkey是否等于testif(Bytes.equals(get.getRow(),Bytes.toBytes("test"))) { //新增返回數據fn:time,值為當前時間戳給客戶端KeyValue kv = new KeyValue(get.getRow(),Bytes.toBytes("fn"),Bytes.toBytes("time"),1535555555555l ,Bytes.toBytes(String.valueOf(System.currentTimeMillis())));results.add(kv);}}
  • preScannerOpen-在客戶端打開新掃描儀之前過濾,此方法會覆蓋原有filter
@Overridepublic RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,final RegionScanner s) throws IOException {//查詢rowkey等于test的行進行過濾(顯示不等test的數據),會覆蓋原有filterFilter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes("test")));scan.setFilter(filter);return s;}
  • postScannerNext,對返回結果進行過濾
@Overridepublic boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s,final List<Result> results, final int limit, final boolean hasMore) throws IOException {Result result = null;//獲取返回結果,如果rowkey等于test則過濾掉Iterator<Result> iterator = results.iterator();while (iterator.hasNext()) {result = iterator.next();if (Bytes.equals(result.getRow(), Bytes.toBytes("test2"))) {iterator.remove();break;}}return hasMore;}
  • preDelete,,刪除列前進行判斷該列是否可以刪除
@Overridepublic void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e,final Delete delete, final WALEdit edit, final Durability durability)throws IOException {// 判斷是否對列簇FAMAILLY_NAME操作List<Cell> cells = delete.getFamilyCellMap().get(Bytes.toBytes(FAMAILLY_NAME));if (cells == null || cells.size() == 0) {//如果沒有對指定列簇操作則跳過判斷,直接執行語句return;}byte[] qualifierName = null;boolean aDeleteFlg = false;for (Cell cell : cells) {//獲取帶操作的列名qualifierName = CellUtil.cloneQualifier(cell);// 如果列名等于DOWNLOAD_TYPE ,則拋出異常。注://需查看該值在集群中配置多少,否則重試好幾百次性能會很差。//conf.set("hbase.client.retries.number", "1");//client失敗重試次數if (Arrays.equals(qualifierName, Bytes.toBytes(DOWNLOAD_TYPE))) {throw new IOException("You cannot delete read-only columns.");}// 檢查是否存在對UID列進行刪除if (Arrays.equals(qualifierName, Bytes.toBytes(UID))) {aDeleteFlg = true;}}// 如果對于UID列有刪除,則需要對DOWNLOAD_TYPE列也要刪除if (aDeleteFlg){delete.addColumn(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(DOWNLOAD_TYPE));}} 創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的Hbase 协处理器 RegionObserver的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。