日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce算法–了解数据连接第二部分

發布時間:2023/12/3 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce算法–了解数据连接第二部分 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

自從我上一次發布以來已經有一段時間了,就像我上一次大休息一樣,我正在Coursera上一些課程。 這次是Scala中的函數式編程 原理和反應式編程原理 。 我發現它們都是不錯的課程,如果有時間的話,建議您選一門。 在本文中,我們將繼續介紹如何使用MapReduce實現數據密集型文本處理中的算法的系列文章,這次涵蓋了地圖端連接。 從名稱可以猜出,映射側聯接僅在映射階段連接數據,而完全跳過簡化階段。 在上一篇有關數據連接的文章中,我們介紹了減少側連接 。 減少端連接很容易實現,但缺點是所有數據都通過網絡發送到減少器。 由于我們避免了跨網絡發送數據的開銷,因此地圖端連接可顯著提高性能。 但是,與減少側聯接不同,映射側聯接需要滿足非常特定的條件。 今天,我們將討論地圖端連接的要求以及如何實現它們。

地圖端加入條件

要利用地圖側聯接,我們的數據必須滿足以下條件之一:

  • 要連接的數據集已經按相同的鍵排序,并且具有相同的分區數
  • 在要連接的兩個數據集中,一個足夠小以適合內存
  • 我們將考慮第一種情況,其中有兩個(或更多)數據集需要連接,但是太大而無法容納到內存中。 我們將假設最壞的情況是,文件沒有按相同的順序排序或分區。

    資料格式

    在開始之前,讓我們看一下正在使用的數據。 我們將有兩個數據集:

  • 第一個數據集包括GUID,名字,姓氏,地址,城市和州
  • 第二個數據集由一個GUID和雇主信息組成
  • 兩個數據集均以逗號分隔,并且聯接鍵(GUID)位于第一位置。 加入后,我們希望將數據集2中的雇主信息附加到數據集1的末尾。 此外,我們希望將GUID保持在數據集1的第一個位置,但要從數據集2刪除GUID。
    資料集1:

    aef9422c-d08c-4457-9760-f2d564d673bc,Linda,Narvaez,3253 Davis Street,Atlanta,GA08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SCde68186a-1004-4211-a866-736f414eac61,Charles,Arnold,1764 Public Works Drive,Johnson City,TN6df1882d-4c81-4155-9d8b-0c35b2d34284,John,Schofield,65 Summit Park Avenue,Detroit,MI

    數據集2:

    de68186a-1004-4211-a866-736f414eac61,Jacobs6df1882d-4c81-4155-9d8b-0c35b2d34284,Chief Auto Partsaef9422c-d08c-4457-9760-f2d564d673bc,Earthworks Yard Maintenance08db7c55-22ae-4199-8826-c67a5689f838,Ellman's Catalog Showrooms

    合并結果:

    08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC,Ellman's Catalog Showrooms 6df1882d-4c81-4155-9d8b-0c35b2d34284,John,Schofield,65 Summit Park Avenue,Detroit,MI,Chief Auto Parts aef9422c-d08c-4457-9760-f2d564d673bc,Linda,Narvaez,3253 Davis Street,Atlanta,GA,Earthworks Yard Maintenance de68186a-1004-4211-a866-736f414eac61,Charles,Arnold,1764 Public Works Drive,Johnson City,TN,Jacobs

    現在,我們繼續介紹如何連接兩個數據集。

    Map-Side連接具有大數據集

    為了能夠執行映射端連接,我們需要按相同的鍵對數據進行排序并具有相同數量的分區,這意味著任何記錄的所有鍵都在同一分區中。 盡管這似乎是一個艱巨的要求,但很容易解決。 Hadoop會對所有鍵進行排序,并保證將具有相同值的鍵發送到相同的reducer。 因此,只需運行一個MapReduce作業,該作業除了通過您要連接的鍵輸出數據,并為所有數據集指定完全相同數量的化簡器,我們將以正確的形式獲取數據。 考慮到能夠進行地圖側連接所帶來的效率提高,可能值得花費額外的MapReduce作業。 在這一點上,有必要重復一遍,至關重要的是,在“準備”階段對數據進行排序和分區時,所有數據集都必須指定完全相同數量的化簡。 在本文中,我們將獲取兩個數據集,并在兩個數據集上運行初始MapReduce作業以進行排序和分區,然后運行最終作業以執行地圖端聯接。 首先,讓我們介紹一下MapReduce作業,以相同的方式對數據進行排序和分區。

    第一步:排序和分區

    首先,我們需要創建一個Mapper ,該Mapper將簡單地選擇要根據給定索引進行排序的鍵:

    public class SortByKeyMapper extends Mapper<LongWritable, Text, Text, Text> {private int keyIndex;private Splitter splitter;private Joiner joiner;private Text joinKey = new Text();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {String separator = context.getConfiguration().get("separator");keyIndex = Integer.parseInt(context.getConfiguration().get("keyIndex"));splitter = Splitter.on(separator);joiner = Joiner.on(separator);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {Iterable<String> values = splitter.split(value.toString());joinKey.set(Iterables.get(values,keyIndex));if(keyIndex != 0){value.set(reorderValue(values,keyIndex));}context.write(joinKey,value);}private String reorderValue(Iterable<String> value, int index){List<String> temp = Lists.newArrayList(value);String originalFirst = temp.get(0);String newFirst = temp.get(index);temp.set(0,newFirst);temp.set(index,originalFirst);return joiner.join(temp);} }

    SortByKeyMapper只需通過從在配置參數keyIndex給定位置找到的給定文本行中提取值來簡單地設置joinKey的值。 同樣,如果keyIndex不等于零,我們交換在第一個位置和keyIndex位置中找到的值的順序。 盡管這是一個有問題的功能,但我們稍后將討論為什么要這樣做。 接下來,我們需要一個Reducer :

    public class SortByKeyReducer extends Reducer<Text,Text,NullWritable,Text> {private static final NullWritable nullKey = NullWritable.get();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for (Text value : values) {context.write(nullKey,value);}} }

    SortByKeyReducer寫出給定鍵的所有值,但是會NullWritable鍵并寫一個NullWritable 。 在下一節中,我們將解釋為什么不使用密鑰。

    第二步:Map-Side聯接

    在執行地圖側連接時,記錄在到達映射器之前會被合并。 為此,我們使用CompositeInputFormat 。 我們還需要設置一些配置屬性。 讓我們看一下如何配置地圖側連接:

    private static Configuration getMapJoinConfiguration(String separator, String... paths) {Configuration config = new Configuration();config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", separator);String joinExpression = CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, paths);config.set("mapred.join.expr", joinExpression);config.set("separator", separator);return config;}

    首先,我們通過設置mapreduce.input.keyvaluelinerecordreader.key.value.separator屬性來指定用于分隔鍵和值的字符。 接下來,我們使用CompositeInputFormat.compose方法創建一個“聯接表達式”,通過使用單詞“ inner”指定內部聯接 ,然后指定要使用的輸入格式, KeyValueTextInput類以及最后一個String varargs,它們表示文件的路徑。 join(運行map-reduce作業以對數據進行排序和分區的輸出路徑)。 KeyValueTextInputFormat類將使用分隔符將第一個值設置為鍵,其余的將用作該值。

    映射器的加入

    連接源文件中的值后,將Mapper.map方法,該方法將接收該鍵的Text對象(連接記錄中的相同鍵)和一個TupleWritable ,該TupleWritable由輸入文件中的連接值組成對于給定的密鑰。 請記住,我們希望最終輸出的第一個位置具有join-key,然后在一個定界的String中包含所有連接的值。 為此,我們有一個自定義的映射器,將我們的數據以正確的格式放置:

    public class CombineValuesMapper extends Mapper<Text, TupleWritable, NullWritable, Text> {private static final NullWritable nullKey = NullWritable.get();private Text outValue = new Text();private StringBuilder valueBuilder = new StringBuilder();private String separator;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {separator = context.getConfiguration().get("separator");}@Overrideprotected void map(Text key, TupleWritable value, Context context) throws IOException, InterruptedException {valueBuilder.append(key).append(separator);for (Writable writable : value) {valueBuilder.append(writable.toString()).append(separator);}valueBuilder.setLength(valueBuilder.length() - 1);outValue.set(valueBuilder.toString());context.write(nullKey, outValue);valueBuilder.setLength(0);} }

    在CombineValuesMapper我們將鍵和所有聯接的值附加到一個定界的String 。 在這里,我們終于可以看到在以前的MapReduce作業中放棄加入鍵的原因。 由于鍵是要連接的所有數據集的值中的第一個位置,因此我們的映射器自然會從連接的數據集中消除重復的鍵。 我們需要做的就是將給定的鍵插入StringBuilder ,然后附加TupleWritable包含的值。

    全部放在一起

    現在,我們擁有所有代碼,可以在大型數據集上運行地圖端聯接。 讓我們看一下我們將如何一起運行所有作業。 如前所述,我們假設我們的數據未按相同的順序進行排序和分區,因此我們將需要運行N(在本例中為2)MapReduce作業,以獲取正確格式的數據。 在運行初始排序/分區作業之后,將執行執行實際聯接的最終作業。

    public class MapSideJoinDriver {public static void main(String[] args) throws Exception {String separator = ",";String keyIndex = "0";int numReducers = 10;String jobOneInputPath = args[0];String jobTwoInputPath = args[1];String joinJobOutPath = args[2];String jobOneSortedPath = jobOneInputPath + "_sorted";String jobTwoSortedPath = jobTwoInputPath + "_sorted";Job firstSort = Job.getInstance(getConfiguration(keyIndex, separator));configureJob(firstSort, "firstSort", numReducers, jobOneInputPath, jobOneSortedPath, SortByKeyMapper.class, SortByKeyReducer.class);Job secondSort = Job.getInstance(getConfiguration(keyIndex, separator));configureJob(secondSort, "secondSort", numReducers, jobTwoInputPath, jobTwoSortedPath, SortByKeyMapper.class, SortByKeyReducer.class);Job mapJoin = Job.getInstance(getMapJoinConfiguration(separator, jobOneSortedPath, jobTwoSortedPath));configureJob(mapJoin, "mapJoin", 0, jobOneSortedPath + "," + jobTwoSortedPath, joinJobOutPath, CombineValuesMapper.class, Reducer.class);mapJoin.setInputFormatClass(CompositeInputFormat.class);List<Job> jobs = Lists.newArrayList(firstSort, secondSort, mapJoin);int exitStatus = 0;for (Job job : jobs) {boolean jobSuccessful = job.waitForCompletion(true);if (!jobSuccessful) {System.out.println("Error with job " + job.getJobName() + " " + job.getStatus().getFailureInfo());exitStatus = 1;break;}}System.exit(exitStatus);}

    MapSideJoinDriver對運行MapReduce作業進行基本配置。 有趣的一點是,排序/分區作業每個都指定10個化簡器,而最后一個作業將歸化器的數量顯式設置為0,因為我們是在地圖端加入的,不需要化簡階段。 由于我們沒有任何復雜的依賴關系,因此我們將作業放入ArrayList并以線性順序運行作業(第24-33行)。

    結果

    最初,我們有2個文件; 第一個文件中的姓名和地址信息,第二個文件中的就業信息。 這兩個文件在第一列中都有唯一的ID。
    文件一:

    .... 08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC ...

    文件二:

    .... 08db7c55-22ae-4199-8826-c67a5689f838,Ellman's Catalog Showrooms ....

    結果:

    08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC,Ellman's Catalog Showrooms

    正如我們在這里看到的那樣,我們已經成功地將記錄合并在一起,并保持了文件格式,而結果中沒有重復的鍵。

    結論

    在本文中,我們演示了當兩個數據集都很大且無法容納到內存中時如何執行地圖端連接。 如果您覺得這需要大量工作才能完成,那么您是正確的。 盡管在大多數情況下,我們希望使用像Pig或Hive這樣的高級工具,但了解對大型數據集執行地圖側聯接的機制很有幫助。 當您需要從頭開始編寫解決方案時,尤其如此。 謝謝你的時間。

    資源資源

    • Jimmy Lin和Chris Dyer 使用MapReduce進行的數據密集型處理
    • Hadoop: Tom White 的權威指南
    • 來自博客的源代碼和測試
    • 愛德華·卡普里奧洛(Edward Capriolo),迪恩·沃普勒(Dean Wampler)和杰森·盧瑟格倫(Jason Rutherglen)的編程蜂巢
    • 通過Alan Gates對Pig進行編程
    • Hadoop API
    • MRUnit用于單元測試Apache Hadoop映射減少工作

    參考: MapReduce算法–了解數據 ,是我們的JCG合作伙伴 Bill Bejeck在《 隨機編碼想法》博客上發表的第二部分。

    翻譯自: https://www.javacodegeeks.com/2014/02/mapreduce-algorithms-understanding-data-joins-part-ii.html

    總結

    以上是生活随笔為你收集整理的MapReduce算法–了解数据连接第二部分的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。