python hadoop streaming_Hadoop Streaming 使用及参数设置
1. MapReduce 與 HDFS 簡(jiǎn)介
什么是 Hadoop ?
Google 為自己的業(yè)務(wù)需要提出了編程模型 MapReduce 和分布式文件系統(tǒng) Google File System,并發(fā)布了相關(guān)論文(可在 Google Research 的網(wǎng)站上獲得:GFS、MapReduce)。Doug Cutting 和 Mike Cafarella 在開(kāi)發(fā)搜索引擎 Nutch 時(shí)對(duì)這兩篇論文進(jìn)行了自己的實(shí)現(xiàn),即同名的 MapReduce 和 HDFS,合起來(lái)就是 Hadoop。
MapReduce 的 Data Flow 如下圖所示,原始數(shù)據(jù)經(jīng)過(guò) mapper 處理,再進(jìn)行 partition 和 sort,到達(dá) reducer,輸出最后結(jié)果。
2. Hadoop Streaming 原理
Hadoop 本身是用 Java 開(kāi)發(fā)的,程序也需要用 Java 編寫(xiě),但是通過(guò) Hadoop Streaming,我們可以使用任意語(yǔ)言來(lái)編寫(xiě)程序,讓 Hadoop 運(yùn)行。
Hadoop Streaming 就是通過(guò)將其他語(yǔ)言編寫(xiě)的 mapper 和 reducer 通過(guò)參數(shù)傳給一個(gè)事先寫(xiě)好的 Java 程序(Hadoop 自帶的 *-streaming.jar),這個(gè) Java 程序會(huì)負(fù)責(zé)創(chuàng)建 MR 作業(yè),另開(kāi)一個(gè)進(jìn)程來(lái)運(yùn)行 mapper,將得到的輸入通過(guò) stdin 傳給它,再將 mapper 處理后輸出到 stdout 的數(shù)據(jù)交給 Hadoop,經(jīng)過(guò) partition 和 sort 之后,再另開(kāi)進(jìn)程運(yùn)行 reducer,同樣通過(guò) stdin/stdout 得到最終結(jié)果。因此,我們只需要在其他語(yǔ)言編寫(xiě)的程序中,通過(guò) stdin 接收數(shù)據(jù),再將處理過(guò)的數(shù)據(jù)輸出到 stdout,Hadoop Streaming 就能通過(guò)這個(gè) Java 的 wrapper 幫我們解決中間繁瑣的步驟,運(yùn)行分布式程序。
原理上只要是能夠處理 stdio 的語(yǔ)言都能用來(lái)寫(xiě) mapper 和 reducer,也可以指定 mapper 或 reducer 為 Linux 下的程序(如 awk、grep、cat)或者按照一定格式寫(xiě)好的 java class。因此,mapper 和 reducer 也不必是同一類的程序。
1. Hadoop Streaming 的優(yōu)缺點(diǎn)
優(yōu)點(diǎn):
1. 可以使用自己喜歡的語(yǔ)言來(lái)編寫(xiě) MapReduce 程序(不必非得使用 Java)
2. 不需要像寫(xiě) Java 的 MR 程序那樣 import 一大堆褲,在代碼里做很多配置,很多東西都抽象到了 stdio 上,代碼量顯著減少。
3. 因?yàn)闆](méi)有庫(kù)的依賴,調(diào)試方便,并且可以脫離 Hadoop 先在本地用管道模擬調(diào)試。
缺點(diǎn):
1. 只能通過(guò)命令行參數(shù)來(lái)控制 MapReduce 框架,不像 Java 的程序那樣可以在代碼里使用 API,控制力比較弱。
2. 因?yàn)橹虚g隔著一層處理,效率會(huì)比較慢。
3. 所以 Hadoop Streaming 比較適合做一些簡(jiǎn)單的任務(wù),比如用 Python 寫(xiě)只有一兩百行的腳本。如果項(xiàng)目比較復(fù)雜,或者需要進(jìn)行比較細(xì)致的優(yōu)化,使用 Streaming 就容易出現(xiàn)一些束手束腳的地方。
2. 用 Python 編寫(xiě)簡(jiǎn)單的 Hadoop Streaming 程序
使用 Python 編寫(xiě) Hadoop Streaming 程序有幾點(diǎn)需要注意:
1. 在能使用 iterator 的情況下,盡量使用 iterator,避免將 stdin 的輸入大量?jī)?chǔ)存在內(nèi)存里,否則會(huì)嚴(yán)重降低性能。
2. Streaming 不會(huì)幫你分割 key 和 value 傳進(jìn)來(lái),傳進(jìn)來(lái)的只是一個(gè)個(gè)字符串而已,需要你自己在代碼里手動(dòng)調(diào)用 split()。
3. 從 stdin 得到的每一行數(shù)據(jù)末尾似乎會(huì)有 '\n' ,保險(xiǎn)起見(jiàn)一般都需要用 rstrip() 來(lái)去掉。
4. 在想獲得 key-value list 而不是一個(gè)個(gè)處理 key-value pair 時(shí),可以使用 groupby 配合 itemgetter 將 key 相同的 key-value pair 組成一個(gè)個(gè) group,得到類似 Java 編寫(xiě)的 reduce 可以直接獲取一個(gè) Text 類型的 key 和一個(gè) iterable 作為 value 的效果。注意 itemgetter 的效率比 lambda 表達(dá)式的效率要高,所以用 itemgetter 比較好。
編寫(xiě) Hadoop Streaming 程序的基本模版:
#!/usr/bin/env python#-*- coding: utf-8 -*-
"""Some description here..."""
importsysfrom operator importitemgetterfrom itertools importgroupbydefread_input(file):"""Read input and split."""
for line infile:yield line.rstrip().split('\t')defmain():
data=read_input(sys.stdin)for key, kviter ingroupby(data, itemgetter(0)):#some code here..
if __name__ == "__main__":
main()
如果對(duì)輸入輸出格式有不同于默認(rèn)的控制,主要會(huì)在 read_input() 里調(diào)整。
3. 本地調(diào)試
本地調(diào)試用于 Hadoop Streaming 的 Python 程序的基本模式是:
$ cat | python | sort -t $'\t' -k1,1 | python >
這里有幾點(diǎn)需要注意:
1. Hadoop 默認(rèn)按照 tab 來(lái)分割 key 和 value,以第一個(gè)分割出的部分為 key,按 key 進(jìn)行排序,因此這里使用 sort -t $'\t' -k1,1 來(lái)模擬。如果有其他需求,在交給 Hadoop Streaming 執(zhí)行時(shí)可以通過(guò)命令行參數(shù)設(shè)置,本地調(diào)試也可以進(jìn)行相應(yīng)的調(diào)整,主要是調(diào)整 sort 的參數(shù)。
2. 如果在 Python 腳本里加上了 shebang,并且為它們添加了執(zhí)行權(quán)限,也可以用類似于 ./mapper.py (會(huì)根據(jù) shebang 自動(dòng)調(diào)用指定的解釋器來(lái)執(zhí)行文件)來(lái)代替 python mapper.py。
4. 在集群上運(yùn)行與監(jiān)控
1. 察看文檔
首先需要知道用于 Streaming 的 Java 程序在哪里。在 1.0.x 的版本中,應(yīng)該都在 $HADOOP_HOME/contrib/streaming/ 下:
$HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar
通過(guò)執(zhí)行 Hadoop 命令
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar -info
就會(huì)看到一系列 Streaming 自帶的幫助,帶有各種參數(shù)的說(shuō)明和使用樣例。
5. 運(yùn)行命令
用 Hadoop Streaming 執(zhí)行 Python 程序的一般步驟是:
1. 將輸入文件放到 HDFS 上,建議使用 copyFromLocal 而不是 put 命令。參見(jiàn)Difference between hadoop fs -put and hadoop fs -copyFromLocal
1. 一般可以新建一個(gè)文件夾用于存放輸入文件,假設(shè)叫 input
$ hadoop fs -mkdir input
然后用
$ hadoop fs -ls
查看目錄,可以看到出現(xiàn)了一個(gè) /user/hadoop/input 文件夾。/user/hadoop 是默認(rèn)的用戶文件夾,相當(dāng)于本地文件系統(tǒng)中的 /home/hadoop。
2. 再使用
$ hadoop fs -copyFromLocal input/
將本地文件放到 input 文件夾下。
2. 開(kāi)始 MapReduce 作業(yè),假設(shè)你現(xiàn)在正在放有 mapper 和 reducer 兩個(gè)腳本的目錄下,而且它們剛好就叫 mapper.py 和 reducer.py,在不需要做其他配置的情況下,執(zhí)行
$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \-mapper mapper.py \-filemapper.py \-reducer reducer.py \-filereducer.py \-input input/*\
-output output
第一行是告訴 Hadoop 運(yùn)行 Streaming 的 Java 程序,接下來(lái)的是參數(shù):
這里的 mapper.py 和 reducer.py 是 mapper 所對(duì)應(yīng) python 程序的路徑。為了讓 Hadoop 將程序分發(fā)給其他機(jī)器,需要再加一個(gè) -file 參數(shù)用于指明要分發(fā)的程序放在哪里。
注意這樣寫(xiě)的前提是這個(gè) Python 程序里有 Shebang 而且添加了執(zhí)行權(quán)限。如果沒(méi)有的話可以改成
-mapper 'python mapper.py'
加上解釋器命令,用引號(hào)擴(kuò)住(注意在參數(shù)中傳入解釋器命令,不再是用`符擴(kuò)住,而是'符)。準(zhǔn)確來(lái)說(shuō),mapper 后面跟的騎士應(yīng)該是一個(gè)命令而不是文件名。
假如你執(zhí)行的程序不放在當(dāng)前目錄下,比如說(shuō)在當(dāng)前目錄的 src 文件夾下,可以這樣寫(xiě)
-mapper 'python mapper.py' -file src/mapper.py \-reducer 'python reducer.py' -file src/reducer.py \
也就是說(shuō),-mapper 和 -reducer 后面跟的文件名不需要帶上路徑,而 -file 后的參數(shù)需要。注意如果你在 mapper 后的命令用了引號(hào),加上路徑名反而會(huì)報(bào)錯(cuò)說(shuō)找不到這個(gè)程序。(因?yàn)?-file 選項(xiàng)會(huì)將對(duì)應(yīng)的本地參數(shù)文件上傳至 Hadoop Streaming 的工作路徑下,所以再執(zhí)行 -mapper 對(duì)應(yīng)的參數(shù)命令能直接找到對(duì)應(yīng)的文件。
-input 和 -output 后面跟的是 HDFS 上的路徑名,這里的 input/* 指的是"input 文件夾下的所有文件",注意 -output 后面跟著的需要是一個(gè)不存在于 HDFS 上的路徑,在產(chǎn)生輸出的時(shí)候 Hadoop 會(huì)幫你創(chuàng)建這個(gè)文件夾,如果已經(jīng)存在的話就會(huì)產(chǎn)生沖突。(因此每次執(zhí)行 Hadoop Streaming 前可以通過(guò)腳本命令 hadoop fs -rmr 清除輸出路徑)。
有時(shí)候 Shebang 不一定能用,尤其是在執(zhí)行環(huán)境比較復(fù)雜的時(shí)候,最保險(xiǎn)的做法是:
$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \-mapper 'python mapper.py' -filemapper.py \-reducer 'python reducer.py' -filereducer.py \-input input/*-output output
這樣寫(xiě)還有一個(gè)好處,就是可以在引號(hào)里寫(xiě)上提供給 python 程序的命令行參數(shù),甚至做目錄的變更以及環(huán)境變量的初始化等一系列 shell 命令。
由于 mapper 和 reducer 參數(shù)跟的實(shí)際上是命令,所以如果每臺(tái)機(jī)器上 python 的環(huán)境配置不一樣的話,會(huì)用每臺(tái)機(jī)器自己的配置去執(zhí)行 python 程序。
6. 得到結(jié)果
成功執(zhí)行完這個(gè)任務(wù)之后,使用 output 參數(shù)在 HDFS 上指定的輸出文件夾里就會(huì)多出幾個(gè)文件:一個(gè)空白文件 _SUCCESS,表面 job 運(yùn)行成功,這個(gè)文件可以讓其他程序只要查看一下 HDFS 就能判斷這次 job 是否運(yùn)行成功,從而進(jìn)行相關(guān)處理。
一個(gè) _logs 文件夾,裝著任務(wù)日志。
part-00000,.....,part-xxxxx 文件,有多少個(gè) reducer 后面的數(shù)字就會(huì)有多大,對(duì)應(yīng)每個(gè) reducer 的輸出結(jié)果。
假如你的輸出很少,比如是一個(gè)只有幾行的計(jì)數(shù),你可以用
$ hadoop fs -cat
直接將輸出打印到終端查看。
假如你的輸出很多,則需要拷貝到本地文件系統(tǒng)來(lái)查看。可以使用 copyToLocal 來(lái)獲取整個(gè)文件夾。如果你不需要 _SUCCESS 和 _logs,并且想要將所有 reducer 的輸出合并,可以使用 getmerge 命令。
$ hadoop fs -getmerge output ./
上述命令將 output 下的 part-xxxxx 合并,放到當(dāng)前目錄的一個(gè)叫 output 的文件里。
7. 如何串聯(lián)多趟 MapReduce
如果有多次任務(wù)要執(zhí)行,下一步需要用上一步的任務(wù)做輸入,解決辦法很簡(jiǎn)單。假設(shè)上一步在 HDFS 的輸出文件夾是 output1,那么在下一步的運(yùn)行命令中,指明
-input output1/part-*
即指定上一次的所有輸出為本次任務(wù)的輸入即可。
8. 使用額外的文件
假如 MapReduce 的 job 除了輸入以外還需要一些額外的文件,有兩種選擇:
1. 大文件
所謂的大文件就是大小大于設(shè)置的 local.cache.size 的文件,默認(rèn)是10GB。這個(gè)時(shí)候可以用 -file 來(lái)分發(fā)。除此之外代碼本身也可以用 file 來(lái)分發(fā)。
格式:假如我要加多一個(gè) sideData.txt 給 python 腳本使用:
$ hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input inputDir \-output outputDir \-mapper mapper.py \-filemapper.py \-reducer reducer.py \-filereducer.py \-file sideData.txt
這樣 -file 選項(xiàng)的參數(shù)文件都會(huì)被上傳至 MapReduce 的工作目錄下,所以 mapper 和 reducer 代碼都可以通過(guò)文件名直接訪問(wèn)到文件。在 python 腳本中,只要把這個(gè)文件當(dāng)成自己同一目錄下的本地文件來(lái)打開(kāi)就可以了。比如:
f = open('sideData.txt')
注意這個(gè) file 是只讀的,不可以寫(xiě)。
2. 小文件
如果是比較小的文件,想要提高讀寫(xiě)速度可以將它放在 distributed cache 里(也就是每臺(tái)機(jī)器都有自己的一份 copy,不需要網(wǎng)絡(luò) IO 就可以拿到數(shù)據(jù))。這里要用到的參數(shù)是 -cachefile,寫(xiě)法和用法與上一個(gè)一樣,就是將 -file 改成 -cachefile 而已。
3. 如果上傳目錄或者多個(gè)目錄時(shí)使用 -files 選項(xiàng)
-files dir1,dir2 #多個(gè)目錄用','隔開(kāi),且不能有空格
上傳目錄后,可以直接訪問(wèn)當(dāng)前目錄
4. 上傳 HDFS 上的文件或者目錄
只能 -files 命令上傳 HDFS 路徑下的文件或目錄,然后就可以像訪問(wèn)本地文件一樣訪問(wèn) HDFS 文件。
比如:
hdfs_file="hdfs://webboss-10-166-133-95:9100/user/hive/conf/part-00000"input=/user/hive/input/*output=/user/hive/output
mapper_script=mapper.py
reducer_script=reducer.py
map_file=./mapper.py
reduce_file=./reducer.py
hadoop fs -rmr $output
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar \
-D mapred.reduce.tasks=0 \
-files $hdfs_file \
-input $input \
-output $output \
-mapper $mapper_script \
-file $map_file \
-reducer $reducer_script \
-file $reduce_file
9. 控制 partitioner
partitioning 指的是數(shù)據(jù)經(jīng)過(guò) mapper 處理后,被分發(fā)到 reducer 上的過(guò)程。partitioner 控制的,就是“怎樣的 mapper 輸出會(huì)被分發(fā)到哪一個(gè) reducer 上”。
Hadoop 有幾個(gè)自帶的 partitoner,解釋可以看這里。默認(rèn)的是 HashPartitioner,也就是把第一個(gè) '\t' 前的 key 做 hash 之后用于分配 partition。寫(xiě) Hadoop Streaming 程序是可以選擇其他 partitioner 的,你可以選擇自帶的其他幾種里的一種,也可以自己寫(xiě)一個(gè)繼承 Partitioner 的 java 類然后編譯成 jar,在運(yùn)行參數(shù)里指定為你用的 partitioner。
官方自帶的 partionner 里最常用的是 KeyFieldBasedPartitioner。它會(huì)按照 key 的一部分來(lái)做 partition,而不是用整個(gè) key 來(lái)做 partition。
在學(xué)會(huì)用 KeyFieldBasedPartitioner 之前,必然要先學(xué)怎么控制 key-value 的分割。分割 key 的步驟可以分成兩步,用 python 來(lái)描述一下大約是
fields =output.split(separator)
key= fields[:numKeyfields]
1. 選擇用什么符號(hào)來(lái)分割 key,也就是選擇 separator
map.output.key.field.separator 可以指定用于分割 key 的符號(hào)。比如指定為一點(diǎn)的話,就要加上參數(shù)。
-D stream.map.output.field.separator=.
假設(shè)你的 mapper 輸出是
11.22.33.44
這時(shí)會(huì)用 '.' 進(jìn)行分割,看準(zhǔn) [11, 22, 33, 44] 這里的其中一個(gè)或幾個(gè)作為 key。
2. 選擇 key 的范圍,也就是選擇 numKeyfields
控制 key 的范圍的參數(shù)是這個(gè),假設(shè)要設(shè)置被分割出的前 2 個(gè)元素為 key:
-D stream.num.map.output.key.fields=2
那么 key 就是上面的 1122。值得注意的是假如這個(gè)數(shù)字設(shè)置到覆蓋整個(gè)輸出,在這個(gè)例子里是4的話,那么整一行都會(huì)變成 key。
上面分割出 key 之后,KeyFieldBasedPartitioner 還需要知道你想要用 key 里的哪部分作為 partition 的依據(jù)。它進(jìn)行配置的過(guò)程可以看源代碼來(lái)理解。
假設(shè)在上一步我們通過(guò)使用
-D stream.map.output.field.separator=. \-D stream.num.map.output.key.fields=4
將 11.22.33.44 的整個(gè)字符串都設(shè)置成了 key,下一步就是在這個(gè) key 的內(nèi)部再進(jìn)行一次分割。map.output.key.field.separator 可以用來(lái)設(shè)置第二次分割用的分割符,mapred.text.key.partitioner.options 可以接受參數(shù)來(lái)劃分被分割出來(lái)的 partition key,比如:
-D map.output.key.field.separator=. \-D mapred.text.key.partitioner.options=-k1,2
指的就是在 key 的內(nèi)部里,將第1到第2個(gè)被點(diǎn)分割的元素作為 partition key,這個(gè)例子里也就是 1122。這里的值 -ki,j 表示從 i 到 j 個(gè)元素(inclusive)會(huì)作為 partition key。如果終點(diǎn)省略不寫(xiě),像 -ki 的話,那么 i 和 i 之后的元素都會(huì)作為 partition key。
partition key 相同的輸出會(huì)保證分到同一個(gè) reducer 上,也就是所有 11.22.xx.xx 的輸出都會(huì)到同一個(gè) partitioner,11.22 換成其他各種組合也是一樣。
實(shí)例說(shuō)明一下,就是這樣的:
1. mapper 的輸出是
11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2
2. 指定前 4 個(gè)元素做 key,key 里的前兩個(gè)元素做 partition key,分成 3 個(gè) partition 的話,就會(huì)被分成
11.11.4.1
-----------
11.12.1.2
11.12.1.1
-----------
11.14.2.3
11.14.2.2
3. 下一步 reducer 會(huì)對(duì)自己得到的每個(gè) partition 內(nèi)進(jìn)行排序,結(jié)果就是
11.11.4.1
-----------
11.12.1.1
11.12.1.2
-----------
11.14.2.2
11.14.2.3
Streaming 命令格式如下:
$ hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-D stream.map.output.field.separator=. \-D stream.num.map.output.key.fields=4\-D map.output.key.field.separator=4\-D mapred.text.key.partitioner.options=-k1,2\-input inputDir \-output outputDir \-mapper mapper.py -file mapper.py \-reducer reducer.py -file reducer.py \-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
注意:
Hadoop 執(zhí)行命令時(shí)的選項(xiàng)是有順序的,順序是 bin/hadoop command [genericOptions] [commandOptions].
對(duì)于 Streaming,-D 屬于 genericOptions,即 hadoop 的通用選項(xiàng),所以必須寫(xiě)在前面。
Streaming 的所有選項(xiàng)可參考:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar -info
3. 控制 comparator 與自定義排序
上面說(shuō)到 mapper 的輸出被 partition 到各個(gè) reducer 之后,會(huì)有一步排序。這個(gè)排序的標(biāo)準(zhǔn)也是可以通過(guò)設(shè)置 comparator 控制的。和上面一樣,要先設(shè)置分割出 key 用的分割符、key 的范圍,key 內(nèi)部分隔用的分割符
-D stream.map.output.field.separator=. \-D stream.num.map.output.key.fields=4\-D map.output.key.field.separator=.
這里要控制的就是 key 內(nèi)部的哪些元素用來(lái)做排序依據(jù),是排字典序還是數(shù)字序,倒敘還是正序。用來(lái)控制的參數(shù)是 mapred.text.key.comparator.options,接受的值格式類似于 unix sort。比如我要按第二個(gè)元素的數(shù)字序(默認(rèn)字典序)+倒序來(lái)排元素的話,就用 -D mapred.text.key.comparator.options=-k2,2nr
n表示數(shù)字序,r表示倒序。這樣一來(lái)
11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2
就會(huì)被排成
11.14.2.3
11.14.2.2
11.12.1.2
11.12.1.1
11.11.4.1
總結(jié)
以上是生活随笔為你收集整理的python hadoop streaming_Hadoop Streaming 使用及参数设置的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 安卓百分百破解网(安卓百分百)
- 下一篇: python gil锁存在的意义_关于p