MapReduce算法–了解数据联接第1部分
在本文中,我們繼續(xù)執(zhí)行一系列實(shí)現(xiàn)算法的系列,該算法在使用MapReduce進(jìn)行數(shù)據(jù)密集型文本處理中找到,這一次討論數(shù)據(jù)聯(lián)接。 雖然我們將討論在Hadoop中聯(lián)接數(shù)據(jù)的技術(shù)并提供示例代碼,但在大多數(shù)情況下,您可能不會(huì)自己編寫(xiě)代碼來(lái)執(zhí)行聯(lián)接。 取而代之的是,使用可以在更高抽象級(jí)別工作的工具(例如Hive或Pig)可以更好地完成連接數(shù)據(jù)。 如果有可以幫助您處理數(shù)據(jù)的工具,為什么還要花時(shí)間學(xué)習(xí)如何聯(lián)接數(shù)據(jù)呢? 可以說(shuō),聯(lián)接數(shù)據(jù)是Hadoop的最大用途之一。 全面了解Hadoop如何執(zhí)行聯(lián)接對(duì)于確定使用哪個(gè)聯(lián)接以及在出現(xiàn)問(wèn)題時(shí)進(jìn)行調(diào)試至關(guān)重要。 此外,一旦您完全了解了Hadoop中如何執(zhí)行不同的聯(lián)接,就可以更好地利用Hive和Pig等工具。 最后,在一種情況下,一種工具可能無(wú)法滿足您的需求,因此您必須袖手旁觀并自行編寫(xiě)代碼。
加入的需要
在處理大型數(shù)據(jù)集時(shí),如果不是必需的話,通過(guò)公用密鑰連接數(shù)據(jù)的需求可能會(huì)非常有用。 通過(guò)加入數(shù)據(jù),您可以進(jìn)一步獲得洞察力,例如加入時(shí)間戳以將事件與一天中的時(shí)間關(guān)聯(lián)起來(lái)。 連接數(shù)據(jù)的需求多種多樣。 我們將在3個(gè)單獨(dú)的帖子中介紹3種類型的聯(lián)接:Reduce-Side聯(lián)接,Map-Side聯(lián)接和Memory-Backed聯(lián)接。 在這一期中,我們將考慮使用Reduce-Side聯(lián)接。
減少側(cè)面連接
在我們將要討論的聯(lián)接模式中,減少端聯(lián)接是最容易實(shí)現(xiàn)的。 簡(jiǎn)化方聯(lián)接的直接原因是Hadoop將相同的密鑰發(fā)送到相同的reducer,因此默認(rèn)情況下,數(shù)據(jù)是為我們組織的。 要執(zhí)行聯(lián)接,我們只需要緩存一個(gè)密鑰并將其與傳入密鑰進(jìn)行比較。 只要鍵匹配,我們就可以結(jié)合來(lái)自相應(yīng)鍵的值。 由于所有數(shù)據(jù)在整個(gè)網(wǎng)絡(luò)上都經(jīng)過(guò)混洗,因此使用減少側(cè)連接進(jìn)行權(quán)衡是性能。 在減少側(cè)連接中,我們將考慮兩種不同的方案:一對(duì)一和一對(duì)多。 我們還將探索不需要跟蹤傳入密鑰的選項(xiàng); 給定鍵的所有值都將在簡(jiǎn)化器中分組在一起。
一對(duì)一加入
一對(duì)一聯(lián)接的情況是數(shù)據(jù)集“ X”中的值與數(shù)據(jù)集“ Y”中的值共享一個(gè)公共密鑰。 由于Hadoop保證將相等的鍵發(fā)送到同一reducer,因此在兩個(gè)數(shù)據(jù)集上進(jìn)行映射將為我們處理聯(lián)接。 由于僅對(duì)鍵進(jìn)行排序,因此值的順序未知。 我們可以使用輔助排序輕松解決這種情況。 我們二級(jí)排序的實(shí)現(xiàn)方式是用“ 1”或“ 2”標(biāo)記鍵,以確定值的順序。 我們需要采取一些額外的步驟來(lái)實(shí)施我們的標(biāo)記策略。
實(shí)現(xiàn)一個(gè)WritableComparable
首先,我們需要編寫(xiě)一個(gè)實(shí)現(xiàn)WritableComparable接口的類,該接口將用于包裝密鑰。
public class TaggedKey implements Writable, WritableComparable<TaggedKey> {private Text joinKey = new Text();private IntWritable tag = new IntWritable();@Overridepublic int compareTo(TaggedKey taggedKey) {int compareValue = this.joinKey.compareTo(taggedKey.getJoinKey());if(compareValue == 0 ){compareValue = this.tag.compareTo(taggedKey.getTag());}return compareValue;}//Details left out for clarity}當(dāng)我們對(duì)TaggedKey類進(jìn)行排序時(shí),具有相同joinKey值的鍵將在tag字段的值上進(jìn)行次要排序,以確保我們想要的順序。
編寫(xiě)自定義分區(qū)程序
接下來(lái),我們需要編寫(xiě)一個(gè)自定義分區(qū)程序,該分區(qū)程序僅在確定復(fù)合鍵和數(shù)據(jù)發(fā)送到哪個(gè)減速器時(shí)才考慮連接鍵:
public class TaggedJoiningPartitioner extends Partitioner<TaggedKey,Text> {@Overridepublic int getPartition(TaggedKey taggedKey, Text text, int numPartitions) {return taggedKey.getJoinKey().hashCode() % numPartitions;} }至此,我們擁有了連接數(shù)據(jù)并確保值順序的條件。 但是,當(dāng)鍵進(jìn)入reduce()方法時(shí),我們不想跟蹤它們。 我們希望將所有價(jià)值觀歸為一體。 為此,我們將使用Comparator ,該Comparator在決定如何對(duì)值進(jìn)行分組時(shí)僅考慮聯(lián)接鍵。
編寫(xiě)組比較器
用于分組的比較器如下所示:
public class TaggedJoiningGroupingComparator extends WritableComparator {public TaggedJoiningGroupingComparator() {super(TaggedKey.class,true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {TaggedKey taggedKey1 = (TaggedKey)a;TaggedKey taggedKey2 = (TaggedKey)b;return taggedKey1.getJoinKey().compareTo(taggedKey2.getJoinKey());} }數(shù)據(jù)結(jié)構(gòu)
現(xiàn)在,我們需要確定將用于密鑰的哪些數(shù)據(jù)。 對(duì)于我們的樣本數(shù)據(jù),我們將使用從Fakenames Generator生成的CSV文件。 第一列是GUID,它將用作我們的聯(lián)接鍵。 我們的樣本數(shù)據(jù)包含諸如姓名,地址,電子郵件,工作信息,信用卡和擁有的汽車之類的信息。 為了演示的目的,我們將使用GUID,名稱和地址字段,并將它們放置在一個(gè)結(jié)構(gòu)如下的文件中:
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI 81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY然后,我們將使用GUID,電子郵件地址,用戶名,密碼和信用卡號(hào)字段,然后將其放置在另一個(gè)文件中,該文件應(yīng)類似于:
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,517-706-9565,EstherJGarner@teleworm.us,Waskepter38,noL2ieghie,MasterCard, 5305687295670850 81a43486-07e1-4b92-b92b-03d0caa87b5f,508-307-3433,TimothyDDuncan@einrot.com,Conerse,Gif4Edeiba,MasterCard, 5265896533330445 aef52cf1-f565-4124-bf18-47acdac47a0e,212-780-4015,BrettMRamsey@dayrep.com,Subjecall,AiKoiweihi6,MasterCard,524現(xiàn)在,我們需要有一個(gè)Mapper,它將知道如何處理我們的數(shù)據(jù)以提取正確的聯(lián)接鍵并設(shè)置正確的標(biāo)簽。
創(chuàng)建映射器
這是我們的Mapper代碼:
public class JoiningMapper extends Mapper<LongWritable, Text, TaggedKey, Text> {private int keyIndex;private Splitter splitter;private Joiner joiner;private TaggedKey taggedKey = new TaggedKey();private Text data = new Text();private int joinOrder;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {keyIndex = Integer.parseInt(context.getConfiguration().get("keyIndex"));String separator = context.getConfiguration().get("separator");splitter = Splitter.on(separator).trimResults();joiner = Joiner.on(separator);FileSplit fileSplit = (FileSplit)context.getInputSplit();joinOrder = Integer.parseInt(context.getConfiguration().get(fileSplit.getPath().getName()));}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {List<String> values = Lists.newArrayList(splitter.split(value.toString()));String joinKey = values.remove(keyIndex);String valuesWithOutKey = joiner.join(values);taggedKey.set(joinKey, joinOrder);data.set(valuesWithOutKey);context.write(taggedKey, data);}}讓我們回顧一下setup()方法中發(fā)生的事情。
我們還應(yīng)該討論map()方法中發(fā)生的事情:
因此,我們已經(jīng)讀入數(shù)據(jù),提取了密鑰,設(shè)置了連接順序,然后將數(shù)據(jù)寫(xiě)回了。 讓我們看一下如何結(jié)合數(shù)據(jù)。
聯(lián)接數(shù)據(jù)
現(xiàn)在讓我們看一下數(shù)據(jù)如何在化簡(jiǎn)器中聯(lián)接:
public class JoiningReducer extends Reduce<TaggedKey, Text, NullWritable, Text> {private Text joinedText = new Text();private StringBuilder builder = new StringBuilder();private NullWritable nullKey = NullWritable.get();@Overrideprotected void reduce(TaggedKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {builder.append(key.getJoinKey()).append(",");for (Text value : values) {builder.append(value.toString()).append(",");}builder.setLength(builder.length()-1);joinedText.set(builder.toString());context.write(nullKey, joinedText);builder.setLength(0);} }因?yàn)閹в小?1”標(biāo)簽的密鑰首先到達(dá)了還原器,所以我們知道名稱和地址數(shù)據(jù)是第一個(gè)值,而電子郵件,用戶名,密碼和信用卡數(shù)據(jù)是第二個(gè)值。 因此,我們不需要跟蹤任何鍵。 我們只需遍歷值并將它們連接在一起。
一對(duì)一加入結(jié)果
這是運(yùn)行我們的一對(duì)一MapReduce作業(yè)的結(jié)果:
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI,517-706-9565,EstherJGarner@teleworm.us,Waskepter38,noL2ieghie,MasterCard, 5305687295670850 81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA,508-307-3433,TimothyDDuncan@einrot.com,Conerse,Gif4Edeiba,MasterCard, 5265896533330445 aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY,212-780-4015,BrettMRamsey@dayrep.com,Subjecall,AiKoiweihi6,MasterCard, 5243379373546690正如我們可以看到的,以上示例數(shù)據(jù)中的兩條記錄已合并為一條記錄。 我們已經(jīng)成功地將GUID,名稱,地址,電子郵件地址,用戶名,密碼和信用卡字段加入到一個(gè)文件中。
指定加入順序
此時(shí),我們可能會(huì)問(wèn)如何為多個(gè)文件指定連接順序? 答案就在我們的ReduceSideJoinDriver類中,該類充當(dāng)MapReduce程序的驅(qū)動(dòng)程序。
public class ReduceSideJoinDriver {public static void main(String[] args) throws Exception {Splitter splitter = Splitter.on('/');StringBuilder filePaths = new StringBuilder();Configuration config = new Configuration();config.set("keyIndex", "0");config.set("separator", ",");for(int i = 0; i< args.length - 1; i++) {String fileName = Iterables.getLast(splitter.split(args[i]));config.set(fileName, Integer.toString(i+1));filePaths.append(args[i]).append(",");}filePaths.setLength(filePaths.length() - 1);Job job = Job.getInstance(config, "ReduceSideJoin");job.setJarByClass(ReduceSideJoinDriver.class);FileInputFormat.addInputPaths(job, filePaths.toString());FileOutputFormat.setOutputPath(job, new Path(args[args.length-1]));job.setMapperClass(JoiningMapper.class);job.setReducerClass(JoiningReducer.class);job.setPartitionerClass(TaggedJoiningPartitioner.class);job.setGroupingComparatorClass(TaggedJoiningGroupingComparator.class);job.setOutputKeyClass(TaggedKey.class);job.setOutputValueClass(Text.class);System.exit(job.waitForCompletion(true) ? 0 : 1);} }通過(guò)使用分區(qū)程序和分組比較器,我們知道第一個(gè)值屬于第一個(gè)鍵,并且可以用于將Iterable包含的所有其他值連接到給定鍵的reduce()方法中。 現(xiàn)在是時(shí)候考慮一??對(duì)多聯(lián)接了。
一對(duì)多加入
好消息是到目前為止,我們已經(jīng)完成了所有工作,實(shí)際上我們可以使用代碼執(zhí)行一對(duì)多連接。 對(duì)于一對(duì)多聯(lián)接,我們可以考慮兩種方法:1)一個(gè)包含單個(gè)記錄的小文件,另一個(gè)包含具有相同鍵的多個(gè)記錄的文件,以及2)同樣具有單個(gè)記錄的小文件,但是N每個(gè)文件包含與第一個(gè)文件匹配的記錄的文件數(shù)。 主要區(qū)別在于,采用第一種方法時(shí),除了前兩個(gè)鍵的聯(lián)接之外,值的順序?qū)⑹俏粗摹?但是,使用第二種方法,我們將“標(biāo)記”每個(gè)聯(lián)接文件,以便我們可以控制所有聯(lián)接值的順序。 對(duì)于我們的示例,第一個(gè)文件將保留為我們的GUID名稱-地址文件,并且我們將擁有3個(gè)其他文件,其中將包含汽車,雇主和工作描述記錄。 這可能不是最現(xiàn)實(shí)的情況,但將用于演示。 以下是在進(jìn)行聯(lián)接之前數(shù)據(jù)外觀的示例:
//The single person records cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI 81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY //Automobile records cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,2003 Holden Cruze 81a43486-07e1-4b92-b92b-03d0caa87b5f,2012 Volkswagen T5 aef52cf1-f565-4124-bf18-47acdac47a0e,2009 Renault Trafic //Employer records cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Creative Wealth 81a43486-07e1-4b92-b92b-03d0caa87b5f,Susie's Casuals aef52cf1-f565-4124-bf18-47acdac47a0e,Super Saver Foods //Job Description records cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Data entry clerk 81a43486-07e1-4b92-b92b-03d0caa87b5f,Precision instrument and equipment repairer aef52cf1-f565-4124-bf18-47acdac47a0e,Gas and water service dispatcher一對(duì)多加入結(jié)果
現(xiàn)在,讓我們看一下一對(duì)多聯(lián)接結(jié)果的示例(使用上面的相同值來(lái)輔助比較):
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI,2003 Holden Cruze,Creative Wealth,Data entry clerk 81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA,2012 Volkswagen T5,Susie's Casuals,Precision instrument and equipment repairer aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY,2009 Renault Trafic,Super Saver Foods,Gas and water service dispatcher結(jié)果表明,我們已經(jīng)能夠成功地以指定順序連接多個(gè)值。
結(jié)論
我們已經(jīng)成功演示了如何在MapReduce中執(zhí)行約簡(jiǎn)邊連接。 即使該方法并不太復(fù)雜,我們也可以看到在Hadoop中執(zhí)行聯(lián)接可能涉及編寫(xiě)大量代碼。 雖然學(xué)習(xí)聯(lián)接的工作方式是一項(xiàng)有用的練習(xí),但是在大多數(shù)情況下,使用Hive或Pig這樣的工具聯(lián)接數(shù)據(jù)要好得多。 謝謝你的時(shí)間。
資源資源
- Jimmy Lin和Chris Dyer 使用MapReduce進(jìn)行的數(shù)據(jù)密集型處理
- Hadoop: Tom White 的權(quán)威指南
- 來(lái)自博客的源代碼和測(cè)試
- 愛(ài)德華·卡普里奧洛(Edward Capriolo),迪恩·沃普勒(Dean Wampler)和杰森·盧瑟格倫(Jason Rutherglen)的編程蜂巢
- 通過(guò)Alan Gates對(duì)Pig進(jìn)行編程
- Hadoop API
- MRUnit用于單元測(cè)試Apache Hadoop映射減少工作
翻譯自: https://www.javacodegeeks.com/2013/07/mapreduce-algorithms-understanding-data-joins-part-1.html
總結(jié)
以上是生活随笔為你收集整理的MapReduce算法–了解数据联接第1部分的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: linux查看网卡信息 命令(linux
- 下一篇: 选择技术栈构建通用平台