Java MapReduce: read HBase data and write to HDFS (Maven dependencies included)


This MapReduce job reads rows from HBase and writes them to HDFS as delimited text.
The Java code is as follows:

import com.google.common.collect.Lists;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

public class HbaseToHdfs {

    private static final Logger logger = Logger.getLogger(HbaseToHdfs.class);

    public static Configuration conf = HBaseConfiguration.create();

    static {
        // HBase master host and port
        conf.set("hbase.master", "xxx:xxx");
        // do not compress the output
        conf.set("mapreduce.output.fileoutputformat.compress", "false");
        // locations of the Hadoop configuration files
        conf.addResource(new Path("xxx/core-site.xml"));
        conf.addResource(new Path("xxx/hdfs-site.xml"));
        // location of the HBase configuration file
        conf.addResource(new Path("xxx/hbase-site.xml"));
        conf.set("hbase.client.pause", "2000");
        conf.set("hbase.client.retries.number", "100");
        conf.set("hbase.client.operation.timeout", "500000");
    }

    public static void main(String[] args) throws Exception {
        // args[2] is a properties file mapping table name -> column list
        Properties prop = new Properties();
        try (InputStream in = new FileInputStream(args[2])) {
            prop.load(in);
        }
        String columns = prop.getProperty(args[0]).trim();
        // pass the column list to the mappers through the job configuration
        conf.set("columns", columns);

        // "xxx" is the job name; choose your own
        Job job = Job.getInstance(conf, "xxx");
        job.setJarByClass(HbaseToHdfs.class);
        job.setNumReduceTasks(0); // map-only job
        TableMapReduceUtil.initTableMapperJob(initScans(job, args[0]), HbaseToHdfs.MyMapper.class,
                NullWritable.class, Text.class, job);

        // delete the output directory if it already exists
        FileSystem fs = FileSystem.get(conf);
        Path outputPath = new Path(args[1]);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        long start = System.currentTimeMillis();
        try {
            job.waitForCompletion(true);
        } finally {
            // open up permissions on the output directory and its files
            fs.setPermission(outputPath, new FsPermission("777"));
            for (FileStatus fileStatus : fs.listStatus(outputPath)) {
                fs.setPermission(fileStatus.getPath(), new FsPermission("777"));
            }
            fs.close();
            long end = System.currentTimeMillis();
            logger.info("Job<" + job.getJobName() + "> successful: " + job.isSuccessful()
                    + "; start: " + start + "; end: " + end + "; elapsed: " + (end - start) + "ms");
        }
    }

    private static List<Scan> initScans(Job job, String tableName) {
        // tag the scan with the table it should read; this attribute is how the
        // List<Scan> variant of initTableMapperJob identifies the target table
        Scan scan = new Scan();
        scan.setAttribute("scan.attributes.table.name", Bytes.toBytes(tableName));
        return Lists.newArrayList(scan);
    }

    public static class MyMapper extends TableMapper<NullWritable, Text> {

        private String columns = "";

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            this.columns = context.getConfiguration().get("columns");
        }

        @Override
        protected void map(ImmutableBytesWritable key, Result r, Context context)
                throws IOException, InterruptedException {
            if (r == null) {
                return;
            }
            StringBuilder all = new StringBuilder();
            int j = 0;
            for (String column : this.columns.split(",")) {
                j++;
                String s = "";
                try {
                    // "xxx" is the column family name
                    byte[] p = r.getValue("xxx".getBytes(), column.getBytes());
                    if (p != null) {
                        // decode as UTF-8 and strip characters that clash with
                        // the field delimiter; customize as needed
                        s = new String(p, "UTF-8");
                        s = s.replaceAll("\\n", "").replaceAll("\\r", "");
                        s = s.replaceAll(",", ".");
                        s = s.replaceAll(";", ".");
                        if ("NULL".equals(s)) {
                            s = "";
                        }
                    }
                } catch (Exception e) {
                    logger.warn("failed to read column " + column, e);
                    s = "";
                }
                if (j > 1) {
                    // the field delimiter written to HDFS is a comma
                    all.append(",");
                }
                all.append(s);
            }
            context.write(NullWritable.get(), new Text(all.toString()));
        }
    }
}
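The job uses the List<Scan> overload of TableMapReduceUtil.initTableMapperJob, identifying the target table through the scan.attributes.table.name attribute on each Scan. When only a single table is read, the overload that takes the table name directly is an equivalent, simpler option; a minimal sketch, assuming the same mapper and output types as above:

// single-table variant: the table name is passed explicitly,
// so no scan attribute is needed
TableMapReduceUtil.initTableMapperJob(
        args[0],                      // HBase table to read
        new Scan(),                   // scan every column
        HbaseToHdfs.MyMapper.class,   // mapper
        NullWritable.class,           // mapper output key class
        Text.class,                   // mapper output value class
        job);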

Usage:
The program takes three arguments:
1. the name of the HBase table to read;
2. the HDFS path to write to;
3. the path of a properties file whose entries have the form (see the example below):
   hbase table name=comma-separated list of columns to export (the fields destined for Hive)
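For example, to export a table user_info (the table, column, and path names here are all hypothetical), the properties file could contain:

user_info=name,age,address

and the job could then be submitted with the standard hadoop jar command, assuming the code has been packaged as hbase-to-hdfs.jar:

hadoop jar hbase-to-hdfs.jar HbaseToHdfs user_info /tmp/user_info_out /path/to/columns.properties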
The Maven dependencies are as follows:

<dependencies>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>1.10.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.6.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.0.0-cdh5.5.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.0.0-cdh5.5.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>1.0.0-cdh5.5.0</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-protocol</artifactId>
        <version>1.0.0-cdh5.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.0.0-cdh5.5.0</version>
    </dependency>
</dependencies>
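Note that the artifacts versioned 1.0.0-cdh5.5.0 are published in Cloudera's repository (https://repository.cloudera.com/artifactory/cloudera-repos/) rather than Maven Central, so the POM will typically also need a matching <repositories> entry. To run the job with hadoop jar, one common choice is to bundle the dependencies into a single runnable jar; a minimal sketch using the maven-shade-plugin (the plugin version here is an assumption, adjust as needed):

<build>
    <plugins>
        <!-- package the job and its dependencies into one runnable jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>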

Summary

This article walked through a map-only MapReduce job that scans an HBase table, cleans the selected column values, and writes them to HDFS as comma-delimited text, together with the Maven dependencies needed to build it. Hopefully it helps you solve the problem you ran into.
