Hadoop Streaming Programming Examples
(1) How to write a Mapper and Reducer in a given language, and what programming conventions to follow
(2) How to define a custom Hadoop counter in Hadoop Streaming
(3) How to report custom status information in Hadoop Streaming, giving users feedback on job progress
(4) How to print debug logs in Hadoop Streaming, and where to find them
(5) How to use Hadoop Streaming to process binary files, not just text files
I have introduced Hadoop Streaming in several earlier articles; if you are not yet familiar with it, see "Hadoop Streaming 編程", "Hadoop Streaming高級(jí)編程", and others.
This article focuses on the first four questions and gives WordCount implementations in C++ and Shell for reference.
1. WordCount in C++
(1) Mapper implementation (mapper.cpp)
```cpp
#include <iostream>
#include <string>
using namespace std;

int main() {
  string key;
  while (cin >> key) {
    cout << key << "\t" << "1" << endl;
    // Increment the counter named counter_no in group counter_group
    cerr << "reporter:counter:counter_group,counter_no,1\n";
    // Report status
    cerr << "reporter:status:processing......\n";
    // Print a log line for testing
    cerr << "This is log, will be printed in stderr file\n";
  }
  return 0;
}
```
(2) Reducer implementation (reducer.cpp)
```cpp
#include <iostream>
#include <string>
using namespace std;

// The reducer is wrapped into a standalone process, so it needs a main()
int main() {
  string cur_key, last_key, value;
  cin >> cur_key >> value;  // read the first key/value pair
  last_key = cur_key;
  int n = 1;
  while (cin >> cur_key) {  // read the map tasks' output
    cin >> value;
    if (last_key != cur_key) {  // the next key begins
      cout << last_key << "\t" << n << endl;
      last_key = cur_key;
      n = 1;
    } else {  // same key: accumulate the number of values
      n++;
    }
  }
  cout << last_key << "\t" << n << endl;
  return 0;
}
```
(3) Compile and run
Compile the two programs:
g++ -o mapper mapper.cpp
g++ -o reducer reducer.cpp
Run a quick local test:
echo "dong xicheng is here now, talk to dong xicheng now" | ./mapper | sort | ./reducer
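The `sort` in the middle stands in for Hadoop's shuffle phase, which groups identical keys together before they reach the reducer. The whole pipeline can be emulated with standard tools; the `wordcount` helper below is purely illustrative (it is not part of the article's code) and uses a comma-free test string for simplicity:

```shell
#!/bin/bash
# tr plays the mapper (one word per line), sort plays the shuffle
# (identical keys become adjacent), and uniq -c plays the reducer.
wordcount() {
  tr ' ' '\n' | sort | uniq -c | awk '{print $2 "\t" $1}'
}

echo "dong xicheng is here now talk to dong xicheng now" | wordcount
```

Note that without the `sort`, `uniq -c` (like the C++ reducer above) would miscount, because both assume all occurrences of a key arrive consecutively.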
Note: this kind of local test keeps printing the strings below, so you may want to comment them out while testing; when the job runs on a cluster, Hadoop recognizes them:
reporter:counter:counter_group,counter_no,1
reporter:status:processing......
This is log, will be printed in stderr file
Once the local test passes, submit the job to the cluster with the following script (run_cpp_mr.sh):
```bash
#!/bin/bash
HADOOP_HOME=/opt/yarn-client
INPUT_PATH=/test/input
OUTPUT_PATH=/test/output

echo "Clearing output path: $OUTPUT_PATH"
$HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH

${HADOOP_HOME}/bin/hadoop jar \
    ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
  -files mapper,reducer \
  -input $INPUT_PATH \
  -output $OUTPUT_PATH \
  -mapper mapper \
  -reducer reducer
```
2. WordCount in Shell
(1) Mapper implementation (mapper.sh)
```bash
#!/bin/bash
while read LINE; do
  for word in $LINE; do
    echo "$word 1"
    # In Streaming, a counter is defined by printing
    #   reporter:counter:<group>,<counter>,<amount>
    # to stderr. Here we increase the counter named counter_no
    # in group counter_group by 1.
    echo "reporter:counter:counter_group,counter_no,1" >&2
    echo "reporter:status:processing......" >&2
    echo "This is log for testing, will be printed in stderr file" >&2
  done
done
```
(2) Reducer implementation (reducer.sh)
```bash
#!/bin/bash
count=0
started=0
word=""
while read LINE; do
  newword=`echo $LINE | cut -d ' ' -f 1`
  if [ "$word" != "$newword" ]; then
    # emit the previous key and its count (skip before the first key)
    [ $started -ne 0 ] && echo -e "$word\t$count"
    word=$newword
    count=1
    started=1
  else
    count=$(( $count + 1 ))
  fi
done
echo -e "$word\t$count"
```
(3) Test and run
Test the two scripts locally:
echo "dong xicheng is here now, talk to dong xicheng now" | sh mapper.sh | sort | sh reducer.sh
Note: as with the C++ version, this local test keeps printing the strings below, so you may want to comment them out while testing; on a cluster, Hadoop recognizes them:
reporter:counter:counter_group,counter_no,1
reporter:status:processing......
This is log for testing, will be printed in stderr file
Once the local test passes, submit the job to the cluster with the following script (run_shell_mr.sh):
```bash
#!/bin/bash
HADOOP_HOME=/opt/yarn-client
INPUT_PATH=/test/input
OUTPUT_PATH=/test/output

echo "Clearing output path: $OUTPUT_PATH"
$HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH

${HADOOP_HOME}/bin/hadoop jar \
    ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
  -files mapper.sh,reducer.sh \
  -input $INPUT_PATH \
  -output $OUTPUT_PATH \
  -mapper "sh mapper.sh" \
  -reducer "sh reducer.sh"
```
3. Program notes
In Hadoop Streaming, standard input, standard output, and standard error each have a distinct role: stdin and stdout carry the input data and the processing results, while the meaning of stderr output depends on its content:
(1) If a stderr line has the form reporter:counter:group,counter,amount, it increases the Hadoop counter named counter in group group by amount. The first time Hadoop reads a given counter it creates it; afterwards it looks the counter up in its counter table and adds to its value.
(2) If a stderr line has the form reporter:status:message, the message is displayed on the web UI or terminal; it can carry status hints such as progress information.
(3) Any other stderr output is treated as debug logging, which Hadoop redirects to the task's stderr file. Note: each task has three log files, stdout, stderr, and syslog, all plain text; you can view them on the web UI, or log in to the node the task ran on and inspect the corresponding log directory.
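The three cases above amount to simple prefix matching on each stderr line. The sketch below is an illustration of that rule only, not Hadoop's actual parsing code:

```shell
#!/bin/bash
# Classify a stderr line the way the three rules above describe:
#   reporter:counter:...  -> counter update
#   reporter:status:...   -> status message
#   anything else         -> debug log (ends up in the stderr file)
classify_stderr_line() {
  case "$1" in
    reporter:counter:*) echo "counter" ;;
    reporter:status:*)  echo "status"  ;;
    *)                  echo "log"     ;;
  esac
}

classify_stderr_line "reporter:counter:counter_group,counter_no,1"  # counter
classify_stderr_line "reporter:status:processing......"             # status
classify_stderr_line "This is log for testing"                      # log
```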
One more point to note: by default, the key/value separator in Map task output is \t; Hadoop splits each record into key and value on \t in both the Map and Reduce phases and sorts by key. This is very important to keep in mind. You can, of course, specify a different separator with stream.map.output.field.separator.
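For example, to use a comma as the separator instead of the default tab, pass it as a -D generic option before the streaming options. This is a sketch of the invocation only, reusing the paths and scripts from run_shell_mr.sh above:

```shell
${HADOOP_HOME}/bin/hadoop jar \
    ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
  -D stream.map.output.field.separator=, \
  -files mapper.sh,reducer.sh \
  -input $INPUT_PATH \
  -output $OUTPUT_PATH \
  -mapper "sh mapper.sh" \
  -reducer "sh reducer.sh"
```

The mapper and reducer would then need to emit and split records on "," rather than "\t".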