Big Data Offline Pipeline (Mini Exercise)
Inspecting the raw data shows that columns are separated by "!". A video can belong to several categories, which are separated by "&" with a space on either side; it can also have several related videos, which are again separated by "!". To make multi-valued fields easier to work with during analysis, we first restructure and clean the data.
That is: split each record's video categories on "&" and strip the surrounding spaces, and join the multiple related-video ids with "&" as well.
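The restructuring rule above can be sketched as a plain string transformation, independent of MapReduce. The sample record below is illustrative (any real line has 9 fixed fields followed by the related-video ids):

```java
public class CleanSketch {
    // Rebuild one raw line: keep the first 9 fields joined by "!",
    // strip the spaces around "&" in the category field, and join
    // the remaining related-video ids with "&".
    static String clean(String line) {
        String[] fields = line.split("!");
        StringBuilder head = new StringBuilder();
        StringBuilder related = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i < 9) {
                head.append(fields[i].replace(" & ", "&")).append("!");
            } else {
                if (related.length() > 0) related.append("&");
                related.append(fields[i]);
            }
        }
        return head.toString() + related;
    }

    public static void main(String[] args) {
        // Hypothetical record: 9 fixed fields, then two related ids.
        String raw = "v1!alice!10!Music & News!120!500!4.5!30!7!r1!r2";
        // → v1!alice!10!Music&News!120!500!4.5!30!7!r1&r2
        System.out.println(clean(raw));
    }
}
```

The category field "Music & News" becomes "Music&News", and the two trailing ids collapse into "r1&r2", which is exactly what the mapper below does per input line.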
Expected result [screenshot]:
Implementation code [screenshot]
Mapper code
The Reducer can be omitted here, so there is no need to write one.
Driver code
Code:
package com.czxy.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/** Created by 一個(gè)蔡狗 on 2020/1/7. */
public class VideoRunner {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "VideoR");
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("E:\\input\\video\\"));
        job.setMapperClass(VideoMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        // The Reducer is omitted: the cleaned lines are written straight from the map phase.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("E:\\output\\video"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    static class VideoMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split("!");
            StringBuilder head = new StringBuilder();
            StringBuilder related = new StringBuilder();
            for (int i = 0; i < split.length; i++) {
                if (i < 9) {
                    // The first 9 fields keep "!" as the column separator;
                    // strip the spaces around "&" in the category field.
                    head.append(split[i].replace(" & ", "&")).append("!");
                } else {
                    // Every remaining field is a related-video id: join them with "&".
                    if (related.length() > 0) {
                        related.append("&");
                    }
                    related.append(split[i]);
                }
            }
            // Robustness: a record may have no related videos at all, in which
            // case only the first 9 fields are emitted (the original appended a
            // null reference here, which would print the literal "null").
            context.write(NullWritable.get(), new Text(head.toString() + related));
        }
    }

    // The Reducer can be omitted; it is kept here commented out for reference only.
    // static class VideoReduce extends Reducer<NullWritable, Text, Text, NullWritable> {
    //     @Override
    //     protected void reduce(NullWritable key, Iterable<Text> values, Context context)
    //             throws IOException, InterruptedException {
    //         for (Text value : values) {
    //             context.write(value, NullWritable.get());
    //         }
    //     }
    // }
}

Next, load the preprocessed data into Hive.
Data loading stage
Create the database and tables:
Create a database named video:

create database video;

Create the raw-data tables (video table douyinvideo_ori, user table douyinvideo_user_ori) and the ORC-format tables (video table douyinvideo_orc, user table douyinvideo_user_orc).

Statement for the douyinvideo_ori video table:

create table douyinvideo_ori(
    videoId string, uploader string, age int, category array<string>,
    length int, views int, rate float, ratings int, comments int,
    relatedId array<string>)
row format delimited
fields terminated by "!"
collection items terminated by "&"
stored as textfile;

Statement for the douyinvideo_user_ori user table:

create table douyinvideo_user_ori(
    uploader string, videos int, friends int)
row format delimited
fields terminated by ","
stored as textfile;
Data loading result [screenshot]:
Data loading commands [commands]:
2.1
-- Create the douyinvideo_orc table
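This blank asks for the ORC tables; a likely answer, assuming they simply mirror the raw tables' schemas with ORC storage:

```sql
create table douyinvideo_orc(
    videoId string, uploader string, age int, category array<string>,
    length int, views int, rate float, ratings int, comments int,
    relatedId array<string>)
row format delimited
fields terminated by "!"
collection items terminated by "&"
stored as orc;

create table douyinvideo_user_orc(
    uploader string, videos int, friends int)
row format delimited
fields terminated by ","
stored as orc;
```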
2.2
-- Write the import statements for douyinvideo_ori on the answer sheet:
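A plausible import; the local paths below are illustrative assumptions, since the source does not say where the cleaned output and the raw user file were placed:

```sql
load data local inpath '/export/cleaned_video' into table douyinvideo_ori;
load data local inpath '/export/user.txt' into table douyinvideo_user_ori;
```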
2.3
-- 2.3 Query the raw tables and insert the data into the corresponding ORC tables
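Since the ORC tables share the raw tables' schemas, the transfer is a plain insert-select:

```sql
insert into table douyinvideo_orc select * from douyinvideo_ori;
insert into table douyinvideo_user_orc select * from douyinvideo_user_ori;
```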
3.1
#!/bin/bash
hive -e "select douyinvideo_ori.*
3.2
-- 3.2 Find the ten uploaders with the most uploaded videos, and among their videos the top 20 by views; save the query result to /export/uploader.txt
-- Script:
#!/bin/bash
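One way to fill in the 3.2 script; the exact ranking semantics and output column order are assumptions, only the tables and the /export/uploader.txt path come from the task text:

```sh
#!/bin/bash
hive -e "
select t2.uploader, t2.videoId, t2.views
from (select uploader, videos
      from video.douyinvideo_user_orc
      order by videos desc
      limit 10) t1
join video.douyinvideo_orc t2
  on t1.uploader = t2.uploader
order by t2.views desc
limit 20;
" > /export/uploader.txt
```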
Table creation statements
Statement for creating the ratings external table:
-- 4.1 Create the corresponding external table in the Hive database
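The source never spells out the ratings schema; assuming it matches the columns queried from hbase_ratings later (a row id as key plus videoId and ratings), and with an illustrative HDFS location, one hedged sketch:

```sql
create external table ratings(
    id string, videoId string, ratings int)
row format delimited fields terminated by '\t'
location '/external/ratings';
```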
Statement for creating the uploader external table:
-- Write the statement that creates the uploader external table on the answer sheet:
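The uploader columns are not given either; assuming they match the step-3.2 result file (uploader, videoId, views — an assumption, not stated in the source), with an illustrative location:

```sql
create external table uploader(
    uploader string, videoId string, views int)
row format delimited fields terminated by '\t'
location '/external/uploader';
```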
4.2
Data loading statements
-- 4.2 Load the result data from step 3 into the external tables
-- Write the statement that loads the data into the uploader table on the answer sheet
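A plausible load statement, reusing the /export/uploader.txt path produced in step 3.2:

```sql
load data local inpath '/export/uploader.txt' into table uploader;
```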
4.3
Create the Hive-HBase mapping tables
-- Create the hbase_ratings table with its mapping, and write the statement on the answer sheet:
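A typical Hive-HBase mapping via the HBaseStorageHandler; the column list is an assumption, chosen so the cf:videoId and cf:ratings columns queried in the display stage below actually exist:

```sql
create table hbase_ratings(
    id string, videoId string, ratings int)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" = ":key,cf:videoId,cf:ratings")
tblproperties ("hbase.table.name" = "hbase_ratings");
```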
4
Insert data
-- Write the insert overwrite ... select statement that populates the hbase_ratings table on the answer sheet
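A likely answer, assuming the ratings and hbase_ratings column lists line up:

```sql
insert overwrite table hbase_ratings select * from ratings;
```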
-- Write the insert overwrite ... select statement that populates the hbase_uploader table on the answer sheet:
insert overwrite table hbase_uploader select * from uploader;
Data query and display stage
1. Code [screenshot]:
Use the HBase API to query the hbase_ratings table for the values of the videoId and ratings columns under column family cf where rowkey = 1
Code:
package com.czxy.Api;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;

/** Created by 一個(gè)蔡狗 on 2020/1/7. */
public class HbaseAPI01 {
    public static void main(String[] args) throws Exception {
        // 1: query the hbase_ratings table for the values of the videoId and
        // ratings columns under column family cf for a single row key.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node001,node002,node003");
        Connection connection = ConnectionFactory.createConnection(conf);
        Table ratings = connection.getTable(TableName.valueOf("hbase_ratings"));
        Scan scan = new Scan();
        // Match only the target row key.
        RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes("LVCb52iQrfo")));
        scan.setFilter(rowFilter);
        // Restrict the scan to the two requested columns. (The original used a
        // QualifierFilter on "ratings" alone, which wrongly dropped videoId.)
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("videoId"));
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("ratings"));
        ResultScanner scanner = ratings.getScanner(scan);
        for (Result result : scanner) {
            for (Cell cell : result.rawCells()) {
                // Values written through the Hive-HBase mapping are stored as
                // strings, so decode with Bytes.toString, not Bytes.toInt.
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "_"
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + "_"
                        + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        scanner.close();
        ratings.close();
        connection.close();
    }
}

2. Code [screenshot]:
Use the HBase API with a RowFilter on the hbase_uploader table to retrieve every row whose row key is smaller than rowKey = MdNyOfjnETI
Code:
package com.czxy.Api;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;

/** Created by 一個(gè)蔡狗 on 2020/1/7. */
public class HbaseAPI02 {
    // 2: use a RowFilter to fetch from the hbase_uploader table every row
    // whose row key is smaller than "MdNyOfjnETI".
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node001,node002,node003");
        Connection connection = ConnectionFactory.createConnection(conf);
        Table uploader = connection.getTable(TableName.valueOf("hbase_uploader"));
        Scan scan = new Scan();
        // LESS keeps only rows that sort strictly before the given key.
        RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.LESS,
                new BinaryComparator(Bytes.toBytes("MdNyOfjnETI")));
        scan.setFilter(rowFilter);
        ResultScanner scanner = uploader.getScanner(scan);
        for (Result result : scanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "_"
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + "_"
                        + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        scanner.close();
        uploader.close();
        connection.close();
    }
}

Summary