大数据之路Week10_day04 (Hbase的二级索引,二级索引的本质就是建立各列值与行键之间的映射关系)
二級索引的本質就是建立各列值與行鍵之間的映射關系
HBASE是在hadoop之上構建非關系型,面向列存儲的開源分布式結構化數據存儲系統。
Hbase的局限性:
HBase本身只提供基于行鍵和全表掃描的查詢,而行鍵索引單一,對于多維度的查詢困難。
所以我們引進一個二級索引的概念
常見的二級索引:
HBase的一級索引就是rowkey,我們只能通過rowkey進行檢索。如果我們相對hbase里面列族的列列進行一些組合查詢,就需要采用HBase的二級索引方案來進行多條件的查詢。
1. MapReduce方案
2. ITHBASE(Indexed-Transanctional HBase)方案
3. IHBASE(Index HBase)方案
4. Hbase Coprocessor(協處理器)方案
5. Solr+hbase方案
6. CCIndex(complementalclustering index)方案
二級索引的種類
1、創建單列索引
2、同時創建多個單列索引
3、創建聯合索引(最多同時支持3個列)
4、只根據rowkey創建索引
單表建立二級索引
1.首先disable ‘表名’
2.然后修改表 alter 'LogTable',METHOD=>'table_att','coprocessor'=>'hdfs:///寫好的Hbase協處理器(coprocessor)的jar包名|類的絕對路徑名|1001' 3. enable '表名'
二級索引的設計思路
二級索引的本質就是建立各列值與行鍵之間的映射關系
如上圖1,當要對F:C1這列建立索引時,只需要建立F:C1各列值到其對應行鍵的映射關系,如C11->RK1等,這樣就完成了對F:C1列值的二級索引的構建,當要查詢符合F:C1=C11對應的F:C2的列值時(即根據C1=C11來查詢C2的值,圖1青色部分)
其查詢步驟如下:
1. 根據C1=C11到索引數據中查找其對應的RK,查詢得到其對應的RK=RK1
2. 得到RK1后就自然能根據RK1來查詢C2的值了 這是構建二級索引大概思路,其他組合查詢的聯合索引的建立也類似。
Mapreduce的方式創建二級索引
使用整合MapReduce的方式創建hbase索引。主要的流程如下:
1.1掃描輸入表,使用hbase繼承類TableMapper
1.2獲取rowkey和指定字段名稱和字段值
1.3創建Put實例, value=” “, rowkey=班級,column=學號
1.4使用IdentityTableReducer將數據寫入索引表
實例:
1、在hbase中創建索引表 student_index
create 'student_index','info'
2、編寫mapreduce代碼
package com.wyh.Hbase_MR; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job; import java.io.IOException; /**
* 建立索引表
*
*/ public class HbaseToIndex { /**
* Map段 將讀取到的數據,設置班級+學號當作key
*/
public static class IndexMap extends TableMapper<Text,NullWritable>{
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { String id = Bytes.toString(key.get());
String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes())); String key1 = id+"_"+clazz;
context.write(new Text(key1),NullWritable.get()); }
} /**
* Reduce段 獲取Map傳過來的key
*/
public static class IndexReduce extends TableReducer<Text,NullWritable,NullWritable>{
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
String[] split = key.toString().split("_");
String id = split[0];
String clazz = split[1]; Put put = new Put(clazz.getBytes());
put.add("info".getBytes(),id.getBytes(),"".getBytes()); context.write(NullWritable.get(),put);
}
} public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181"); Job job = Job.getInstance(conf);
job.setJobName("HbaseToIndex");
job.setJarByClass(HbaseToIndex.class); Scan scan = new Scan();
scan.addFamily("info".getBytes()); TableMapReduceUtil.initTableMapperJob("students",scan,IndexMap.class,Text.class,NullWritable.class,job);
TableMapReduceUtil.initTableReducerJob("student_index",IndexReduce.class,job); job.waitForCompletion(true); }
}
3、打成jar包上傳到hadoop中運行
hadoop jar hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.wyh.Hbase_MR.HbaseToIndex
4、編寫查詢代碼,測試結果
package com.wyh.Hbase_MR; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; public class OpIndex {
private Configuration conf;
private HConnection connection;
private HBaseAdmin admin; /**
* 連接到Hbase
*/
@Before
public void Cline(){ try { conf = new Configuration();
conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");
connection = HConnectionManager.createConnection(conf);
admin = new HBaseAdmin(conf);
System.out.println("建立連接成功。。。"+connection); } catch (IOException e) {
e.printStackTrace();
}
} /**
* 通過索引表進行查詢數據
*/
@Test
public void scanData(){
try {
//創建一個集合存放查詢到的學號
ArrayList<Get> gets = new ArrayList<>(); //獲取到索引表
HTableInterface student_index = connection.getTable("student_index");
Get get = new Get("理科二班".getBytes());
Result result = student_index.get(get);
List<Cell> cells = result.listCells();
for (Cell cell : cells) {
String id = Bytes.toString(CellUtil.cloneQualifier(cell)); gets.add(new Get(id.getBytes()));
} //獲取到學生表
HTableInterface students = connection.getTable("students"); Result[] results = students.get(gets); for (Result result1 : results) {
String id = Bytes.toString(result1.getRow());
String name = Bytes.toString(result1.getValue("info".getBytes(), "name".getBytes()));
String age = Bytes.toString(result1.getValue("info".getBytes(), "age".getBytes()));
String gender = Bytes.toString(result1.getValue("info".getBytes(), "gender".getBytes()));
String clazz = Bytes.toString(result1.getValue("info".getBytes(), "clazz".getBytes())); System.out.println(id+"\t"+name+"\t"+age+"\t"+gender+"\t"+clazz);
} } catch (IOException e) {
e.printStackTrace();
} } @After
public void Close(){
if(admin!=null){
try {
admin.close();
System.out.println("admin已經關閉。。。。");
} catch (IOException e) {
e.printStackTrace();
}
} if(connection!=null){
try {
connection.close();
System.out.println("connection已經關閉。。。。");
} catch (IOException e) {
e.printStackTrace();
}
} }
}
運行結果:
總結
以上是生活随笔為你收集整理的大数据之路Week10_day04 (Hbase的二级索引,二级索引的本质就是建立各列值与行键之间的映射关系)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 周进度总结
- 下一篇: Java Properties配置文件和