日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

使用Hadoop Streaming 完成MapReduce(Python代码)

發布時間:2023/12/15 python 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用Hadoop Streaming 完成MapReduce(Python代码) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一 Map和Reduce?

?

首先看下MR的工作原理

?

MapReduce的好處是它可以把在內存中不能完成的事轉變成可以在硬盤上高效完成。
Map--‐Reduce 對于集群的好處:
1,在多節點上冗余地存儲數據,以保證數據的持續性和一直可取性
2, 將計算移向數據端,以最大程度減少數據移動
3,簡單的程序模型隱藏所有的復雜度

Map,Reduce一般的流程:
Map階段:
a, 逐個文件逐行掃描
b, 掃描的同時抽取出我們感興趣的內容 (Keys)

Group by key
排序和洗牌
(Group by key階段會自動的運行,不需要自己寫)

Reduce階段:
a, 聚合 、 總結 、 過濾或轉換
b, 寫入結果

?

二? Hadoop Streaming原理

Hadoop 不僅可以使用Java進行MapReduce的編寫,也通過Hadoop Streaming的方式提供了其他語言編寫MR的接口。更重要的是,使用python來編寫MR,比使用親兒子Java編寫MR要更簡單和方便……所以在一些不非常復雜的任務中使用python來編寫MR比起使用Java,是更加劃算的。

Hadoop streaming是Hadoop的一個工具, 它幫助用戶創建和運行一類特殊的map/reduce作業, 這些特殊的map/reduce作業是由一些可執行文件或腳本文件充當mapper或者reducer。

比如可以使用python語言來寫map-reduce使用“Hadoop Streaming”來完成傳統mapreduce的功能。

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input myInputDirs \ -output myOutputDir \ -mapper mapper.py \ -reducer reducer.py


上述代碼通過參數input,output,mapper,reducer來定義輸入數據,輸出數據,mapper文件,reducer文件。

在上面的代碼中,mapper和reducer都是可執行文件,它們從標準輸入讀入數據(一行一行讀), 并把計算結果發給標準輸出。Streaming工具會創建一個Map/Reduce作業, 并把它發送給合適的集群,同時監視這個作業的整個執行過程。

如果一個可執行文件被用于mapper,則在mapper初始化時, 每一個mapper任務會把這個可執行文件作為一個單獨的進程啟動。 mapper任務運行時,它把輸入切分成行并把每一行提供給可執行文件進程的標準輸入。 同時,mapper收集可執行文件進程標準輸出的內容,并把收到的每一行內容轉化成key/value對,作為mapper的輸出。 默認情況下,一行中第一個tab之前的部分作為key,之后的(不包括tab)作為value。 如果沒有tab,整行作為key值,value值為null。

如果一個可執行文件被用于reducer,每個reducer任務會把這個可執行文件作為一個單獨的進程啟動。 Reducer任務運行時,它把輸入切分成行并把每一行提供給可執行文件進程的標準輸入。 同時,reducer收集可執行文件進程標準輸出的內容,并把每一行內容轉化成key/value對,作為reducer的輸出。 默認情況下,一行中第一個tab之前的部分作為key,之后的(不包括tab)作為value。

?

三 詞頻統計的例子?
?

Python實現Wordcount:

1. mapper.py

[root@vm wordcount]# vim mapper.py
寫入

#!/usr/bin/python import sys word2count = {} for line in sys.stdin:line = line.strip()words = filter(lambda word:word,line.split())for word in words:print("%s\t%s" % (word,1))


2. reducer.py

[root@vm wordcount]# vim reducer.py
寫入

#!/usr/bin/python from operator import itemgetter import sysword2count = {} for line in sys.stdin:line = line.strip()word,count = line.split()try:count = int(count)word2count[word] = word2count.get(word,0) + countexcept ValueError as err:print(err)passsorted_word2count = sorted(word2count.items(),key=itemgetter(0)) for word,count in sorted_word2count:print("%s\t%s" % (word, count))


3. 準備一個測試文件test.txt

[root@vm wordcount]# vim test.txt
寫入

this is a test this is a test this is a test this is a test


4. 本地測試

[root@vm wordcount]# cat test.txt |python mapper.py |sort|python reducer.py

a 4 is 4 test 4 this 4


[root@vm wordcount]#
5. 集群運行

集群運行前要將本地的測試文件上傳到hdfs
[root@vm wordcount]# hadoop fs -mkdir /user/root/wordcount
[root@vm wordcount]# hadoop fs -put test.txt /user/root/wordcount/
[root@vm wordcount]# hadoop fs -ls /user/root/wordcount/
Found 1 items
-rw-r--r--?? 3 root root???????? 60 2018-05-14 09:58 /user/root/wordcount/test.txt
[root@vm wordcount]#

?


運行mapreduce

[root@vm wordcount]# hadoop jar /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/hadoop-streaming-2.6.0-cdh5.13.1.jar -D mapred.reduce.tasks=1 -mapper "python mapper.py" -reducer "python reducer.py" -file mapper.py -file reducer.py -input /user/root/wordcount/test.txt -output /user/root/wordcount/out

?

命令行查看結果

[root@vm wordcount]# hadoop fs -cat /user/root/wordcount/out/part-00000 a 4 is 4 test 4 this 4 [root@vm wordcount]#

四 使用第三方的Python庫

$HADOOP_HOME/bin/hadoop streaming -D mapred.job.priority='VERY_HIGH' -D mared.job.map.capacity=500 -D mapred.reduce.tasks=0 -D mapred.map.tasks=500 -input myInputDirs(你得HDFS路徑) -output myOutputDir(你的HDFS路徑) -mapper "python? yourpythonfile.py" -reducer "python? yourpythonfile.py" -file yourpythonfile.py(需要幾個就添加幾個-file) -cacheArchive "/xx/xx/xx/myvp.tar.gz#myvp"(此處是一個HDFS路徑,稍后用到)

?

使用第三方庫

需要使用第三方庫如bs4,numpy等時,需要用到虛擬環境virtualenv
virtualenv的使用

安裝

pip install virtualenv

新建虛擬環境

virtualenv myvp

使得虛擬環境的路徑為相對路徑

virtualenv --relocatable myvp

激活虛擬環境

source myvp/bin/activate

如果想退出,可以使用下面的命令

deactivate

激活后直接安裝各種需要的包

pip install XXX

壓縮環境包

tar -czf myvp.tar.gz myvp

在mapreduce上使用

在上面的腳本中可以看到使用了-catchArchive,但是路徑是HDFS的路徑,因此需要提前將本地的myvp.tai.gz包上傳到HDFS上。
同時#后面的myvp是文件的文件夾,解壓后還有一個myvp(因為壓縮的時候把文件夾本身也壓縮進去了),所有map中使用的時候的路徑就是myvp/myvp/bin/…
在map的python腳本中加入如下的代碼,會把第三方庫加入到python 路徑

import sys
sys.path.append("myvp/myvp/lib/python2.7")

?

?

?

?

?

參考:

https://blog.csdn.net/wawa8899/article/details/80305720

https://blog.csdn.net/wh357589873/article/details/70049088 ?

總結

以上是生活随笔為你收集整理的使用Hadoop Streaming 完成MapReduce(Python代码)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。