日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

【Python学习系列四】Python程序通过hadoop-streaming提交到Hadoop集群执行MapReduce

發布時間:2025/4/16 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【Python学习系列四】Python程序通过hadoop-streaming提交到Hadoop集群执行MapReduce 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
場景:將Python程序通過hadoop-streaming提交到Hadoop集群執行。
參考:http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/


1、Python編寫Mapper
? ?業務邏輯是從會從標準輸入(stdin)讀取數據,默認以空格分割單詞,然后按行輸出單詞機器出現頻率到標準輸出(stdout),不過整個Map處理過程并不會統計每個單詞出現的總次數,而是直接輸出“word,1”,以便作為Reduce的輸入進行統計。

? ?代碼如下:

#coding:utf-8''' Created on 2017年6月7日 @author: fjs '''#!/usr/bin/env python import sys# input comes from STDIN (standard input) for line in sys.stdin:# remove leading and trailing whitespaceline = line.strip()# split the line into wordswords = line.split()# increase countersfor word in words:# write the results to STDOUT (standard output);# what we output here will be the input for the# Reduce step, i.e. the input for reducer.py## tab-delimited; the trivial word count is 1print '%s\t%s' % (word, 1)2、Python編寫Reducer
? ?Reduce代碼,它會從標準輸入(stdin)讀取mapper.py的結果,然后統計每個單詞出現的總次數并輸出到標準輸出(stdout)。
? ?代碼如下:

#coding:utf-8''' Created on 2017年6月7日 @author: fjs '''#!/usr/bin/env pythonfrom operator import itemgetter import syscurrent_word = None current_count = 0 word = None# input comes from STDIN for line in sys.stdin:# remove leading and trailing whitespaceline = line.strip()# parse the input we got from mapper.pyword, count = line.split('\t', 1)# convert count (currently a string) to inttry:count = int(count)except ValueError:# count was not a number, so silently# ignore/discard this linecontinue# this IF-switch only works because Hadoop sorts map output# by key (here: word) before it is passed to the reducerif current_word == word:current_count += countelse:if current_word:# write result to STDOUTprint '%s\t%s' % (current_word, current_count)current_count = countcurrent_word = word# do not forget to output the last word if needed! if current_word == word:print '%s\t%s' % (current_word, current_count)
3、文件準備
? ?1)將python程序文件上傳到Hadoop集群客戶機,為文件賦予執行權限
? ?#chmod +x /data/etlcj/python/mapper.py
? ?#chmod +x /data/etlcj/python/reducer.py
? ?2)上傳測試文件到集群
? ?#vi /data/etlcj/python/wcin.txt ? 加入:

foo foo quux labs foo bar quux abc bar see you by test welcome test abc labs foo me python hadoop ab ac bc bec python上傳到集群
? ?#hadoop fs -put /data/etlcj/python/wcin.txt ?/apps/etlcj/python/


4、基于hadoop-streaming執行MapReduce任務:

? ? ?執行語句:

#hadoop jar /usr/hdp/2.5.3.0-37/hadoop-mapreduce/hadoop-streaming-2.7.3.2.5.3.0-37.jar -files '/data/etlcj/python/mapper.py,/data/etlcj/python/reducer.py' -input /apps/etlcj/python/wcin.txt -output /apps/etlcj/python/out/ -mapper ./mapper.py -reducer ./reducer.py?執行過程中提示:
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 126at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)at java.security.AccessController.doPrivileged(Native Method)at javax.security.auth.Subject.doAs(Subject.java:415)at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
懷疑是py腳本代碼問題或版本環境不匹配問題,對python語法不熟悉,暫無法深入,但python提交到hadoop集群的方法可以。?


5、hadoop-streaming參數參考:
? Usage:hadoop jar $Haoop_Home$/hadoop-streaming-*.jar?
? ?-input <輸入目錄> \ # 可以指定多個輸入路徑,例如:-input '/user/foo/dir1' -input '/user/foo/dir2'
? ?-inputformat <輸入格式 JavaClassName>?
? ?-output <輸出目錄>?
? ?-outputformat <輸出格式 JavaClassName>?
? ?-mapper <mapper executable or JavaClassName>?
? ?-reducer <reducer executable or JavaClassName>?
? ?-combiner <combiner executable or JavaClassName>?
? ?-partitioner <JavaClassName> \
? ?-cmdenv <name=value> \ # 可以傳遞環境變量,可以當作參數傳入到任務中,可以配置多個
? ?-file <依賴的文件> \ # 配置文件,字典等依賴
? ?-D <name=value> \ # 作業的屬性配置


總結

以上是生活随笔為你收集整理的【Python学习系列四】Python程序通过hadoop-streaming提交到Hadoop集群执行MapReduce的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。