
Level 1: MapReduce Comprehensive Application Case: Telecom Data Cleaning

Published: 2023/12/18

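Following the hints, complete the code in the editor on the right so that it cleans the data according to the rules below.

Data file: a.txt

Field delimiter: , (comma)

Data location: /user/test/input/a.txt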

15733218050,15778423030,1542457633,1542457678,450000,530000

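| Caller phone number | Receiver phone number | Start timestamp (s) | End timestamp (s) | Caller province code | Receiver province code |
| --- | --- | --- | --- | --- | --- |
| 15733218050 | 15778423030 | 1542457633 | 1542457678 | 450000 | 530000 |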
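Before cleaning, each line has to be split into its six fields and checked. The following is a minimal sketch of that step, not part of the original solution; the class name `CallRecordParser` and its `parse` method are hypothetical names introduced for illustration.

```java
import java.util.Arrays;

/** Hypothetical helper (not in the original code): splits one raw call record. */
final class CallRecordParser {
    static final int FIELD_COUNT = 6;

    /** Returns the six fields of a record, or null when the line is malformed. */
    static String[] parse(String line) {
        if (line == null) {
            return null;
        }
        // A limit of -1 keeps trailing empty fields, so the count check stays reliable.
        String[] fields = line.trim().split(",", -1);
        if (fields.length != FIELD_COUNT) {
            return null;
        }
        for (String field : fields) {
            if (field.isEmpty()) {
                return null;
            }
        }
        // Fields 2 and 3 are the start and end timestamps and must be numeric.
        try {
            Long.parseLong(fields[2]);
            Long.parseLong(fields[3]);
        } catch (NumberFormatException e) {
            return null;
        }
        return fields;
    }

    public static void main(String[] args) {
        String sample = "15733218050,15778423030,1542457633,1542457678,450000,530000";
        System.out.println(Arrays.toString(parse(sample)));
    }
}
```

Returning null for a malformed line lets the caller skip it, which mirrors the `split.length == 6` guard used in the mapper further down.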

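MySQL database:

Username: root Password: 123123

Database name: mydb

User table: userphone

| Column | Type | Not null | Auto-increment | Description |
| --- | --- | --- | --- | --- |
| id | int(11) | | | User ID |
| phone | varchar(255) | | | Phone number |
| trueName | varchar(255) | | | Real name |

Province table: allregion

| Column | Type | Not null | Auto-increment | Description |
| --- | --- | --- | --- | --- |
| id | int(11) | | | User ID |
| CodeNum | varchar(255) | | | Code |
| Address | varchar(255) | | | Address |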

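Cleaning rules:

  • Convert the second-level timestamps in the data to the format year-month-day hour:minute:second, for example 2017-06-21 07:01:58;

  • Convert the province codes in the data to province names by matching them against the MySQL table data;

  • Match the user phone numbers against the MySQL table data to look up each user's real name;

  • Use the start time and end time in the data to compute the call duration (in seconds);

  • Set the input path and the output path for the cleaned data: the input path is /user/test/input/a.txt (HDFS); the cleaned data is stored in /user/test/output (HDFS).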

The cleaned data looks like this:

鄧二,張倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龍江省,上海市

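| User A name | User B name | User A phone number | User B phone number | Start time | End time | Call duration (s) | User A province | User B province |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 鄧二 | 張倩 | 13666666666 | 15151889601 | 2018-03-29 10:58:12 | 2018-03-29 10:58:42 | 30 | 黑龍江省 | 上海市 |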
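To show how one raw line becomes one cleaned line, here is a small sketch that uses in-memory maps in place of the two MySQL tables. Everything named here is an assumption for illustration: the class `CleanLineSketch`, the names "Alice" and "Bob", the province labels, and the "unknown" placeholder for missing keys are all hypothetical, and the real values would come from `userphone` and `allregion`.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;

/** Hypothetical end-to-end sketch (not the original solution). */
final class CleanLineSketch {

    /** Turns one raw record into the nine-field cleaned line, or null if malformed. */
    static String clean(String line, Map<String, String> names,
                        Map<String, String> provinces, TimeZone zone) {
        String[] f = line.split(",");
        if (f.length != 6) {
            return null;
        }
        long start = Long.parseLong(f[2]);
        long end = Long.parseLong(f[3]);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        sdf.setTimeZone(zone);
        // "unknown" is a placeholder chosen for this sketch when a key is missing.
        return String.join(",",
                names.getOrDefault(f[0], "unknown"),
                names.getOrDefault(f[1], "unknown"),
                f[0],
                f[1],
                sdf.format(new Date(start * 1000L)),
                sdf.format(new Date(end * 1000L)),
                String.valueOf(end - start),
                provinces.getOrDefault(f[4], "unknown"),
                provinces.getOrDefault(f[5], "unknown"));
    }

    public static void main(String[] args) {
        Map<String, String> names = new HashMap<>();
        names.put("15733218050", "Alice"); // hypothetical row of userphone
        names.put("15778423030", "Bob");   // hypothetical row of userphone
        Map<String, String> provinces = new HashMap<>();
        provinces.put("450000", "Guangxi"); // hypothetical row of allregion
        provinces.put("530000", "Yunnan");  // hypothetical row of allregion
        String raw = "15733218050,15778423030,1542457633,1542457678,450000,530000";
        System.out.println(clean(raw, names, provinces, TimeZone.getTimeZone("UTC")));
    }
}
```

Loading both tables into maps once and joining in memory is the same map-side join idea the mapper uses in its `setup` method, which avoids one database query per record.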

step/com/LogMR.java

package com;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogMR {
    /********** begin **********/
    static class MyMapper extends Mapper<LongWritable, Text, PhoneLog, NullWritable> {
        // Lookup tables loaded once per map task: phone -> real name, province code -> province name
        Map<String, String> userMap = new HashMap<>();
        Map<String, String> addressMap = new HashMap<>();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        PhoneLog pl = new PhoneLog();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Connection connection = DBHelper.getConnection();
            try (Statement statement = connection.createStatement()) {
                // userphone columns: id, phone, trueName
                try (ResultSet users = statement.executeQuery("select * from userphone")) {
                    while (users.next()) {
                        userMap.put(users.getString(2), users.getString(3));
                    }
                }
                // allregion columns: id, CodeNum, Address
                try (ResultSet regions = statement.executeQuery("select * from allregion")) {
                    while (regions.next()) {
                        addressMap.put(regions.getString(2), regions.getString(3));
                    }
                }
            } catch (SQLException e) {
                // Fail the task instead of silently continuing with empty lookup tables
                throw new IOException("Failed to load lookup tables from MySQL", e);
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String str = value.toString();
            String[] split = str.split(",");
            // Skip lines that do not have exactly six fields
            if (split.length == 6) {
                String trueName1 = userMap.get(split[0]);
                String trueName2 = userMap.get(split[1]);
                String address1 = addressMap.get(split[4]);
                String address2 = addressMap.get(split[5]);
                // Timestamps are in seconds; the formatter expects milliseconds
                long startTimestamp = Long.parseLong(split[2]);
                String startTime = sdf.format(startTimestamp * 1000);
                long endTimestamp = Long.parseLong(split[3]);
                String endTime = sdf.format(endTimestamp * 1000);
                // Call duration in seconds
                long timeLen = endTimestamp - startTimestamp;
                pl.SetPhoneLog(trueName1, trueName2, split[0], split[1], startTime, endTime, timeLen, address1,
                        address2);
                context.write(pl, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(LogMR.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(PhoneLog.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Map-only job: the cleaned records are written directly by the mappers
        job.setNumReduceTasks(0);
        Path inPath = new Path("/user/test/input/a.txt");
        Path out = new Path("/user/test/output");
        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, out);
        // Report job success or failure through the exit code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    /********** end **********/
}

step/com/DBHelper.java

package com;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBHelper {
    /********** begin **********/
    private static final String driver = "com.mysql.jdbc.Driver";
    private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
    private static final String username = "root"; // database user name
    private static final String password = "123123"; // database password; set when the database was installed, so it differs per installation
    private static Connection conn = null; // shared database connection

    static {
        try {
            Class.forName(driver);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public static Connection getConnection() {
        if (conn == null) {
            try {
                conn = DriverManager.getConnection(url, username, password); // connect to the database
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        return conn;
    }
    /********** end **********/
}

step/com/PhoneLog.java

package com;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

// One cleaned call record; toString() defines the comma-separated output line
public class PhoneLog implements WritableComparable<PhoneLog> {
    private String userA;
    private String userB;
    private String userA_Phone;
    private String userB_Phone;
    private String startTime;
    private String endTime;
    private Long timeLen;
    private String userA_Address;
    private String userB_Address;

    public PhoneLog() {
    }

    // Sets all fields at once so a single instance can be reused by the mapper
    public void SetPhoneLog(String userA, String userB, String userA_Phone, String userB_Phone, String startTime,
            String endTime, Long timeLen, String userA_Address, String userB_Address) {
        this.userA = userA;
        this.userB = userB;
        this.userA_Phone = userA_Phone;
        this.userB_Phone = userB_Phone;
        this.startTime = startTime;
        this.endTime = endTime;
        this.timeLen = timeLen;
        this.userA_Address = userA_Address;
        this.userB_Address = userB_Address;
    }

    public String getUserA_Phone() {
        return userA_Phone;
    }

    public void setUserA_Phone(String userA_Phone) {
        this.userA_Phone = userA_Phone;
    }

    public String getUserB_Phone() {
        return userB_Phone;
    }

    public void setUserB_Phone(String userB_Phone) {
        this.userB_Phone = userB_Phone;
    }

    public String getUserA() {
        return userA;
    }

    public void setUserA(String userA) {
        this.userA = userA;
    }

    public String getUserB() {
        return userB;
    }

    public void setUserB(String userB) {
        this.userB = userB;
    }

    public String getStartTime() {
        return startTime;
    }

    public void setStartTime(String startTime) {
        this.startTime = startTime;
    }

    public String getEndTime() {
        return endTime;
    }

    public void setEndTime(String endTime) {
        this.endTime = endTime;
    }

    public Long getTimeLen() {
        return timeLen;
    }

    public void setTimeLen(Long timeLen) {
        this.timeLen = timeLen;
    }

    public String getUserA_Address() {
        return userA_Address;
    }

    public void setUserA_Address(String userA_Address) {
        this.userA_Address = userA_Address;
    }

    public String getUserB_Address() {
        return userB_Address;
    }

    public void setUserB_Address(String userB_Address) {
        this.userB_Address = userB_Address;
    }

    // Serialization order must match readFields()
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userA);
        out.writeUTF(userB);
        out.writeUTF(userA_Phone);
        out.writeUTF(userB_Phone);
        out.writeUTF(startTime);
        out.writeUTF(endTime);
        out.writeLong(timeLen);
        out.writeUTF(userA_Address);
        out.writeUTF(userB_Address);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        userA = in.readUTF();
        userB = in.readUTF();
        userA_Phone = in.readUTF();
        userB_Phone = in.readUTF();
        startTime = in.readUTF();
        endTime = in.readUTF();
        timeLen = in.readLong();
        userA_Address = in.readUTF();
        userB_Address = in.readUTF();
    }

    @Override
    public String toString() {
        return userA + "," + userB + "," + userA_Phone + "," + userB_Phone + "," + startTime + "," + endTime + ","
                + timeLen + "," + userA_Address + "," + userB_Address;
    }

    // Order records by their textual form so the comparison is consistent in both directions.
    // The job runs with zero reducers, so this is not called during the map-only run.
    @Override
    public int compareTo(PhoneLog pl) {
        return this.toString().compareTo(pl.toString());
    }
}

Finally, restart Hadoop (run start-all.sh) and complete the evaluation.
