日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop Streaming高级编程

發布時間:2025/3/21 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop Streaming高级编程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1. 概要

本文主要介紹了Hadoop Streaming的一些高級編程技巧,包括,怎樣在mapredue作業中定制輸出輸出格式?怎樣向mapreduce作業中傳遞參數?怎么在mapreduce作業中加載詞典?怎樣利用Hadoop Streamng處理二進制格式的數據等。

關于Hadoop Streaming的基本編程方法,可參考:Hadoop Streaming編程,Hadoop編程實例。

2. 在mapreduce作業中定制輸入輸出格式

Hadoop 0.21.0之前的版本中的Hadoop Streaming工具只支持文本格式的數據,而從Hadoop 0.21.0開始,也支持二進制格式的數據。這里介紹文本文件的輸入輸出格式定制,關于二進制數據的格式,可參考第5節。

Hadoop Streaming提交作業的格式為:

1 2 3 Usage: $HADOOP_HOME/bin/hadoop jar \ $HADOOP_HOME/hadoop-streaming.jar [options]

其中,-D選項中的一些配置屬性可定義輸入輸出格式,具體如下(注意,對于文本而言,每一行中存在一個key/value對,這里只能定制key和value之間的分割符,而行與行之間的分隔符不可定制,只能是\n):

(1)stream.map.input.field.separator/stream.map.output.field.separator: map task輸入/輸出數據的分隔符,默認均為\t。

(2)stream.num.map.output.key.fields:指定map task輸出記錄中key所占的域數目,如

每一行形式為,Key1\tkey2\tkey3\tvalue,采用默認的分隔符,且stream.num.map.output.key.fields設為2,則Key1\tkey2表示key,key3\tvalue表示value。

(3)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task輸入/輸出數據的分隔符,默認均為\t。

(4)stream.num.reduce.output.key.fields:指定reduce task輸出記錄中key所占的域數目

3. 向mapreduce作業傳遞參數

提交作業時,使用-cmdenv選項以環境變量的形式將你的參數傳遞給mapper/reducer,如:

1 2 3 4 5 6 7 8 9 10 11 $HADOOP_HOME/bin/hadoop jar \ contrib/streaming/hadoop-0.20.2-streaming.jar \ -input input \ -ouput output \ -cmdenv grade=1 \ …….

然后編寫mapper或reducer時,使用main函數的第三個參數捕獲你傳入的環境變量,如:

1 2 3 4 5 6 7 8 9 10 11 12 13 int main(int argc, char *argv[], char *env[]){ int i, grade; for (i = 0; env[i] != NULL; i++) if(strncmp(env[i], “grade=”, 6) == 0) grade=atoi(env[i]+6); …… }

4. 在mapreduce作業中加載詞典

提交作業時,使用-file選項,如:

1 2 3 4 5 6 7 8 9 10 11 $HADOOP_HOME/bin/hadoop jar \ contrib/streaming/hadoop-0.20.2-streaming.jar \ -input input \ -ouput output \ -file dict.txt \ …….

然后編寫mapper或reducer時,像本地文件一樣打開并使用dic.txt文件,如:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 int main(int argc, char *argv[], char *env[]){ FILE *fp; char buffer[1024]; fp = fopen("dict.txt","r"); if (!fp) return 1; while (fgets(buffer, 1024, fp)!=NULL) { …… } …… }

如果要加載非常大的詞典或配置文件,Hadoop Streaming還提供了另外一個選項-files,該選項后面跟的是HDFS上的一個文件(將你的配置文件放到HDFS上,再大也可以!!!),你可以在程序中像打開本地文件一樣打開該文件,此外,你也可以使用#符號在本地建一個系統鏈接,如:

1 2 3 4 5 6 7 $HADOOP_HOME/bin/hadoop jar \ contrib/streaming/hadoop-0.20.2-streaming.jar \ -file? hdfs://host:fs_port/user/dict.txt#dict_link \ …….

在代碼中這樣做:

如:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 int main(int argc, char *argv[], char *env[]){ FILE *fp; char buffer[1024]; fp = fopen("dict_link ","r"); //or fp = fopen("dict.txt ","r"); if (!fp) return 1; while (fgets(buffer, 1024, fp)!=NULL) { …… } …… }

5. 處理二進制格式的數據

從Hadoop 0.21.0開始,streaming支持二進制文件(具體可參考:HADOOP-1722),用戶提交作業時,使用-io選項指明二進制文件格式。0.21.0版本中增加了兩種二進制文件格式,分別為:

(1) rawbytes:key和value均用【4個字節的長度+原始字節】表示

(2) typedbytes:key和value均用【1字節類型+4字節長度+原始字節】表示

用戶提交作業時,如果用-io指定二進制格式為typedbytes,則map的輸入輸出,reduce的輸入輸出均為typedbytes,如果想細粒度的控制這幾個輸入輸出,可采用以下幾個選項:

1 2 3 4 5 6 7 -D stream.map.input=[identifier] -D stream.map.output=[identifier] -D stream.reduce.input=[identifier] -D stream.reduce.output=[identifier]

你如果采用的python語言,下面是從?HADOOP-1722?中得到的一個例子(里面用到了解析typedbytes的python庫,見:http://github.com/klbostee/typedbytes ):

mapper腳本如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 import sys import typedbytes input = typedbytes.PairedInput(sys.stdin) output = typedbytes.PairedOutput(sys.stdout) for (key, value) in input: for word in value.split(): output.write((word, 1))

reducer腳本:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import sys import typedbytes from itertools import groupby from operator import itemgetter input = typedbytes.PairedInput(sys.stdin) output = typedbytes.PairedOutput(sys.stdout) for (key, group) in groupby(input, itemgetter(0)): values = map(itemgetter(1), group) output.write((key, sum(values)))

6. 自定義counter并增加counter的值

用戶采用某種語言編寫的mapper或者reducer可采用標準錯誤輸出(stderr)自定義和改變counter值,格式為:reporter:counter:<group>,<counter>,<amount>,如,在C語言編寫的mapper/reducer中:

1 fprintf(stderr, “reporter:counter:group,counter1,1”); //將組group中的counter1增加1

注:用戶定義的自定義counter的最終結果會在桌面或者web界面上顯示出來。

如果你想在mapreduce作業執行過程中,打印一些狀態信息,同樣可使用標準錯誤輸出,格式為:reporter:status:<message>,如,在C語言編寫的mapper/reducer中:

1 fprintf(stderr, “reporter:status:mapreduce job is started…..”); //在shell桌面上打印“mapreduce job is started…..”

7. 在mapreduce使用Linux Pipes

迄今為止(0.21.0版本之前,包括0.21.0),Hadoop Streaming是不支持Linux Pipes,如:-mapper “cut -f1 | sed s/foo/bar/g”會報”java.io.IOException: Broken pipe”錯誤。

8. 在mapreduce中獲取JobConf的屬性值

在0.21.0版本中,streaming作業執行過程中,JobConf中以mapreduce開頭的屬性(如mapreduce.job.id)會傳遞給mapper和reducer,關于這些參數,可參考:http://hadoop.apache.org/mapreduce/docs/r0.21.0/mapred_tutorial.html#Configured+Parameters

其中,屬性名字中的“.”會變成“_”,如mapreduce.job.id會變為mapreduce_job_id,用戶可在mapper/reducer中獲取這些屬性值直接使用(可能是傳遞給環境變量參數,即main函數的第三個參數,本文作業還未進行驗證)。

9. 一些Hadoop Streaming的開源軟件包

(1) 針對Hadoop Streaming常用操作的C++封裝包(如自定義和更新counter,輸出狀態信息等):https://github.com/dgleich/hadoopcxx

(2) C++實現的typedbytes代碼庫:https://github.com/dgleich/libtypedbytes

(3) python實現的typedbytes代碼庫:?http://github.com/klbostee/typedbytes

(4) Java實現的typedbytes代碼庫(Hadoop 0.21.0代碼中自帶)

10. 總結

Hadoop Streaming使得程序員采用各種語言編寫mapreduce程序變得可能,它具備程序員所需的大部分功能接口,同時由于這種方法編寫mapreduce作業簡單快速,越來越多的程序員開始嘗試使用Hadoop Steraming。

11. 參考資料

http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html

https://issues.apache.org/jira/browse/HADOOP-1722

原創文章,轉載請注明:?轉載自董的博客

本文鏈接地址:?http://dongxicheng.org/mapreduce/hadoop-streaming-advanced-programming/

總結

以上是生活随笔為你收集整理的Hadoop Streaming高级编程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。