Hadoop Streaming Programming
Hadoop Streaming is a programming tool provided by Hadoop that lets users supply any executable or script as the Mapper and Reducer. For example, you can use ordinary shell commands as the mapper and reducer (cat as the mapper, wc as the reducer):
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper cat \
-reducer wc
This article is organized as follows: Section 2 explains how Hadoop Streaming works, Section 3 covers its usage, Section 4 shows how to write Streaming programs, implementing the WordCount job in C++, C, shell script, and Python, and Section 5 summarizes common problems and solutions. A download link for the programs is given at the end. (This article is based on Hadoop 0.20.2.)
(Note: if your language of choice is C or C++, you can also use Hadoop Pipes; see the article on Hadoop Pipes programming for details.)
For advanced Hadoop Streaming techniques, see the articles on advanced Hadoop Streaming programming and Hadoop programming examples.
2. How Hadoop Streaming Works
The mapper and reducer read user data from standard input, process it line by line, and write the results to standard output. The Streaming tool creates a MapReduce job, submits it to the tasktrackers, and monitors the job's progress.
When an executable or script is used as the mapper, each map task launches that file as a separate process at initialization. As the map task runs, it splits its input into lines and feeds each line to the process's standard input. At the same time, the map task collects the process's standard output and turns each line into a key/value pair, which becomes the map task's output. By default, everything up to the first tab in a line is the key, and the rest (excluding the tab) is the value. If a line contains no tab, the whole line is the key and the value is null.
The reducer works the same way.
This is the basic communication protocol between the Map/Reduce framework and a streaming mapper/reducer.
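To make the default convention concrete, here is a small Python sketch (not part of the framework, purely an illustration) of how the framework splits a line of streaming output into a key/value pair:

```python
def parse_streaming_line(line):
    """Split one output line the way the streaming framework does by
    default: everything before the first tab is the key, the rest is
    the value; if there is no tab, the whole line is the key and the
    value is null (None here)."""
    line = line.rstrip("\n")
    if "\t" in line:
        key, value = line.split("\t", 1)
        return key, value
    return line, None
```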
3. Hadoop Streaming Usage
Usage: $HADOOP_HOME/bin/hadoop jar \
$HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar [options]
options:
(1) -input: input path
(2) -output: output path
(3) -mapper: user-supplied mapper program; may be an executable or a script
(4) -reducer: user-supplied reducer program; may be an executable or a script
(5) -file: ship a file with the submitted job, such as an input file the mapper or reducer needs, a configuration file, a dictionary, and so on
(6) -partitioner: user-defined partitioner program
(7) -combiner: user-defined combiner program (must be written in Java)
(8) -D: job properties (formerly -jobconf), including:
1) mapred.map.tasks: number of map tasks
2) mapred.reduce.tasks: number of reduce tasks
3) stream.map.input.field.separator / stream.map.output.field.separator: field separator for map task input/output; both default to \t
4) stream.num.map.output.key.fields: number of fields that make up the key in map task output records
5) stream.reduce.input.field.separator / stream.reduce.output.field.separator: field separator for reduce task input/output; both default to \t
6) stream.num.reduce.output.key.fields: number of fields that make up the key in reduce task output records
In addition, Hadoop ships with some handy Mappers and Reducers of its own:
(1) Hadoop aggregation
Aggregate provides a special reducer class and a special combiner class, plus a family of "aggregators" ("sum", "max", "min", and so on) that aggregate a sequence of values. With Aggregate, the user defines a mapper plugin class that produces an "aggregatable item" for each of the mapper's input key/value pairs; the combiner/reducer then aggregates those items with the appropriate aggregator. To use Aggregate, simply pass "-reducer aggregate".
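As an illustration of the mapper side, here is a sketch of a word-count mapper that could feed "-reducer aggregate". The "LongValueSum:" key prefix is the convention the aggregate package uses to request summation; the function name and structure here are our own, not part of Hadoop:

```python
import sys

def aggregate_map_line(line):
    # Emit one record per word; the "LongValueSum:" prefix asks the
    # aggregate reducer to sum the values for each key.
    return ["LongValueSum:%s\t1" % w for w in line.split()]

if __name__ == "__main__":
    for line in sys.stdin:
        for record in aggregate_map_line(line):
            print(record)
```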
(2) Field selection (similar to Unix cut)
The utility class org.apache.hadoop.mapred.lib.FieldSelectionMapReduce helps users process text data efficiently, much like the Unix cut tool. Its map function treats each input key/value pair as a list of fields. The user can specify the field separator (tab by default) and select any range of fields (one or more fields from the list) as the map output key or value. Likewise, its reduce function treats each input key/value pair as a list of fields, and the user can select any range as the reduce output key or value.
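The behavior can be sketched in a few lines of Python. This roughly mimics what the class does; the real class has its own field-spec syntax, and the 0-based index lists used here are purely illustrative:

```python
def select_fields(line, key_fields, value_fields, sep="\t"):
    """Treat the line as a list of fields and pick some of them as
    the output key and some as the output value (cut-style)."""
    fields = line.rstrip("\n").split(sep)
    key = sep.join(fields[i] for i in key_fields)
    value = sep.join(fields[i] for i in value_fields)
    return key, value
```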
4. Implementing the Mapper and Reducer
This section implements the Mapper and Reducer in as many languages as possible, including Java, C, C++, shell script, and Python. (Beginners: before running your first program, be sure to read Section 5, "Common Problems and Solutions"!)
Since Hadoop automatically feeds the data to the Mapper's and Reducer's standard input for them to read, you should first know how to read standard input in each language.
(1) Java:
See the examples that ship with Hadoop.
(2) C++:

string key, value;
while(cin >> key){
    cin >> value;
    ....
}
(3) C:

char buffer[BUF_SIZE];
while(fgets(buffer, BUF_SIZE - 1, stdin)){
    int len = strlen(buffer);
    ....
}
(4) Shell script:
Use pipes (read from standard input, write to standard output).
(5) Python:

import sys
for line in sys.stdin:
    ....
To illustrate how to write a Hadoop Streaming program in each language, the rest of this section uses WordCount as the example. The WordCount job counts the occurrences of every word in the user's input data.
(1) C implementation
//mapper
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#define BUF_SIZE 2048

int main(int argc, char *argv[]){
    char buffer[BUF_SIZE];
    while(fgets(buffer, BUF_SIZE - 1, stdin)){
        int len = strlen(buffer);
        if(len > 0 && buffer[len-1] == '\n')
            buffer[len-1] = '\0';
        /* emit "<word>\t1" for every space-separated word */
        char *query = strtok(buffer, " ");
        while(query){
            printf("%s\t1\n", query);
            query = strtok(NULL, " ");
        }
    }
    return 0;
}
//---------------------------------------------------------------------------------------
//reducer
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#define BUFFER_SIZE 1024
#define DELIM "\t"

int main(int argc, char *argv[]){
    char strLastKey[BUFFER_SIZE];
    char strLine[BUFFER_SIZE];
    int count = 0;

    *strLastKey = '\0';
    *strLine = '\0';

    while( fgets(strLine, BUFFER_SIZE - 1, stdin) ){
        char *strCurrKey = strtok(strLine, DELIM);
        char *strCurrNum = strtok(NULL, DELIM);
        if(strCurrKey == NULL || strCurrNum == NULL)
            continue; /* malformed record, skip it */
        if(strLastKey[0] == '\0'){
            strcpy(strLastKey, strCurrKey);
        }
        if(strcmp(strCurrKey, strLastKey)){
            /* key changed: flush the previous key's count */
            printf("%s\t%d\n", strLastKey, count);
            count = atoi(strCurrNum);
        } else {
            count += atoi(strCurrNum);
        }
        strcpy(strLastKey, strCurrKey);
    }
    printf("%s\t%d\n", strLastKey, count); /* flush the last key's count */
    return 0;
}
(2) C++ implementation
//mapper
#include <string>
#include <iostream>
using namespace std;

int main(){
    string key;
    string value = "1";
    while(cin >> key){
        cout << key << "\t" << value << endl;
    }
    return 0;
}
//------------------------------------------------------------------------------------------------------------
//reducer
#include <string>
#include <map>
#include <iostream>
#include <cstdlib>
using namespace std;

int main(){
    string key;
    string value;
    map<string, int> word2count;
    map<string, int>::iterator it;
    while(cin >> key){
        cin >> value;
        /* add the incoming count rather than assuming it is 1,
           so the reducer still works if a combiner runs first */
        int num = atoi(value.c_str());
        it = word2count.find(key);
        if(it != word2count.end()){
            it->second += num;
        }
        else{
            word2count.insert(make_pair(key, num));
        }
    }
    for(it = word2count.begin(); it != word2count.end(); ++it){
        cout << it->first << "\t" << it->second << endl;
    }
    return 0;
}
(3) Shell script implementation
Minimal version, one word per input line:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper cat \
    -reducer wc
Full version, allowing multiple words per line (written by 史江明): mapper.sh
#!/bin/bash
while read LINE; do
  for word in $LINE
  do
    echo "$word 1"
  done
done
reducer.sh
#!/bin/bash
count=0
started=0
word=""
while read LINE; do
  newword=`echo $LINE | cut -d ' ' -f 1`
  if [ "$word" != "$newword" ]; then
    # -e makes echo expand \t into a real tab
    [ $started -ne 0 ] && echo -e "$word\t$count"
    word=$newword
    count=1
    started=1
  else
    count=$(( $count + 1 ))
  fi
done
echo -e "$word\t$count"
(4) Python implementation
#!/usr/bin/env python
# mapper.py
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words while removing any empty strings
    words = filter(lambda word: word, line.split())
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))
#---------------------------------------------------------------------------------------------------------
#!/usr/bin/env python
# reducer.py
from operator import itemgetter
import sys

# maps words to their counts
word2count = {}

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split()
    # convert count (currently a string) to int
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        pass

# sort the words lexicographically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
    print('%s\t%s' % (word, count))
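One possible refinement: reducer.py above keeps every word in memory. Since the framework sorts the mapper output by key before the reduce step, identical keys arrive consecutively, so a reducer can instead stream over groups with itertools.groupby and hold only one key at a time. A sketch of that reducer logic (function names are ours, run it the same way as reducer.py):

```python
import itertools
import sys

def reduce_sorted(lines):
    # Each input line is "word\tcount"; identical words arrive
    # consecutively because the shuffle phase sorts by key.
    parsed = (line.rstrip("\n").split("\t", 1)
              for line in lines if line.strip())
    for word, group in itertools.groupby(parsed, key=lambda kv: kv[0]):
        yield word, sum(int(count) for _, count in group)

if __name__ == "__main__":
    for word, total in reduce_sorted(sys.stdin):
        print("%s\t%d" % (word, total))
```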
5. Common Problems and Solutions
(1) The job keeps failing, complaining that the executable cannot be found, e.g. "Caused by: java.io.IOException: Cannot run program "/user/hadoop/Mapper": error=2, No such file or directory":
Use the -file option when submitting the job to ship these files. For the examples above, pass "-file Mapper -file Reducer" or "-file Mapper.py -file Reducer.py", and Hadoop will automatically distribute the two files to every node, for example:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper Mapper.py \
-reducer Reducer.py \
-file Mapper.py \
-file Reducer.py
(2) When writing a script, the first line must name the script interpreter; the default is the shell.
(3) How do I test a Hadoop Streaming program?
One advantage of Hadoop Streaming programs is that they are easy to test. For the WordCount example, you can run the following command locally:
cat input.txt | python Mapper.py | sort | python Reducer.py
or
cat input.txt | ./Mapper | sort | ./Reducer
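The same pipeline can also be simulated entirely in Python for unit testing the map and reduce logic without Hadoop or even a shell. This is our own illustrative harness, not part of any Hadoop API:

```python
def simulate_streaming(map_fn, reduce_fn, lines):
    # Local stand-in for "cat | mapper | sort | reducer": run the map
    # function over every input line, sort the intermediate records
    # (as the shuffle phase would), then run the reduce function.
    intermediate = []
    for line in lines:
        intermediate.extend(map_fn(line))
    return reduce_fn(sorted(intermediate))

def wordcount_map(line):
    return ["%s\t1" % w for w in line.split()]

def wordcount_reduce(records):
    counts = {}
    for rec in records:
        word, n = rec.split("\t", 1)
        counts[word] = counts.get(word, 0) + int(n)
    return ["%s\t%d" % (w, c) for w, c in sorted(counts.items())]
```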