Notes on Generating HFiles with Hadoop for Direct Bulk Loading into HBase

When reposting, please credit the source: http://blackwing.iteye.com/blog/1991380


HBase ships with an ImportTsv class that turns TSV-formatted text (per the official docs, fields separated by \t) directly into HFiles, plus another class, org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles, that moves those HFiles into the table's directory on HDFS.

PS: I saw someone online claim that writing HFiles and loading them into HBase in a single step is less efficient than generating the HFiles first and then moving them into place with LoadIncrementalHFiles. I haven't verified this; my approach is also to generate first, then move.
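For reference, the "move" step can be driven from Java as well as from the command line (the CLI form is the completebulkload tool). A minimal sketch, assuming the HBase 0.94-era API used throughout this post; the table name "kq_table" and the "/output" directory are placeholders:

Java code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // placeholder table name; the target table must already exist
        HTable table = new HTable(conf, "kq_table");
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        // moves the HFiles under /output into the table's regions
        loader.doBulkLoad(new Path("/output"), table);
        table.close();
    }
}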

The official documentation is here:
http://hbase.apache.org/book/ops_mgt.html#importtsv

But ImportTsv does not fit my needs. For example, my file format is:

topsid    uid       roler_num    typ    time
10        111111    255          0      1386553377000

The ImportTsv import command would be:

bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,kq:topsid,kq:uid,kq:roler_num,kq:type -Dimporttsv.bulk.output=hdfs://storefile-outputdir <hdfs-data-inputdir>


It produces rows of the form:

row       : 10
cf        : kq
qualifier : topsid
value     : 10
.....

Whereas the format I need is:

row       : 10-111111-255
cf        : kq
qualifier : 0
value     : 1


Since ImportTsv maps each input column to a fixed qualifier and cannot build a composite row key or use a field's value as the qualifier, it is more convenient to write my own MapReduce job to process the data.

The Mapper (Java code):
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * adminOnOff.log file format:
 * topsid   uid   roler_num   typ   time
 */
public class HFileImportMapper2 extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    protected SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
    protected final String CF_KQ = "kq"; // attendance (kaoqin) column family
    protected final int ONE = 1;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        System.out.println("line : " + line); // debug output
        String[] datas = line.split("\\s+");
        // row key format: yyyyMMdd-sid-uid-role_num-timestamp-typ
        String row = sdf.format(new Date(Long.parseLong(datas[4])))
                + "-" + datas[0] + "-" + datas[1] + "-" + datas[2]
                + "-" + datas[4] + "-" + datas[3];
        ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(row));
        KeyValue kv = new KeyValue(Bytes.toBytes(row), this.CF_KQ.getBytes(),
                datas[3].getBytes(), Bytes.toBytes(this.ONE));
        context.write(rowkey, kv);
    }
}
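As a quick sanity check (hand-computed, assuming the JVM default time zone is UTC+8): for the sample line 10 111111 255 0 1386553377000, sdf formats the timestamp as 20131209, so the Mapper emits row key 20131209-10-111111-255-1386553377000-0 with column kq:0 and value Bytes.toBytes(1). This row key carries more fields than the 10-111111-255 example shown earlier, presumably to keep keys unique and time-ordered.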


The job driver (Java code):
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GenHFile2 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource("myConf.xml");
        String input = conf.get("input");
        String output = conf.get("output");
        String tableName = conf.get("source_table");
        System.out.println("table : " + tableName);
        HTable table;
        try {
            // before running, delete any existing intermediate output directory
            try {
                FileSystem fs = FileSystem.get(URI.create(output), conf);
                fs.delete(new Path(output), true);
                fs.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

            table = new HTable(conf, tableName.getBytes());
            Job job = new Job(conf);
            job.setJobName("Generate HFile");

            job.setJarByClass(HFileImportMapper2.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(HFileImportMapper2.class);
            FileInputFormat.setInputPaths(job, input);

            // job.setReducerClass(KeyValueSortReducer.class);
            // job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            // job.setMapOutputValueClass(KeyValue.class);
            job.getConfiguration().set("mapred.mapoutput.key.class",
                    "org.apache.hadoop.hbase.io.ImmutableBytesWritable");
            job.getConfiguration().set("mapred.mapoutput.value.class",
                    "org.apache.hadoop.hbase.KeyValue");

            // job.setOutputFormatClass(HFileOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(output));
            // job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
            HFileOutputFormat.configureIncrementalLoad(job, table);
            try {
                job.waitForCompletion(true);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
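The driver pulls all of its parameters from a myConf.xml resource on the classpath. A hypothetical example of that file (the property names input, output and source_table come from the code above; the values are placeholders):

<configuration>
  <property><name>input</name><value>/logs/adminOnOff.log</value></property>
  <property><name>output</name><value>/output</value></property>
  <property><name>source_table</name><value>kq_table</value></property>
</configuration>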


The generated HFiles land in the /output directory on HDFS, already organized into subdirectories named after each column family:

hdfs://namenode/output/kq/601c5029fb264dc8869a635043c24560

The key call here is:

HFileOutputFormat.configureIncrementalLoad(job, table);

From its source, we can see that it automatically configures the following on the job:
public static void configureIncrementalLoad(Job job, HTable table)
throws IOException {
  Configuration conf = job.getConfiguration();

  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);
  job.setOutputFormatClass(HFileOutputFormat.class);

  // Based on the configured map output class, set the correct reducer to properly
  // sort the incoming values.
  // TODO it would be nice to pick one or the other of these formats.
  if (KeyValue.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(KeyValueSortReducer.class);
  } else if (Put.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(PutSortReducer.class);
  } else if (Text.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(TextSortReducer.class);
  } else {
    LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
  }

  conf.setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      KeyValueSerialization.class.getName());

  // Use table's region boundaries for TOP split points.
  LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
  List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
  LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
      "to match current region count");
  job.setNumReduceTasks(startKeys.size());

  configurePartitioner(job, startKeys);
  // Set compression algorithms based on column families
  configureCompression(table, conf);
  configureBloomType(table, conf);
  configureBlockSize(table, conf);

  TableMapReduceUtil.addDependencyJars(job);
  TableMapReduceUtil.initCredentials(job);
  LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured.");
}
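Note that job.setNumReduceTasks(startKeys.size()) gives you one reducer per region, so a freshly created, unsplit table leaves the job with a single reducer. If that becomes a bottleneck, one option (not discussed in the original post) is to pre-split the table before running the job. A hedged sketch using the 0.94-era admin API; the table name, family and split keys are hypothetical, with date-based splits suggested by the yyyyMMdd row-key prefix:

Java code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class PreSplitTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor desc = new HTableDescriptor("kq_table"); // hypothetical table name
        desc.addFamily(new HColumnDescriptor("kq"));
        // hypothetical split points; each boundary creates a region (and thus a reducer)
        byte[][] splits = new byte[][] {
                Bytes.toBytes("20131201-"),
                Bytes.toBytes("20131215-")
        };
        admin.createTable(desc, splits);
        admin.close();
    }
}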


HFileOutputFormat only supports writing a single column family; if the table has multiple column families, you need to run multiple jobs, one per family.
