日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java map 实现 序列化,MapReduce序列化

發布時間:2023/12/10 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java map 实现 序列化,MapReduce序列化 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

序列化就是把內存中的對象轉換成字節序列以便于存儲到磁盤(持久化)和網絡傳輸。

反序列化就是將字節序列或者是持久化的數據轉換成內存中的對象。

內存中的對象只能本地進程使用,斷掉后就消失了,也不能被發送到網絡上的另一臺機器,序列化可以將內存中的對象發送到遠程機器。由于Java本身的序列化框架(Serializable)太重,序列化的對象包含了很多額外信息,不便于在網絡中高效傳輸,Hadoop開發了自己的序列化機制(Writable)。

實現自定義bean對象的序列化

步驟如下:

必須實現Writable接口;

反序列化時,需要反射調用空構造參數,所以必須有空參構造;

public FlowBean() {

super();

}

重寫序列化方法;

@Override

public void write(DataOutput out) throws IOException {

out.writeLong(upFlow);

out.writeLong(downFlow);

out.writeLong(sumFlow);

}

重寫反序列化方法;

@Override

public void readFields(DataInput in) throws IOException {

upFlow = in.readLong();

downFlow = in.readLong();

sumFlow = in.readLong();

}

注意:反序列化的順序和序列化的順序完全一致。

要想把結果顯示在文件中,需要重寫toString()方法,可用“\t“分開;

如果需要將自定義的Bean放在Key中傳輸,還需要實現Comparable接口,因為MapReduce框架中的Shuffle過程要求必須對key必須能排序。

@Override

public int compareTo(FlowBean o) {

return this.sumFlow > o.getSumFlow() ? -1 : 1;

}

自定義序列化

統計txt中每個電話號的上行流量、下行流量和總流量。數據示例如下,倒數第二和第三列分別為下行流量和上行流量。

0 13152567890 www.baidu.com 90 100 200

1 16592992187 www.google.com 100 2000 200

2 15716605853 www.vx.com 2000 2043 200

3 16592992187 www.baidu.com 204 222 200

4 13152567890 www.python.org 20 40 500

自定義的Bean,按照上述要求完成。

package Flowsum;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

public class FlowBean implements Writable {

private long upFlow;

private long downFlow;

private long sumFlow;

// 空參構造,實現反射調用

public FlowBean() {

super();

}

// 有參構造

public FlowBean(long upFlow, long downFlow) {

super();

this.upFlow = upFlow;

this.downFlow = downFlow;

sumFlow = upFlow + downFlow;

}

// 序列化方法

public void write(DataOutput dataOutput) throws IOException {

dataOutput.writeLong(upFlow);

dataOutput.writeLong(downFlow);

dataOutput.writeLong(sumFlow);

}

// 反序列化方法

public void readFields(DataInput dataInput) throws IOException {

// 要求和序列化時的順序一致

upFlow = dataInput.readLong();

downFlow = dataInput.readLong();

sumFlow = dataInput.readLong();

}

@Override

public String toString() {

return upFlow + "\t" + downFlow + "\t" + sumFlow;

}

public long getUpFlow() {

return upFlow;

}

public void setUpFlow(long upFlow) {

this.upFlow = upFlow;

}

public long getDownFlow() {

return downFlow;

}

public void setDownFlow(long downFlow) {

this.downFlow = downFlow;

}

public long getSumFlow() {

return sumFlow;

}

public void setSumFlow(long sumFlow) {

this.sumFlow = sumFlow;

}

public void set(long upFlow2, long downFlow2) {

upFlow = upFlow2;

downFlow = downFlow2;

sumFlow = upFlow + downFlow;

}

}

注意:

1)空參構造必須有;

2)序列化的過程和反序列化的過程比必須一致;

3)每個字段必須有get和set方法。

Mapper

package Flowsum;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountMapper extends Mapper {

Text k = new Text();

FlowBean v = new FlowBean();

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

// 1 獲取一行

String line = value.toString();

// 2 切分

String[] fields = line.split("\t");

// 3 封裝對象

k.set(fields[1]);

long upFlow = Long.parseLong(fields[fields.length - 3]);

long downFlow = Long.parseLong(fields[fields.length - 2]);

v.setUpFlow(upFlow);

v.setDownFlow(downFlow);

// 4 寫出

context.write(k, v);

}

}

Reducer

package Flowsum;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountReducer extends Reducer {

FlowBean v = new FlowBean();

@Override

protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

// 1 累加求和

long sum_upFlow = 0;

long sum_downFlow = 0;

for (FlowBean flowBean : values) {

sum_upFlow += flowBean.getUpFlow();

sum_downFlow += flowBean.getDownFlow();

}

v.set(sum_upFlow, sum_downFlow);

// 2 寫出

context.write(key, v);

}

}

Driver

package Flowsum;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowCountDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

// 1 獲取Job對象

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

// 2 設置jar路徑

job.setJarByClass(FlowCountDriver.class);

// 3 關聯Mapper和Reducer

job.setMapperClass(FlowCountMapper.class);

job.setReducerClass(FlowCountReducer.class);

// 4 設置Mappr輸出的key和value類型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(FlowBean.class);

// 5 設置最終輸出的key和value類型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(FlowBean.class);

// 6 設置輸入路徑和輸出路徑

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 提交

job.waitForCompletion(true);

}

}

總結

以上是生活随笔為你收集整理的java map 实现 序列化,MapReduce序列化的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。