HBase建表高级属性,hbase应用案例看行键设计,HBase和mapreduce结合,从Hbase中读取数据、分析,写入hdfs,从hdfs中读取数据写入Hbase,协处理器和二级索引
1. Hbase高級應用
1.1建表高級屬性
下面幾個shell 命令在hbase操作中可以起到很到的作用,且主要體現在建表的過程中,看下面幾個create 屬性
1、 BLOOMFILTER 默認是NONE 是否使用布隆過慮及使用何種方式
布隆過濾可以每列族單獨啟用。
使用 HColumnDescriptor.setBloomFilterType(NONE | ROW | ROWCOL) 對列族單獨啟用布隆。
? Default = ROW 對行進行布隆過濾。
? 對 ROW,行鍵的哈希在每次插入行時將被添加到布隆。
? 對 ROWCOL,行鍵 + 列族 + 列族修飾的哈希將在每次插入行時添加到布隆
使用方法: create ‘table’,{BLOOMFILTER =>’ROW’}
啟用布隆過濾可以節省讀磁盤過程,可以有助于降低讀取延遲
2、 VERSIONS 默認是1 這個參數的意思是數據保留1個 版本,如果我們認為我們的數據沒有這么大的必要保留這么多,隨時都在更新,而老版本的數據對我們毫無價值,那將此參數設為1 能節約2/3的空間
使用方法: create ‘table’,{VERSIONS=>’2’}
附:MIN_VERSIONS => ‘0’是說在compact操作執行之后,至少要保留的版本
3、 COMPRESSION 默認值是NONE 即不使用壓縮
這個參數意思是該列族是否采用壓縮,采用什么壓縮算法
使用方法: create ‘table’,{NAME=>’info’,COMPRESSION=>’SNAPPY’}
建議采用SNAPPY壓縮算法
HBase中,在Snappy發布之前(Google 2011年對外發布Snappy),采用的LZO算法,目標是達到盡可能快的壓縮和解壓速度,同時減少對CPU的消耗;
在Snappy發布之后,建議采用Snappy算法(參考《HBase: The Definitive Guide》),具體可以根據實際情況對LZO和Snappy做過更詳細的對比測試后再做選擇。
| GZIP | 13.4% | 21 MB/s | 118 MB/s |
| LZO | 20.5% | 135 MB/s | 410 MB/s |
| Zippy/Snappy | 22.2% | 172 MB/s | 409 MB/s |
如果建表之初沒有壓縮,后來想要加入壓縮算法,可以通過alter修改schema
4. alter
使用方法:
如 修改壓縮算法
disable ‘table’
alter ‘table’,{NAME=>’info’,COMPRESSION=>’snappy’}
enable ‘table’
但是需要執行major_compact ‘table’ 命令之后 才會做實際的操作。
5. TTL
默認是 2147483647 即:Integer.MAX_VALUE 值大概是68年
這個參數是說明該列族數據的存活時間,單位是s
這個參數可以根據具體的需求對數據設定存活時間,超過存過時間的數據將在表中不在顯示,待下次major compact的時候再徹底刪除數據
注意的是TTL設定之后 MIN_VERSIONS=>’0’ 這樣設置之后,TTL時間戳過期后,將全部徹底刪除該family下所有的數據,如果MIN_VERSIONS 不等于0那將保留最新的MIN_VERSIONS個版本的數據,其它的全部刪除,比如MIN_VERSIONS=>’1’ 屆時將保留一個最新版本的數據,其它版本的數據將不再保存。
6. describe ‘table’ 這個命令查看了create table 的各項參數或者是默認值。
7. disable_all ‘toplist.*’ disable_all 支持正則表達式,并列出當前匹配的表的如下:
toplist_a_total_1001
toplist_a_total_1002
toplist_a_total_1008
toplist_a_total_1009
toplist_a_total_1019
toplist_a_total_1035
…
Disable the above 25 tables (y/n)? 并給出確認提示
8. drop_all 這個命令和disable_all的使用方式是一樣的
9. hbase 表預分區—-手動分區
默認情況下,在創建HBase表的時候會自動創建一個region分區,當導入數據的時候,所有的HBase客戶端都向這一個region寫數據,直到這個region足夠大了才進行切分。一種可以加快批量寫入速度的方法是通過預先創建一些空的regions,這樣當數據寫入HBase時,會按照region分區情況,在集群內做數據的負載均衡。
命令方式:
create ‘t1’, ‘f1’, {NUMREGIONS => 15, SPLITALGO => ‘HexStringSplit’}
也可以使用api的方式:
bin/hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f info
參數:
test_table是表名
HexStringSplit 是split 方式
-c 是分10個region
-f 是family
可在UI上查看結果,如圖:
這樣就可以將表預先分為15個區,減少數據達到storefile 大小的時候自動分區的時間消耗,并且還有以一個優勢,就是合理設計rowkey 能讓各個region 的并發請求平均分配(趨于均勻) 使IO 效率達到最高,但是預分區需要將filesize 設置一個較大的值,設置哪個參數呢 hbase.hregion.max.filesize 這個值默認是10G 也就是說單個region 默認大小是10G
這個參數的默認值在0.90 到0.92到0.94.3各版本的變化:256M–1G–10G
1.2 hbase應用案例看行鍵設計
表結構設計
1、列族數量的設定
以用戶信息為例,可以將必須的基本信息存放在一個列族,而一些附加的額外信息可以放在另一列族;
2、行鍵的設計
語音詳單:
13877889988-20150625
13877889988-20150625
13877889988-20150626
13877889988-20150626
13877889989
13877889989
13877889989
—-將需要批量查詢的數據盡可能連續存放
CMS系統—-多條件查詢
盡可能將查詢條件關鍵詞拼裝到rowkey中,查詢頻率最高的條件盡量往前靠
20150230-zhangsan-category…
20150230-lisi-category…
(每一個條件的值長度不同,可以通過做定長映射來提高效率)
參考:《hbase 實戰》—-詳細講述了facebook /GIS等系統的表結構設計
1.3 Hbase和mapreduce結合
為什么需要用mapreduce去訪問hbase的數據?
——加快分析速度和擴展分析能力
Mapreduce訪問hbase數據作分析一定是在離線分析的場景下應用
1.3.1 從Hbase中讀取數據、分析,寫入hdfs
/** public abstract class TableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> { }* @author duanhaitao@itcast.cn**/ public class HbaseReader {public static String flow_fields_import = "flow_fields_import";static class HdfsSinkMapper extends TableMapper<Text, NullWritable>{@Overrideprotected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {byte[] bytes = key.copyBytes();String phone = new String(bytes);byte[] urlbytes = value.getValue("f1".getBytes(), "url".getBytes());String url = new String(urlbytes);context.write(new Text(phone + "\t" + url), NullWritable.get());}}static class HdfsSinkReducer extends Reducer<Text, NullWritable, Text, NullWritable>{@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {context.write(key, NullWritable.get());}}public static void main(String[] args) throws Exception {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", "spark01");Job job = Job.getInstance(conf);job.setJarByClass(HbaseReader.class);// job.setMapperClass(HdfsSinkMapper.class);Scan scan = new Scan();TableMapReduceUtil.initTableMapperJob(flow_fields_import, scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job);job.setReducerClass(HdfsSinkReducer.class);FileOutputFormat.setOutputPath(job, new Path("c:/hbasetest/output"));job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);job.waitForCompletion(true);}}1.3.2 從hdfs中讀取數據寫入Hbase
/** public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> { }* @author duanhaitao@itcast.cn**/ public class HbaseSinker {public static String flow_fields_import = "flow_fields_import";static class HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] fields = line.split("\t");String phone = fields[0];String url = fields[1];FlowBean bean = new FlowBean(phone,url);context.write(bean, NullWritable.get());}}static class HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable, ImmutableBytesWritable>{@Overrideprotected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {Put put = new Put(key.getPhone().getBytes());put.add("f1".getBytes(), "url".getBytes(), key.getUrl().getBytes());context.write(new ImmutableBytesWritable(key.getPhone().getBytes()), put);}}public static void main(String[] args) throws Exception {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", "spark01");HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);boolean tableExists = hBaseAdmin.tableExists(flow_fields_import);if(tableExists){hBaseAdmin.disableTable(flow_fields_import);hBaseAdmin.deleteTable(flow_fields_import);}HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(flow_fields_import));HColumnDescriptor hColumnDescriptor = new HColumnDescriptor ("f1".getBytes());desc.addFamily(hColumnDescriptor);hBaseAdmin.createTable(desc);Job job = Job.getInstance(conf);job.setJarByClass(HbaseSinker.class);job.setMapperClass(HbaseSinkMrMapper.class);TableMapReduceUtil.initTableReducerJob(flow_fields_import, HbaseSinkMrReducer.class, job);FileInputFormat.setInputPaths(job, new Path("c:/hbasetest/data"));job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(ImmutableBytesWritable.class);job.setOutputValueClass(Mutation.class);job.waitForCompletion(true); } }1.3 hbase高級編程
1.3.1 協處理器—- Coprocessor
協處理器有兩種:observer和endpoint
Observer允許集群在正常的客戶端操作過程中可以有不同的行為表現
Endpoint允許擴展集群的能力,對客戶端應用開放新的運算命令
? Observer協處理器
? 正常put請求的流程:
? 加入Observer協處理后的put流程:
1 客戶端發出put請求
2 該請求被分派給合適的RegionServer和region
3 coprocessorHost攔截該請求,然后在該表上登記的每個RegionObserver上調用prePut()
4 如果沒有被prePut()攔截,該請求繼續送到region,然后進行處理
5 region產生的結果再次被CoprocessorHost攔截,調用postPut()
6 假如沒有postPut()攔截該響應,最終結果被返回給客戶端
? Observer的類型
1.RegionObs——這種Observer鉤在數據訪問和操作階段,所有標準的數據操作命令都可以被pre-hooks和post-hooks攔截
**2.WALObserver——**WAL所支持的Observer;可用的鉤子是pre-WAL和post-WAL
3.MasterObserver——鉤住DDL事件,如表創建或模式修改
? Observer應用場景示例
見下節;
? Endpoint—參考《Hbase 權威指南》
1.3.2 二級索引
row key在HBase中是以B+ tree結構化有序存儲的,所以scan起來會比較效率。單表以row key存儲索引,column value存儲id值或其他數據 ,這就是Hbase索引表的結構。
由于HBase本身沒有二級索引(Secondary Index)機制,基于索引檢索數據只能單純地依靠RowKey,為了能支持多條件查詢,開發者需要將所有可能作為查詢條件的字段一一拼接到RowKey中,這是HBase開發中極為常見的做法
比如,現在有一張1億的用戶信息表,建有出生地和年齡兩個索引,我想得到一個條件是在杭州出生,年齡為20歲的按用戶id正序排列前10個的用戶列表。
有一種方案是,系統先掃描出生地為杭州的索引,得到一個用戶id結果集,這個集合的規模假設是10萬。然后掃描年齡,規模是5萬,最后merge這些用戶id,去重,排序得到結果。
這明顯有問題,如何改良?
保證出生地和年齡的結果是排過序的,可以減少merge的數據量?但Hbase是按row key排序,value是不能排序的。
變通一下——將用戶id冗余到row key里?OK,這是一種解決方案了,這個方案的圖示如下:
merge時提取交集就是所需要的列表,順序是靠索引增加了_id,以字典序保證的。
2, 按索引查詢種類建立組合索引。
在方案1的場景中,想象一下,如果單索引數量多達10個會怎么樣?10個索引,就要merge 10次,性能可想而知。
解決這個問題需要參考RDBMS的組合索引實現。
比如出生地和年齡需要同時查詢,此時如果建立一個出生地和年齡的組合索引,查詢時效率會高出merge很多。
當然,這個索引也需要冗余用戶id,目的是讓結果自然有序。結構圖示如下:
這個方案的優點是查詢速度非常快,根據查詢條件,只需要到一張表中檢索即可得到結果list。缺點是如果有多個索引,就要建立多個與查詢條件一一對應的組合索引
而索引表的維護如果交給應用客戶端,則無疑增加了應用端開發的負擔
通過協處理器可以將索引表維護的工作從應用端剝離
? 利用Observer自動維護索引表示例
在社交類應用中,經常需要快速檢索各用戶的關注列表t_guanzhu,同時,又需要反向檢索各種戶的粉絲列表t_fensi,為了實現這個需求,最佳實踐是建立兩張互為反向的表:
? 一個表為正向索引關注表 “t_guanzhu”:
Rowkey: A-B
f1:From
f1:To
? 另一個表為反向索引粉絲表:“t_fensi”:
Rowkey: B—A
f1:From
f1:To
插入一條關注信息時,為了減輕應用端維護反向索引表的負擔,可用Observer協處理器實現:
1、編寫自定義RegionServer
public class InverIndexCoprocessor extends BaseRegionObserver {@Overridepublic void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {// set configurationConfiguration conf = HBaseConfiguration.create();// need conf.set...HTable table = new HTable(conf, "t_fensi");Cell fromCell = put.get("f1".getBytes(), "From".getBytes()).get(0);Cell toCell = put.get("f1".getBytes(), "To".getBytes()).get(0);byte[] valueArray = fromCell.getValue();String from = new String(valueArray);valueArray = toCell.getValue();String to = new String(valueArray);Put putIndex = new Put((to+"-"+from).getBytes());putIndex.add("f1".getBytes(), "From".getBytes(),from.getBytes());putIndex.add("f1".getBytes(), "To".getBytes(),to.getBytes());table.put(putIndex);table.close();} }2、打成jar包“fensiguanzhu.jar”上傳hdfs
hadoop fs -put fensiguanzhu.jar /demo/
3、修改t_fensi的schema,注冊協處理器
hbase(main):017:0> alter ' t_fensi ',METHOD => 'table_att','coprocessor'=>'hdfs://spark01:9000/demo/ fensiguanzhu.jar|cn.itcast.bigdata.hbasecoprocessor. InverIndexCoprocessor|1001|' Updating all regions with the new schema... 0/1 regions updated. 1/1 regions updated. Done.4、檢查是否注冊成功
hbase(main):018:0> describe ‘ff’
DESCRIPTION ENABLED
‘ff’, {TABLE_ATTRIBUTES => {coprocessor$1 => ‘hdfs://spark01:9000/demo/fensiguanzhu.jar|cn.itcast.bi true
gdata.hbasecoprocessor.TestCoprocessor|1001|’}, {NAME => ‘f1’, DATA_BLOCK_ENCODING => ‘NONE’, BLOOMF
ILTER => ‘ROW’, REPLICATION_SCOPE => ‘0’, VERSIONS => ‘1’, COMPRESSION => ‘NONE’, MIN_VERSIONS => ‘0
‘, TTL => ‘2147483647’, KEEP_DELETED_CELLS => ‘false’, BLOCKSIZE => ‘65536’, IN_MEMORY => ‘false’, B
LOCKCACHE => ‘true’}, {NAME => ‘f2’, DATA_BLOCK_ENCODING => ‘NONE’, BLOOMFILTER => ‘ROW’, REPLICATIO
N_SCOPE => ‘0’, VERSIONS => ‘1’, COMPRESSION => ‘NONE’, MIN_VERSIONS => ‘0’, TTL => ‘2147483647’, KE
EP_DELETED_CELLS => ‘false’, BLOCKSIZE => ‘65536’, IN_MEMORY => ‘false’, BLOCKCACHE => ‘true’}
1 row(s) in 0.0250 seconds
5、向正向索引表中插入數據進行驗證
總結
以上是生活随笔為你收集整理的HBase建表高级属性,hbase应用案例看行键设计,HBase和mapreduce结合,从Hbase中读取数据、分析,写入hdfs,从hdfs中读取数据写入Hbase,协处理器和二级索引的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 成都部队文职管理岗跟技能岗哪个好考
- 下一篇: 自定义Flume拦截器,并将收集的日志存