MapReduce算法–了解数据连接第二部分
自從我上一次發(fā)布以來已經(jīng)有一段時間了,就像我上一次大休息一樣,我正在Coursera上一些課程。 這次是Scala中的函數(shù)式編程 原理和反應(yīng)式編程原理 。 我發(fā)現(xiàn)它們都是不錯的課程,如果有時間的話,建議您選一門。 在本文中,我們將繼續(xù)介紹如何使用MapReduce實現(xiàn)數(shù)據(jù)密集型文本處理中的算法的系列文章,這次涵蓋了地圖端連接。 從名稱可以猜出,映射側(cè)聯(lián)接僅在映射階段連接數(shù)據(jù),而完全跳過簡化階段。 在上一篇有關(guān)數(shù)據(jù)連接的文章中,我們介紹了減少側(cè)連接 。 減少端連接很容易實現(xiàn),但缺點是所有數(shù)據(jù)都通過網(wǎng)絡(luò)發(fā)送到減少器。 由于我們避免了跨網(wǎng)絡(luò)發(fā)送數(shù)據(jù)的開銷,因此地圖端連接可顯著提高性能。 但是,與減少側(cè)聯(lián)接不同,映射側(cè)聯(lián)接需要滿足非常特定的條件。 今天,我們將討論地圖端連接的要求以及如何實現(xiàn)它們。
地圖端加入條件
要利用地圖側(cè)聯(lián)接,我們的數(shù)據(jù)必須滿足以下條件之一:
我們將考慮第一種情況,其中有兩個(或更多)數(shù)據(jù)集需要連接,但是太大而無法容納到內(nèi)存中。 我們將假設(shè)最壞的情況是,文件沒有按相同的順序排序或分區(qū)。
資料格式
在開始之前,讓我們看一下正在使用的數(shù)據(jù)。 我們將有兩個數(shù)據(jù)集:
兩個數(shù)據(jù)集均以逗號分隔,并且聯(lián)接鍵(GUID)位于第一位置。 加入后,我們希望將數(shù)據(jù)集2中的雇主信息附加到數(shù)據(jù)集1的末尾。 此外,我們希望將GUID保持在數(shù)據(jù)集1的第一個位置,但要從數(shù)據(jù)集2刪除GUID。
資料集1:
數(shù)據(jù)集2:
de68186a-1004-4211-a866-736f414eac61,Jacobs6df1882d-4c81-4155-9d8b-0c35b2d34284,Chief Auto Partsaef9422c-d08c-4457-9760-f2d564d673bc,Earthworks Yard Maintenance08db7c55-22ae-4199-8826-c67a5689f838,Ellman's Catalog Showrooms合并結(jié)果:
08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC,Ellman's Catalog Showrooms 6df1882d-4c81-4155-9d8b-0c35b2d34284,John,Schofield,65 Summit Park Avenue,Detroit,MI,Chief Auto Parts aef9422c-d08c-4457-9760-f2d564d673bc,Linda,Narvaez,3253 Davis Street,Atlanta,GA,Earthworks Yard Maintenance de68186a-1004-4211-a866-736f414eac61,Charles,Arnold,1764 Public Works Drive,Johnson City,TN,Jacobs現(xiàn)在,我們繼續(xù)介紹如何連接兩個數(shù)據(jù)集。
Map-Side連接具有大數(shù)據(jù)集
為了能夠執(zhí)行映射端連接,我們需要按相同的鍵對數(shù)據(jù)進行排序并具有相同數(shù)量的分區(qū),這意味著任何記錄的所有鍵都在同一分區(qū)中。 盡管這似乎是一個艱巨的要求,但很容易解決。 Hadoop會對所有鍵進行排序,并保證將具有相同值的鍵發(fā)送到相同的reducer。 因此,只需運行一個MapReduce作業(yè),該作業(yè)除了通過您要連接的鍵輸出數(shù)據(jù),并為所有數(shù)據(jù)集指定完全相同數(shù)量的化簡器,我們將以正確的形式獲取數(shù)據(jù)。 考慮到能夠進行地圖側(cè)連接所帶來的效率提高,可能值得花費額外的MapReduce作業(yè)。 在這一點上,有必要重復(fù)一遍,至關(guān)重要的是,在“準備”階段對數(shù)據(jù)進行排序和分區(qū)時,所有數(shù)據(jù)集都必須指定完全相同數(shù)量的化簡。 在本文中,我們將獲取兩個數(shù)據(jù)集,并在兩個數(shù)據(jù)集上運行初始MapReduce作業(yè)以進行排序和分區(qū),然后運行最終作業(yè)以執(zhí)行地圖端聯(lián)接。 首先,讓我們介紹一下MapReduce作業(yè),以相同的方式對數(shù)據(jù)進行排序和分區(qū)。
第一步:排序和分區(qū)
首先,我們需要創(chuàng)建一個Mapper ,該Mapper將簡單地選擇要根據(jù)給定索引進行排序的鍵:
public class SortByKeyMapper extends Mapper<LongWritable, Text, Text, Text> {private int keyIndex;private Splitter splitter;private Joiner joiner;private Text joinKey = new Text();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {String separator = context.getConfiguration().get("separator");keyIndex = Integer.parseInt(context.getConfiguration().get("keyIndex"));splitter = Splitter.on(separator);joiner = Joiner.on(separator);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {Iterable<String> values = splitter.split(value.toString());joinKey.set(Iterables.get(values,keyIndex));if(keyIndex != 0){value.set(reorderValue(values,keyIndex));}context.write(joinKey,value);}private String reorderValue(Iterable<String> value, int index){List<String> temp = Lists.newArrayList(value);String originalFirst = temp.get(0);String newFirst = temp.get(index);temp.set(0,newFirst);temp.set(index,originalFirst);return joiner.join(temp);} }SortByKeyMapper只需通過從在配置參數(shù)keyIndex給定位置找到的給定文本行中提取值來簡單地設(shè)置joinKey的值。 同樣,如果keyIndex不等于零,我們交換在第一個位置和keyIndex位置中找到的值的順序。 盡管這是一個有問題的功能,但我們稍后將討論為什么要這樣做。 接下來,我們需要一個Reducer :
public class SortByKeyReducer extends Reducer<Text,Text,NullWritable,Text> {private static final NullWritable nullKey = NullWritable.get();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for (Text value : values) {context.write(nullKey,value);}} }SortByKeyReducer寫出給定鍵的所有值,但是會NullWritable鍵并寫一個NullWritable 。 在下一節(jié)中,我們將解釋為什么不使用密鑰。
第二步:Map-Side聯(lián)接
在執(zhí)行地圖側(cè)連接時,記錄在到達映射器之前會被合并。 為此,我們使用CompositeInputFormat 。 我們還需要設(shè)置一些配置屬性。 讓我們看一下如何配置地圖側(cè)連接:
private static Configuration getMapJoinConfiguration(String separator, String... paths) {Configuration config = new Configuration();config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", separator);String joinExpression = CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, paths);config.set("mapred.join.expr", joinExpression);config.set("separator", separator);return config;}首先,我們通過設(shè)置mapreduce.input.keyvaluelinerecordreader.key.value.separator屬性來指定用于分隔鍵和值的字符。 接下來,我們使用CompositeInputFormat.compose方法創(chuàng)建一個“聯(lián)接表達式”,通過使用單詞“ inner”指定內(nèi)部聯(lián)接 ,然后指定要使用的輸入格式, KeyValueTextInput類以及最后一個String varargs,它們表示文件的路徑。 join(運行map-reduce作業(yè)以對數(shù)據(jù)進行排序和分區(qū)的輸出路徑)。 KeyValueTextInputFormat類將使用分隔符將第一個值設(shè)置為鍵,其余的將用作該值。
映射器的加入
連接源文件中的值后,將Mapper.map方法,該方法將接收該鍵的Text對象(連接記錄中的相同鍵)和一個TupleWritable ,該TupleWritable由輸入文件中的連接值組成對于給定的密鑰。 請記住,我們希望最終輸出的第一個位置具有join-key,然后在一個定界的String中包含所有連接的值。 為此,我們有一個自定義的映射器,將我們的數(shù)據(jù)以正確的格式放置:
public class CombineValuesMapper extends Mapper<Text, TupleWritable, NullWritable, Text> {private static final NullWritable nullKey = NullWritable.get();private Text outValue = new Text();private StringBuilder valueBuilder = new StringBuilder();private String separator;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {separator = context.getConfiguration().get("separator");}@Overrideprotected void map(Text key, TupleWritable value, Context context) throws IOException, InterruptedException {valueBuilder.append(key).append(separator);for (Writable writable : value) {valueBuilder.append(writable.toString()).append(separator);}valueBuilder.setLength(valueBuilder.length() - 1);outValue.set(valueBuilder.toString());context.write(nullKey, outValue);valueBuilder.setLength(0);} }在CombineValuesMapper我們將鍵和所有聯(lián)接的值附加到一個定界的String 。 在這里,我們終于可以看到在以前的MapReduce作業(yè)中放棄加入鍵的原因。 由于鍵是要連接的所有數(shù)據(jù)集的值中的第一個位置,因此我們的映射器自然會從連接的數(shù)據(jù)集中消除重復(fù)的鍵。 我們需要做的就是將給定的鍵插入StringBuilder ,然后附加TupleWritable包含的值。
全部放在一起
現(xiàn)在,我們擁有所有代碼,可以在大型數(shù)據(jù)集上運行地圖端聯(lián)接。 讓我們看一下我們將如何一起運行所有作業(yè)。 如前所述,我們假設(shè)我們的數(shù)據(jù)未按相同的順序進行排序和分區(qū),因此我們將需要運行N(在本例中為2)MapReduce作業(yè),以獲取正確格式的數(shù)據(jù)。 在運行初始排序/分區(qū)作業(yè)之后,將執(zhí)行執(zhí)行實際聯(lián)接的最終作業(yè)。
public class MapSideJoinDriver {public static void main(String[] args) throws Exception {String separator = ",";String keyIndex = "0";int numReducers = 10;String jobOneInputPath = args[0];String jobTwoInputPath = args[1];String joinJobOutPath = args[2];String jobOneSortedPath = jobOneInputPath + "_sorted";String jobTwoSortedPath = jobTwoInputPath + "_sorted";Job firstSort = Job.getInstance(getConfiguration(keyIndex, separator));configureJob(firstSort, "firstSort", numReducers, jobOneInputPath, jobOneSortedPath, SortByKeyMapper.class, SortByKeyReducer.class);Job secondSort = Job.getInstance(getConfiguration(keyIndex, separator));configureJob(secondSort, "secondSort", numReducers, jobTwoInputPath, jobTwoSortedPath, SortByKeyMapper.class, SortByKeyReducer.class);Job mapJoin = Job.getInstance(getMapJoinConfiguration(separator, jobOneSortedPath, jobTwoSortedPath));configureJob(mapJoin, "mapJoin", 0, jobOneSortedPath + "," + jobTwoSortedPath, joinJobOutPath, CombineValuesMapper.class, Reducer.class);mapJoin.setInputFormatClass(CompositeInputFormat.class);List<Job> jobs = Lists.newArrayList(firstSort, secondSort, mapJoin);int exitStatus = 0;for (Job job : jobs) {boolean jobSuccessful = job.waitForCompletion(true);if (!jobSuccessful) {System.out.println("Error with job " + job.getJobName() + " " + job.getStatus().getFailureInfo());exitStatus = 1;break;}}System.exit(exitStatus);}MapSideJoinDriver對運行MapReduce作業(yè)進行基本配置。 有趣的一點是,排序/分區(qū)作業(yè)每個都指定10個化簡器,而最后一個作業(yè)將歸化器的數(shù)量顯式設(shè)置為0,因為我們是在地圖端加入的,不需要化簡階段。 由于我們沒有任何復(fù)雜的依賴關(guān)系,因此我們將作業(yè)放入ArrayList并以線性順序運行作業(yè)(第24-33行)。
結(jié)果
最初,我們有2個文件; 第一個文件中的姓名和地址信息,第二個文件中的就業(yè)信息。 這兩個文件在第一列中都有唯一的ID。
文件一:
文件二:
.... 08db7c55-22ae-4199-8826-c67a5689f838,Ellman's Catalog Showrooms ....結(jié)果:
08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC,Ellman's Catalog Showrooms正如我們在這里看到的那樣,我們已經(jīng)成功地將記錄合并在一起,并保持了文件格式,而結(jié)果中沒有重復(fù)的鍵。
結(jié)論
在本文中,我們演示了當兩個數(shù)據(jù)集都很大且無法容納到內(nèi)存中時如何執(zhí)行地圖端連接。 如果您覺得這需要大量工作才能完成,那么您是正確的。 盡管在大多數(shù)情況下,我們希望使用像Pig或Hive這樣的高級工具,但了解對大型數(shù)據(jù)集執(zhí)行地圖側(cè)聯(lián)接的機制很有幫助。 當您需要從頭開始編寫解決方案時,尤其如此。 謝謝你的時間。
資源資源
- Jimmy Lin和Chris Dyer 使用MapReduce進行的數(shù)據(jù)密集型處理
- Hadoop: Tom White 的權(quán)威指南
- 來自博客的源代碼和測試
- 愛德華·卡普里奧洛(Edward Capriolo),迪恩·沃普勒(Dean Wampler)和杰森·盧瑟格倫(Jason Rutherglen)的編程蜂巢
- 通過Alan Gates對Pig進行編程
- Hadoop API
- MRUnit用于單元測試Apache Hadoop映射減少工作
翻譯自: https://www.javacodegeeks.com/2014/02/mapreduce-algorithms-understanding-data-joins-part-ii.html
總結(jié)
以上是生活随笔為你收集整理的MapReduce算法–了解数据连接第二部分的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 入川备案手续怎么办理(入川备案手续)
- 下一篇: 使用CDI的InjectionPoint