
Using MapReduce with HBase


You will often hit errors when running HBase; I have run into this one myself:

ERROR: org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times

The log shows: org.apache.hadoop.ipc.RPC$VersionMismatch: Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version mismatch. (client = 42, server = 41)

If you see this error, the RPC protocol versions used by HBase and Hadoop do not match. The fix: delete the hadoop-core jar under hbase/lib, copy hadoop-0.20.2-core.jar from the Hadoop directory into hbase/lib, and restart HBase. A second common cause is that Hadoop has not been started: start Hadoop first, then start HBase.

For development in Eclipse, you need to add all of Hadoop's jars plus the two jars HBase depends on (hbase and zookeeper).

HBase basics are covered in this earlier post: http://www.cnblogs.com/liqizhou/archive/2012/05/14/2499112.html

  • Create a table with the createTable method of the HBaseAdmin class.
  • The HTable class operates on a table: its put method inserts data (a Put object is constructed with a row key), and getScanner() returns all the data in a column family as Result objects. Result's getFamilyMap() yields a map keyed by column qualifier with the cell contents as values, which lines up nicely with Hadoop's map output. The code below shows all of this end to end.
    package test;

    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;

    public class Htable {

        public static void main(String[] args) throws IOException {
            Configuration hbaseConf = HBaseConfiguration.create();
            HBaseAdmin admin = new HBaseAdmin(hbaseConf);

            HTableDescriptor htableDescriptor = new HTableDescriptor("table".getBytes()); // set the table name
            htableDescriptor.addFamily(new HColumnDescriptor("fam1")); // add a column family
            admin.createTable(htableDescriptor); // create the table

            HTable table = new HTable(hbaseConf, "table"); // get an instance of the table
            for (int i = 0; i < 3; i++) { // write three rows
                Put putRow = new Put(("row" + i).getBytes()); // row key of the ith row
                putRow.add("fam1".getBytes(), "col1".getBytes(), "value1".getBytes()); // column qualifier and value
                putRow.add("fam1".getBytes(), "col2".getBytes(), "value2".getBytes());
                putRow.add("fam1".getBytes(), "col3".getBytes(), "value3".getBytes());
                table.put(putRow);
            }

            for (Result result : table.getScanner("fam1".getBytes())) { // scan the column family
                for (Map.Entry<byte[], byte[]> entry : result.getFamilyMap("fam1".getBytes()).entrySet()) {
                    String column = new String(entry.getKey());
                    String value = new String(entry.getValue());
                    System.out.println(column + "," + value);
                }
            }

            admin.disableTable("table".getBytes()); // disable the table
            admin.deleteTable("table".getBytes());  // drop the table
        }
    }

The code above should be easy to follow.
Next, let's look at how to operate on HBase from MapReduce: first loading data into HBase from a reduce phase, then reading data back out in a map phase.

Suppose there are some large files that need to go into HBase. The idea: first upload the files to HDFS, read <key,value> pairs from them during the map phase, and write those pairs into HBase during the reduce phase. A minimal sketch of the upload step follows.
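The upload itself is a one-liner against the HDFS API. Here is a minimal sketch, assuming hypothetical local and HDFS paths:

    package test;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class UploadToHdfs {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            // Copy a local file into HDFS; both paths are placeholders.
            fs.copyFromLocalFile(new Path("/local/data/input.txt"),
                                 new Path("/user/hadoop/input/input.txt"));
            fs.close();
        }
    }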

The Mapper class splits each input line into a key and a value:

    package test;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MapperClass extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is expected to be "key value", separated by a space.
            String[] items = value.toString().split(" ");
            String k = items[0];
            String v = items[1];
            context.write(new Text(k), new Text(v));
        }
    }

The Reducer class writes the accumulated key/value pairs into the HBase table:

    package test;

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.Text;

    public class ReducerClass extends TableReducer<Text, Text, ImmutableBytesWritable> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String k = key.toString();
            StringBuffer str = new StringBuffer(); // must be initialized before appending
            for (Text value : values) {
                str.append(value.toString());
            }
            String v = str.toString();

            Put putrow = new Put(k.getBytes());
            putrow.add("fam1".getBytes(), "name".getBytes(), v.getBytes());
            context.write(new ImmutableBytesWritable(k.getBytes()), putrow); // emit the Put to HBase
        }
    }

Note that ReducerClass extends TableReducer, whereas in plain Hadoop a reducer extends the Reducer class. TableReducer's signature is TableReducer<KeyIn, ValueIn, KeyOut>: only three type parameters, because HBase fixes the output value type, and the key written to HBase is an ImmutableBytesWritable. The skeleton below illustrates the difference.
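To make that concrete, here is a compile-only skeleton contrasting the two base classes (the class names are made up for illustration):

    package test;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // A plain Hadoop reducer declares all four key/value types itself.
    class PlainReducer extends Reducer<Text, Text, Text, Text> {
    }

    // A TableReducer declares only three: the output value type is fixed by
    // HBase (the Put/Delete mutation emitted via context.write), and the
    // output key is typically ImmutableBytesWritable.
    class HbaseSideReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    }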

Keeping the Map, Reduce, and Job configuration in separate classes makes the structure clearer; Mahout is built the same way.

    package test;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.Tool;

    public class Driver extends Configured implements Tool {

        @Override
        public int run(String[] arg0) throws Exception { // Tool.run is an instance method returning int
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "localhost"); // no trailing dot in the property name

            Job job = new Job(conf, "Hbase");
            job.setJarByClass(TxtHbase.class);

            Path in = new Path(arg0[0]); // input directory on HDFS
            job.setInputFormatClass(TextInputFormat.class);
            FileInputFormat.addInputPath(job, in);

            job.setMapperClass(MapperClass.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job);
            return job.waitForCompletion(true) ? 0 : 1;
        }
    }

Notice that the Driver never calls job.setReducerClass(); instead, TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job) registers the reducer and points it at the target table. A rough sketch of what that helper wires up follows.
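The sketch below approximates what the helper configures on the job; this is not the exact implementation (the real one also handles things like dependency-jar shipping), just the essential settings:

    package test;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Reducer;

    public class InitReducerSketch {
        // Approximately what TableMapReduceUtil.initTableReducerJob(table,
        // reducer, job) does under the hood.
        static void initTableReducerJobSketch(String table,
                Class<? extends Reducer> reducer, Job job) {
            job.setOutputFormatClass(TableOutputFormat.class);                 // route output through HBase
            job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table); // target table name
            job.setReducerClass(reducer);                                      // the TableReducer subclass
            job.setOutputKeyClass(ImmutableBytesWritable.class);               // row-key type
            job.setOutputValueClass(Writable.class);                           // the Put/Delete payload
        }
    }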

The main function:

    package test;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class TxtHbase {
        public static void main(String[] args) throws Exception {
            // ToolRunner parses generic options and invokes Driver.run().
            int exitCode = ToolRunner.run(new Configuration(), new Driver(), args);
            System.exit(exitCode);
        }
    }


Reading data back out is simpler: write a Mapper class and read the <key,value> pairs in its map method.

    package test;

    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapred.TableMap;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class MapperClass extends MapReduceBase implements TableMap<Text, Text> {

        static final String NAME = "GetDataFromHbaseTest";

        public void map(ImmutableBytesWritable row, Result values,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            StringBuilder sb = new StringBuilder();
            // Walk every cell in the "fam1" family: the entry key is the column
            // qualifier, the entry value is the raw cell bytes.
            for (Map.Entry<byte[], byte[]> entry : values.getFamilyMap("fam1".getBytes()).entrySet()) {
                byte[] cell = entry.getValue();
                if (cell != null) {
                    sb.append(new String(entry.getKey())).append(new String(cell));
                }
            }
            output.collect(new Text(row.get()), new Text(sb.toString()));
        }
    }

This mapper is registered through initTableMapJob(String table, String columns, Class<? extends TableMap> mapper, Class<? extends org.apache.hadoop.io.WritableComparable> outputKeyClass, Class<? extends org.apache.hadoop.io.Writable> outputValueClass, org.apache.hadoop.mapred.JobConf job, boolean addDependencyJars) from the older org.apache.hadoop.hbase.mapred API; a shorter overload without the addDependencyJars flag also exists, and the driver below uses it.

    package test;

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.Tool;

    public class Driver extends Configured implements Tool {

        @Override
        public int run(String[] arg0) throws Exception {
            // MapperClass above uses the older mapred API (TableMap), so the
            // job is built on JobConf rather than the newer Job class.
            JobConf job = new JobConf(HBaseConfiguration.create(), Driver.class);
            job.setJobName("GetDataFromHbase");
            job.set("hbase.zookeeper.quorum", "localhost");

            // arg0[0] is the column list to scan, e.g. "fam1:".
            TableMapReduceUtil.initTableMapJob("table", arg0[0], MapperClass.class,
                    Text.class, Text.class, job);

            job.setNumReduceTasks(0); // map-only job
            // A second CLI argument is assumed here as the output directory
            // for the <row, cells> pairs the mapper emits.
            FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
            JobClient.runJob(job);
            return 0;
        }
    }

The main function:

    package test;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class TxtHbase {
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new Configuration(), new Driver(), args);
            System.exit(exitCode);
        }
    }


Author: BIGBIGBOAT/Liqizhou

Originally posted at: https://www.cnblogs.com/liqizhou/archive/2012/05/17/2504279.html
