日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop入门(八)Mapreduce高级shuffle之Partitioner

發(fā)布時間:2023/12/3 编程问答 53 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop入门(八)Mapreduce高级shuffle之Partitioner 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、Partitioner概述

Map階段總共五個步驟,2就是一個分區(qū)操作

?

哪個key到哪個Reducer的分配過程,是由Partitioner規(guī)定的。

?

二、Hadoop內(nèi)置Partitioner

MapReduce的使用者通常會指定Reduce任務(wù)和Reduce任務(wù)輸出文件的數(shù)量(R)。
用戶在中間key上使用分區(qū)函數(shù)來對數(shù)據(jù)進行分區(qū),之后在輸入到后續(xù)任務(wù)執(zhí)行進程。一個默認的分區(qū)函數(shù)式使用hash方法(比如常見的:hash(key) mod R)進行分區(qū)。hash方法能夠產(chǎn)生非常平衡的分區(qū)。

Hadoop中自帶了一個默認的分區(qū)類HashPartitioner,
它繼承了Partitioner類,提供了一個getPartition的方法

/** Partition keys by their {@link Object#hashCode()}. */ public class HashPartitioner<K, V> extends Partitioner<K, V> {/** Use {@link Object#hashCode()} to partition. */public int getPartition(K key, V value,int numReduceTasks) {return? (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}

將key均勻布在Reduce Tasks上
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;?

如果Key為Text的話,Text的hashcode方法跟String的基本一致,都是采用的Horner公式計算,得到一個int整數(shù)。但是,如果string太大的話這個int整數(shù)值可能會溢出變成負數(shù),所以和整數(shù)的上限值Integer.MAX_VALUE(即0111111111111111)進行與運算,然后再對reduce任務(wù)個數(shù)取余,這樣就可以讓key均勻分布在reduce上?

?

三、自定制Partitioner

一般我們都會使用默認的分區(qū)函數(shù)HashPartitioner

自定義數(shù)據(jù)類型處理手機上網(wǎng)日志: 在第二列上并不是所有的數(shù)據(jù)都是手機號(84138413并不是一個手機號),任務(wù)就是在統(tǒng)計手機流量時,將手機號碼和非手機號輸出到不同的文件中

自定義MKPartitioner

?public static class MKPartitioner extends Partitioner<Text, KpiWritable> {@Overridepublic int getPartition(Text key, KpiWritable value, int numPartitions) {// 實現(xiàn)不同的長度不同的號碼分配到不同的reduce task中int numLength = key.toString().length();if (numLength == 11) return 0;else ? return 1;?}}

設(shè)置為打包運行,設(shè)置Partitioner為MKPartitioner設(shè)置ReducerTask的個數(shù)為2
注意:分區(qū)的例子必須要設(shè)置為打成jar包運行!

? ? public int run(String[] args) throws Exception {// 定義一個作業(yè)Job job = new Job(getConf(), "MyJob");// 分區(qū)需要設(shè)置為打包運行job.setJarByClass(MyJob.class);// 設(shè)置輸入目錄FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));// 設(shè)置自定義Mapper類job.setMapperClass(MyMapper.class);// 指定<k2,v2>的類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(KpiWritable.class);// 設(shè)置Partitionerjob.setPartitionerClass(NKPartitioner.class);job.setNumReduceTasks(2);// 設(shè)置自定義Reducer類job.setReducerClass(MyReducer.class);// 指定<k3,v3>的類型job.setOutputKeyClass(Text.class);job.setOutputKeyClass(KpiWritable.class);// 設(shè)置輸出目錄FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));// 提交作業(yè)System.exit(job.waitForCompletion(true) ? 0 : 1);return 0;}

打成jar包并在Hadoop中運行

  • 通過Idea導(dǎo)出jar包
  • 通過FTP上傳到Linux中,可以使用各種FTP工具
  • 通過Hadoop Shell執(zhí)行jar包中的程序
  • 通過Web接口驗證Partitioner的運行:

    • 通過訪問http://hadoop01:50030 查看 是否有2個Reduce任務(wù)?
    • Reduce輸出結(jié)果是否一致?

    小結(jié):

    • 分區(qū)Partitioner主要作用在于以下兩點 ?根據(jù)業(yè)務(wù)需要,產(chǎn)生多個輸出文件
    • 多個reduce任務(wù)并發(fā)運行,提高整體job的運行效率

    總結(jié)

    以上是生活随笔為你收集整理的Hadoop入门(八)Mapreduce高级shuffle之Partitioner的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。