日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

Hadoop Streaming

發布時間:2025/3/21 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop Streaming 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Hadoop Streaming

Hadoop Streaming

??? Hadoop Streaming
??? Streaming工作原理
??? 將文件打包到提交的作業中
??? Streaming選項與用法
??????? 只使用Mapper的作業
??????? 為作業指定其他插件
??????? Hadoop Streaming中的大文件和檔案
??????? 為作業指定附加配置參數
??????? 其他選項
??? 其他例子
??????? 使用自定義的方法切分行來形成Key/Value對
??????? 一個實用的Partitioner類 (二次排序,-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 選項)
??????? Hadoop聚合功能包的使用(-reduce aggregate 選項)
??????? 字段的選取(類似于unix中的 'cut' 命令)
??? 常見問題
??????? 我該怎樣使用Hadoop Streaming運行一組獨立(相關)的任務呢?
??????? 如何處理多個文件,其中每個文件一個map?
??????? 應該使用多少個reducer?
??????? 如果在Shell腳本里設置一個別名,并放在-mapper之后,Streaming會正常運行嗎?例如,alias cl='cut -fl',-mapper "cl"會運行正常嗎?
??????? 我可以使用UNIX pipes嗎?例如 –mapper "cut –fl | set s/foo/bar/g"管用么?
??????? 在streaming作業中用-file選項運行一個分布式的超大可執行文件(例如,3.6G)時,我得到了一個錯誤信息“No space left on device”。如何解決?
??????? 如何設置多個輸入目錄?
??????? 如何生成gzip格式的輸出文件?
??????? Streaming中如何自定義input/output format?
??????? Streaming如何解析XML文檔?
??????? 在streaming應用程序中如何更新計數器?
??????? 如何更新streaming應用程序的狀態?


Hadoop streaming是Hadoop的一個工具, 它幫助用戶創建和運行一類特殊的map/reduce作業, 這些特殊的map/reduce作業是由一些可執行文件或腳本文件充當mapper或者reducer。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper /bin/cat \-reducer /bin/wc

Streaming工作原理

在上面的例子里,mapper和reducer都是可執行文件,它們從標準輸入讀入數據(一行一行讀),并把計算結果發給標準輸出。Streaming工具會創建一個Map/Reduce作業,并把它發送給合適的集群,同時監視這個作業的整個執行過程。

如果一個可執行文件被用于mapper,則在mapper初始化時,每一個mapper任務會把這個可執行文件作為一個單獨的進程啟動。mapper任務運行時,它把輸入切分成行并把每一行提供給可執行文件進程的標準輸入。同時,mapper收集可執行文件進程標準輸出的內容,并把收到的每一行內容轉化成key/value對,作為mapper的輸出。默認情況下,一行中第一個tab之前的部分作為key,之后的(不包括tab)作為value。如果沒有tab,整行作為key值,value值為null。不過,這可以定制,在下文中將會討論如何自定義key和value的切分方式。

如果一個可執行文件被用于reducer,每個reducer任務會把這個可執行文件作為一個單獨的進程啟動。Reducer任務運行時,它把輸入切分成行并把每一行提供給可執行文件進程的標準輸入。同時,reducer收集可執行文件進程標準輸出的內容,并把每一行內容轉化成key/value對,作為reducer的輸出。默認情況下,一行中第一個tab之前的部分作為key,之后的(不包括tab)作為value。在下文中將會討論如何自定義key和value的切分方式。

這是Map/Reduce框架和streaming mapper/reducer之間的基本通信協議。

用戶也可以使用java類作為mapper或者reducer。上面的例子與這里的代碼等價:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper \-reducer /bin/wc

用戶可以設定stream.non.zero.exit.is.failure truefalse 來表明streaming task的返回值非零時是Failure 還是Success。默認情況,streaming task返回非零時表示失敗。

將文件打包到提交的作業中

任何可執行文件都可以被指定為mapper/reducer。這些可執行文件不需要事先存放在集群上;如果在集群上還沒有,則需要用-file選項讓framework把可執行文件作為作業的一部分,一起打包提交。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper myPythonScript.py \-reducer /bin/wc \-file myPythonScript.py

上面的例子描述了一個用戶把可執行python文件作為mapper。其中的選項“-file myPythonScirpt.py”使可執行python文件作為作業提交的一部分被上傳到集群的機器上。

除了可執行文件外,其他mapper或reducer需要用到的輔助文件(比如字典,配置文件等)也可以用這種方式打包上傳。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper myPythonScript.py \-reducer /bin/wc \-file myPythonScript.py \-file myDictionary.txt

Streaming選項與用法

只使用Mapper的作業

有時只需要map函數處理輸入數據。這時只需把mapred.reduce.tasks設置為零,Map/reduce框架就不會創建reducer任務,mapper任務的輸出就是整個作業的最終輸出。

為了做到向下兼容,Hadoop Streaming也支持“-reduce None”選項,它與“-jobconf mapred.reduce.tasks=0”等價。

為作業指定其他插件

和其他普通的Map/Reduce作業一樣,用戶可以為streaming作業指定其他插件:

-inputformat JavaClassName-outputformat JavaClassName-partitioner JavaClassName-combiner JavaClassName

用于處理輸入格式的類要能返回Text類型的key/value對。如果不指定輸入格式,則默認會使用TextInputFormat。因為TextInputFormat得到的key值是LongWritable類型的(其實key值并不是輸入文件中的內容,而是value偏移量),所以key會被丟棄,只把value用管道方式發給mapper。

用戶提供的定義輸出格式的類需要能夠處理Text類型的key/value對。如果不指定輸出格式,則默認會使用TextOutputFormat類。

Hadoop Streaming中的大文件和檔案

任務使用-cacheFile和-cacheArchive選項在集群中分發文件和檔案,選項的參數是用戶已上傳至HDFS的文件或檔案的URI。這些文件和檔案在不同的作業間緩存。用戶可以通過fs.default.name.config配置參數的值得到文件所在的host和fs_port。

這個是使用-cacheFile選項的例子:

-cacheFile hdfs://host:fs_port/user/testfile.txt#testlink

在上面的例子里,url中#后面的部分是建立在任務當前工作目錄下的符號鏈接的名字。這里的任務的當前工作目錄下有一個“testlink”符號鏈接,它指向testfile.txt文件在本地的拷貝。如果有多個文件,選項可以寫成:

-cacheFile hdfs://host:fs_port/user/testfile1.txt#testlink1 -cacheFile hdfs://host:fs_port/user/testfile2.txt#testlink2

-cacheArchive選項用于把jar文件拷貝到任務當前工作目錄并自動把jar文件解壓縮。例如:

-cacheArchive hdfs://host:fs_port/user/testfile.jar#testlink3

在上面的例子中,testlink3是當前工作目錄下的符號鏈接,它指向testfile.jar解壓后的目錄。

下面是使用-cacheArchive選項的另一個例子。其中,input.txt文件有兩行內容,分別是兩個文件的名字:testlink/cache.txt和testlink/cache2.txt?!皌estlink”是指向檔案目錄(jar文件解壓后的目錄)的符號鏈接,這個目錄下有“cache.txt”和“cache2.txt”兩個文件。

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input "/user/me/samples/cachefile/input.txt" \-mapper "xargs cat" \-reducer "cat" \-output "/user/me/samples/cachefile/out" \ -cacheArchive 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar#testlink' \ -jobconf mapred.map.tasks=1 \-jobconf mapred.reduce.tasks=1 \ -jobconf mapred.job.name="Experiment"$ ls test_jar/ cache.txt cache2.txt$ jar cvf cachedir.jar -C test_jar/ . added manifest adding: cache.txt(in = 30) (out= 29)(deflated 3%) adding: cache2.txt(in = 37) (out= 35)(deflated 5%)$ hadoop dfs -put cachedir.jar samples/cachefile$ hadoop dfs -cat /user/me/samples/cachefile/input.txt testlink/cache.txt testlink/cache2.txt$ cat test_jar/cache.txt This is just the cache string$ cat test_jar/cache2.txt This is just the second cache string$ hadoop dfs -ls /user/me/samples/cachefile/out Found 1 items /user/me/samples/cachefile/out/part-00000 <r 3> 69$ hadoop dfs -cat /user/me/samples/cachefile/out/part-00000 This is just the cache string This is just the second cache string

為作業指定附加配置參數

用戶可以使用“-jobconf <n>=<v>”增加一些配置變量。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper\-reducer /bin/wc \-jobconf mapred.reduce.tasks=2

上面的例子中,-jobconf mapred.reduce.tasks=2表明用兩個reducer完成作業。

關于jobconf參數的更多細節可以參考:hadoop-default.html

其他選項

Streaming 作業的其他選項如下表:

選項可選/必須描述
-cluster name可選在本地Hadoop集群與一個或多個遠程集群間切換
-dfs host:port or local可選覆蓋作業的HDFS配置
-jt host:port or local可選覆蓋作業的JobTracker配置
-additionalconfspec specfile可選用一個類似于hadoop-site.xml的XML文件保存所有配置,從而不需要用多個"-jobconf name=value"類型的選項單獨為每個配置變量賦值
-cmdenv name=value可選傳遞環境變量給streaming命令
-cacheFile fileNameURI可選指定一個上傳到HDFS的文件
-cacheArchive fileNameURI可選指定一個上傳到HDFS的jar文件,這個jar文件會被自動解壓縮到當前工作目錄下
-inputreader JavaClassName可選為了向下兼容:指定一個record reader類(而不是input format類)
-verbose可選詳細輸出

使用-cluster <name>實現“本地”Hadoop和一個或多個遠程Hadoop集群間切換。默認情況下,使用hadoop-default.xml和hadoop-site.xml;當使用-cluster <name>選項時,會使用$HADOOP_HOME/conf/hadoop-<name>.xml。

下面的選項改變temp目錄:

-jobconf dfs.data.dir=/tmp

下面的選項指定其他本地temp目錄:

-jobconf mapred.local.dir=/tmp/local-jobconf mapred.system.dir=/tmp/system-jobconf mapred.temp.dir=/tmp/temp

更多有關jobconf的細節請參考:http://wiki.apache.org/hadoop/JobConfFile

在streaming命令中設置環境變量:

-cmdenv EXAMPLE_DIR=/home/example/dictionaries/

其他例子

使用自定義的方法切分行來形成Key/Value對

之前已經提到,當Map/Reduce框架從mapper的標準輸入讀取一行時,它把這一行切分為key/value對。在默認情況下,每行第一個tab符之前的部分作為key,之后的部分作為value(不包括tab符)。

但是,用戶可以自定義,可以指定分隔符是其他字符而不是默認的tab符,或者指定在第n(n>=1)個分割符處分割而不是默認的第一個。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper \-reducer org.apache.hadoop.mapred.lib.IdentityReducer \-jobconf stream.map.output.field.separator=. \-jobconf stream.num.map.output.key.fields=4

在上面的例子,“-jobconf stream.map.output.field.separator=.”指定“.”作為map輸出內容的分隔符,并且從在第四個“.”之前的部分作為key,之后的部分作為value(不包括這第四個“.”)。 如果一行中的“.”少于四個,則整行的內容作為key,value設為空的Text對象(就像這樣創建了一個Text:new Text(""))。

同樣,用戶可以使用“-jobconf stream.reduce.output.field.separator=SEP”和“-jobconf stream.num.reduce.output.fields=NUM”來指定reduce輸出的行中,第幾個分隔符處分割key和value。

一個實用的Partitioner類 (二次排序,-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 選項)

Hadoop有一個工具類org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner,它在應用程序中很有用。Map/reduce框架用這個類切分map的輸出,切分是基于key值的前綴,而不是整個key。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper \-reducer org.apache.hadoop.mapred.lib.IdentityReducer \-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \-jobconf stream.map.output.field.separator=. \-jobconf stream.num.map.output.key.fields=4 \-jobconf map.output.key.field.separator=. \-jobconf num.key.fields.for.partition=2 \-jobconf mapred.reduce.tasks=12

其中,-jobconf stream.map.output.field.separator=. 和-jobconf stream.num.map.output.key.fields=4是前文中的例子。Streaming用這兩個變量來得到mapper的key/value對。

上面的Map/Reduce 作業中map輸出的key一般是由“.”分割成的四塊。但是因為使用了-jobconf num.key.fields.for.partition=2 選項,所以Map/Reduce框架使用key的前兩塊來切分map的輸出。其中,-jobconf map.output.key.field.separator=.指定了這次切分使用的key的分隔符。這樣可以保證在所有key/value對中,key值前兩個塊值相同的所有key被分到一組,分配給一個reducer。

這種高效的方法等價于指定前兩塊作為主鍵,后兩塊作為副鍵。主鍵用于切分塊,主鍵和副鍵的組合用于排序。一個簡單的示例如下:

Map的輸出(key)

11.12.1.2 11.14.2.3 11.11.4.1 11.12.1.1 11.14.2.2

切分給3個reducer(前兩塊的值用于切分)

11.11.4.1 ----------- 11.12.1.2 11.12.1.1 ----------- 11.14.2.3 11.14.2.2

在每個切分后的組內排序(四個塊的值都用于排序)

11.11.4.1 ----------- 11.12.1.1 11.12.1.2 ----------- 11.14.2.2 11.14.2.3

Hadoop聚合功能包的使用(-reduce aggregate 選項)

Hadoop有一個工具包“Aggregate”(https://svn.apache.org/repos/asf/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate)?!癆ggregate”提供一個特殊的reducer類和一個特殊的combiner類,并且有一系列的“聚合器”(“aggregator”)(例如“sum”,“max”,“min”等)用于聚合一組value的序列。用戶可以使用Aggregate定義一個mapper插件類,這個類用于為mapper輸入的每個key/value對產生“可聚合項”。combiner/reducer利用適當的聚合器聚合這些可聚合項。

要使用Aggregate,只需指定“-reducer aggregate”:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper myAggregatorForKeyCount.py \-reducer aggregate \-file myAggregatorForKeyCount.py \-jobconf mapred.reduce.tasks=12

python程序myAggregatorForKeyCount.py例子:

#!/usr/bin/pythonimport sys;def generateLongCountToken(id):return "LongValueSum:" + id + "\t" + "1"def main(argv):line = sys.stdin.readline();try:while line:line = line[:-1];fields = line.split("\t");print generateLongCountToken(fields[0]);line = sys.stdin.readline();except "end of file":return None if __name__ == "__main__":main(sys.argv)

字段的選取(類似于unix中的 'cut' 命令)

Hadoop的工具類org.apache.hadoop.mapred.lib.FieldSelectionMapReduce幫助用戶高效處理文本數據,就像unix中的“cut”工具。工具類中的map函數把輸入的key/value對看作字段的列表。用戶可以指定字段的分隔符(默認是tab),可以選擇字段列表中任意一段(由列表中一個或多個字段組成)作為map輸出的key或者value。同樣,工具類中的reduce函數也把輸入的key/value對看作字段的列表,用戶可以選取任意一段作為reduce輸出的key或value。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\-reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \-jobconf map.output.key.field.separa=. \-jobconf num.key.fields.for.partition=2 \-jobconf mapred.data.field.separator=. \-jobconf map.output.key.value.fields.spec=6,5,1-3:0- \-jobconf reduce.output.key.value.fields.spec=0-2:5- \-jobconf mapred.reduce.tasks=12

選項“-jobconf map.output.key.value.fields.spec=6,5,1-3:0-”指定了如何為map的輸出選取key和value。Key選取規則和value選取規則由“:”分割。在這個例子中,map輸出的key由字段6,5,1,2和3組成。輸出的value由所有字段組成(“0-”指字段0以及之后所有字段)。

選項“-jobconf reduce.output.key.value.fields.spec=0-2:0-”(譯者注:此處應為”0-2:5-“)指定如何為reduce的輸出選取value。本例中,reduce的輸出的key將包含字段0,1,2(對應于原始的字段6,5,1)。reduce輸出的value將包含起自字段5的所有字段(對應于所有的原始字段)。

常見問題

我該怎樣使用Hadoop Streaming運行一組獨立(相關)的任務呢?

多數情況下,你不需要Map Reduce的全部功能,而只需要運行同一程序的多個實例,或者使用不同數據,或者在相同數據上使用不同的參數。你可以通過Hadoop Streaming來實現。

如何處理多個文件,其中每個文件一個map?

例如這樣一個問題,在集群上壓縮(zipping)一些文件,你可以使用以下幾種方法:

  • 使用Hadoop Streaming和用戶編寫的mapper腳本程序:
    • 生成一個文件,文件中包含所有要壓縮的文件在HDFS上的完整路徑。每個map 任務獲得一個路徑名作為輸入。
    • 創建一個mapper腳本程序,實現如下功能:獲得文件名,把該文件拷貝到本地,壓縮該文件并把它發到期望的輸出目錄。
  • 使用現有的Hadoop框架:
    • 在main函數中添加如下命令: FileOutputFormat.setCompressOutput(conf, true);FileOutputFormat.setOutputCompressorClass(conf, org.apache.hadoop.io.compress.GzipCodec.class);conf.setOutputFormat(NonSplitableTextInputFormat.class);conf.setNumReduceTasks(0);
    • 編寫map函數: public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {output.collect((Text)value, null);}
    • 注意輸出的文件名和原文件名不同
  • 應該使用多少個reducer?

    請參考Hadoop Wiki:Reducer

    如果在Shell腳本里設置一個別名,并放在-mapper之后,Streaming會正常運行嗎?例如,alias cl='cut -fl',-mapper "cl"會運行正常嗎?

    腳本里無法使用別名,但是允許變量替換,例如:

    $ hadoop dfs -cat samples/student_marks alice 50 bruce 70 charlie 80 dan 75$ c2='cut -f2'; $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input /user/me/samples/student_marks -mapper \"$c2\" -reducer 'cat' -output /user/me/samples/student_out -jobconf mapred.job.name='Experiment'$ hadoop dfs -ls samples/student_out Found 1 items/user/me/samples/student_out/part-00000 <r 3> 16$ hadoop dfs -cat samples/student_out/part-00000 50 70 75 80

    我可以使用UNIX pipes嗎?例如 –mapper "cut –fl | set s/foo/bar/g"管用么?

    現在不支持,而且會給出錯誤信息“java.io.IOException: Broken pipe”。這或許是一個bug,需要進一步研究。

    在streaming作業中用-file選項運行一個分布式的超大可執行文件(例如,3.6G)時,我得到了一個錯誤信息“No space left on device”。如何解決?

    配置變量stream.tmpdir指定了一個目錄,在這個目錄下要進行打jar包的操作。stream.tmpdir的默認值是/tmp,你需要將這個值設置為一個有更大空間的目錄:

    -jobconf stream.tmpdir=/export/bigspace/...

    如何設置多個輸入目錄?

    可以使用多個-input選項設置多個輸入目錄:

    hadoop jar hadoop-streaming.jar -input '/user/foo/dir1' -input '/user/foo/dir2'

    如何生成gzip格式的輸出文件?

    除了純文本格式的輸出,你還可以生成gzip文件格式的輸出,你只需設置streaming作業中的選項‘-jobconf mapred.output.compress=true -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCode’。

    Streaming中如何自定義input/output format?

    至少在Hadoop 0.14版本以前,不支持多個jar文件。所以當指定自定義的類時,你要把他們和原有的streaming jar打包在一起,并用這個自定義的jar包替換默認的hadoop streaming jar包。

    Streaming如何解析XML文檔?

    你可以使用StreamXmlRecordReader來解析XML文檔。

    hadoop jar hadoop-streaming.jar -inputreader "StreamXmlRecord,begin=BEGIN_STRING,end=END_STRING" ..... (rest of the command)

    Map任務會把BEGIN_STRING和END_STRING之間的部分看作一條記錄。

    在streaming應用程序中如何更新計數器?

    streaming進程能夠使用stderr發出計數器信息。reporter:counter:<group>,<counter>,<amount>應該被發送到stderr來更新計數器。

    如何更新streaming應用程序的狀態?

    streaming進程能夠使用stderr發出狀態信息。reporter:status:<message> 要被發送到stderr來設置狀態。


    出處:http://hadoop.apache.org/docs/r1.0.4/cn/streaming.html

    總結

    以上是生活随笔為你收集整理的Hadoop Streaming的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。