Hadoop常用操作汇总
Hadoop Streaming示例程序(wordcount)
run_hadoop_word_counter.sh
$HADOOP_BIN streaming \-input "${INPUT}" \-output "${OUT_DIR}" \-cacheArchive "${TOOL_DIR}/python2.7.2.tgz""#." \-file "mapper_word_counter.py" \ -file "reducer_word_counter.py" \-file "filter_word_counter.py" \-mapper "./python2.7.2/bin/python mapper_word_counter.py" \-combiner "./python2.7.2/bin/python reducer_word_counter.py" \-reducer "./python2.7.2/bin/python reducer_word_counter.py" \-jobconf abaci.job.base.environment="centos6u3_hadoop" \-jobconf mapred.job.priority="NORMAL" \-jobconf mapred.job.name="${TASK_NAME}" \-jobconf mapred.map.tasks="${MAP_NUM}" \-jobconf mapred.reduce.tasks="${REDUCE_NUM}" \-jobconf mapred.map.memory.limit="1000" \-jobconf mapred.reduce.memory.limit="1000" \-jobconf mapred.job.map.capacity="3000" \-jobconf mapred.job.reduce.capacity="2500" \-jobconf mapred.job.keep.files.hours=12 \-jobconf mapred.max.map.failures.percent=1 \-jobconf mapred.reduce.tasks.speculative.execution="false"mapper_word_counter.py
import sys for line in sys.stdin:fields = line.strip().split('\t')try:cnt = 1 dateval = fields[1]sys.stdout.write('%s\t%d\n' %(dateval, cnt))except Exception as exp:sys.stderr.write("exp:%s, %s" %(str(exp), line))reducer_word_counter.py
import sys word_pre = None counter_pre = 0 for line in sys.stdin:try:word, cnt = line.strip().split('\t') cnt = int(cnt)except Exception as exp:sys.stderr.write('Exp:%s,line:%s' %(str(exp), line.strip()))continueif word == word_pre:counter_pre += cnt else:if word_pre:print('%s\t%d' %(word_pre, counter_pre))word_pre = wordcounter_pre = cnt if word_pre:print('%s\t%d' %(word_pre, counter_pre))純文本輸入格式
- 每個mapper輸入若干行
-inputformat "org.apache.hadoop.mapred.TextInputFormat" - 指定每個mapper輸入的行數(shù)
-inputformat "org.apache.hadoop.mapred.lib.NLineInputFormat" -jobconf mapred.line.input.format.linespermap="5"
文件分發(fā)方式:
-file將客戶端本地文件打成jar包上傳到HDFS然后分發(fā)到計算節(jié)點(diǎn);
-cacheFile將HDFS文件分發(fā)到計算節(jié)點(diǎn);
-cacheArchive將HDFS壓縮文件分發(fā)到計算節(jié)點(diǎn)并解壓;
分桶&排序
Hadoop默認(rèn)會把map輸出行中遇到的第一個分隔符(默認(rèn)為\t)前面的部分作為key,后面的作為value,如果輸出行中沒有指定的分隔符,則整行作為key,value被設(shè)置為空字符串。mapper輸出的key經(jīng)過partition分發(fā)到不同的reduce里。
- 應(yīng)用示例
說明:
- 設(shè)定mapper輸出的key
stream.map.output.field.separator 設(shè)置map輸出的字段分隔符
stream.num.map.output.key.fields 設(shè)置map輸出的前幾個字段作為key - 設(shè)定根據(jù)key進(jìn)行分桶的規(guī)則
org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner partition類
map.output.key.field.separator 設(shè)置key內(nèi)的字段分隔符(KeyFieldBasedPartitioner和KeyFieldBasedComparator所特有)
num.key.fields.for.partition 設(shè)置key內(nèi)前幾個字段用來做partition
mapred.text.key.partitioner.options 可單獨(dú)指定key中哪些字段做partition,和num.key.fields.for.partition一起使用以num.key.fields.for.partition為準(zhǔn) - 設(shè)定根據(jù)key進(jìn)行排序的規(guī)則
KeyFieldBasedComparator 可靈活設(shè)置的高級比較器,默認(rèn)使用Text的基于字典序或者通過-n來基于數(shù)字比較
mapred.text.key.comparator.options 設(shè)置key中需要比較的字段或字節(jié)范圍 - 設(shè)定reducer輸出的key
stream.reduce.output.field.separator 設(shè)置reduce輸出的字段分隔符
stream.num.reduce.output.key.fields 設(shè)置reduce輸出的前幾個字段作為key
多路輸出
Hadoop支持多路輸出,可以將MapReduce的處理數(shù)據(jù)輸出到多個part-xxxxx-X文件中(X是A-Z共26個字母中的一個)。程序需要在maper(正對僅有mapper的MR任務(wù))/reducer(針對包含reducer的任務(wù))程序中將輸出形式由<key,value>變?yōu)?lt;key, value#X>,以便輸出特定后綴的文件中。其中#X僅僅用做指定輸出文件后綴, 不會出現(xiàn)在輸出內(nèi)容中。
啟動腳本中需要指定
-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat
或者
-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleSequenceFileOutputFormat
- 應(yīng)用示例
run_hadoop.sh
reducer_worder.py
for line in sys.stdin:record = line.strip()fields = record.split('\t')if len(fields) != 7:continuevcpurl, playurl, title, poster, duration, pubtime, accept = fieldsduration = int(duration)pubtime = int(pubtime)accept = int(accept)if duration < 60:sys.stdout.write('%s#A\n' %(record))elif duration < 300:sys.stdout.write('%s#B\n' %(record))else:sys.stdout.write('%s#C\n' %(record))本地調(diào)試
為避免在啟動MR任務(wù)后才發(fā)現(xiàn)程序bug,最好提前在本地模擬MR的運(yùn)行流程,驗(yàn)證結(jié)果是否符合預(yù)期
cat inputfile | ./mapper_task.sh | sort -t$'\t' -k1,1 | ./reducer.sh壓縮輸出
Hadoop默認(rèn)支持gzip壓縮, streaming作業(yè)中指定以下參數(shù)即可使輸出以gzip形式壓縮.
-D mapreduce.output.fileoutputformat.compress=true -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodecHadoop 是可自行讀取gzip壓縮的數(shù)據(jù),無需特殊指明輸入是 Gzip 壓縮。Gzip 的特點(diǎn)是壓縮比較高,Hadoop 原生支持,缺點(diǎn)是壓縮效率并不是很高,壓縮比和效率不可兼得,需要考慮其他壓縮方式。
Hadoop常用配置項(xiàng)
| abaci.job.base.environment | centos6u3_hadoop 如果系統(tǒng)環(huán)境需要升級,可以指定為 centos6u3_hadoop 支持更高版本的 glibc |
| stream.memory.limit | 單個map/reduce最高使用內(nèi)存,默認(rèn)800M |
| mapred.map.memory.limit | 單個map最高使用內(nèi)存,優(yōu)先級高于stream.memory.limit |
| mapred.reduce.memory.limit | 單個reduce最高使用內(nèi)存,優(yōu)先級高于stream.memory.limit |
| mapred.map.capacity.per.tasktracker | 每臺機(jī)器最多同時啟動map個數(shù) |
| mapred.reduce.capacity.per.tasktracker | 每臺機(jī)器最多同時啟動reduce個數(shù) |
| mapred.job.map.capacity | map并發(fā)數(shù)目 |
| mapred.job.reduce.capacity | reduce并發(fā)數(shù)目 |
| abaci.job.map.max.capacity | map并發(fā)限制,默認(rèn)10000 |
| abaci.job.reduce.max.capacity | reduce并發(fā)限制,默認(rèn)3000 |
| mapred.map.tasks | map數(shù)目 |
| mapred.reduce.tasks | reduce數(shù)目 |
| mapred.job.reuse.jvm.num.tasks | 1表示不reuse,-1表示無限r(nóng)euse,其他數(shù)值表示每個jvm reuse次數(shù)。reuse的時候,map結(jié)束時不會釋放內(nèi)存 |
| mapred.compress.map.output | 指定map的輸出是否壓縮。有助于減小數(shù)據(jù)量,減小io壓力,但壓縮和解壓有cpu成本,需要慎重選擇壓縮算法 |
| mapred.map.output.compression.codec | map輸出的壓縮算法 |
| mapred.output.compress | reduce輸出是否壓縮 |
| mapred.output.compression.codec | 控制mapred的輸出的壓縮的方式 |
| io.compression.codecs | 壓縮算法 |
| mapred.max.map.failures.percent | 容忍map錯誤百分比,默認(rèn)為0 |
| mapred.max.reduce.failures.percent | 容忍reduce錯誤百分比,默認(rèn)為0 |
| stream.map.output.field.separator | map輸出分隔符,默認(rèn)Tab |
| stream.reduce.output.field.separator | reduce輸出分隔符,默認(rèn)Tab |
| mapred.textoutputformat.separator | 設(shè)置TextOutputFormat的輸出key,value分隔符,默認(rèn)Tab |
| mapred.textoutputformat.ignoreseparator | 設(shè)置為true后,當(dāng)只有key沒有value會去掉自動補(bǔ)上的Tab |
| mapred.min.split.size | 指定map最小處理數(shù)據(jù)量,單位B |
| mapred.max.split.size | 指定map最多處理數(shù)據(jù)量,單位B,同時設(shè)置inputformat=org.apache.hadoop.mapred.CombineTextInputFormat |
| mapred.combine.input.format.local.only | 是否只合并本節(jié)點(diǎn),默認(rèn)true,設(shè)置為false可以跨節(jié)點(diǎn)合并數(shù)據(jù) |
| abaci.job.map.cpu.percent | map消耗cpu占比,參數(shù)默認(rèn)值40(表示1個cpu的40%,即0.4個cpu) |
| abaci.job.reduce.cpu.percent | reduce消耗cpu占比,參數(shù)默認(rèn)值40(表示1個cpu的40%,即0.4個cpu) |
| mapred.map.capacity.per.tasktracker | 表示每個節(jié)點(diǎn)最多并行跑幾個該job的map任務(wù)(請根據(jù)內(nèi)存情況適當(dāng)增減該參數(shù),默認(rèn)是8) |
| mapred.reduce.capacity.per.tasktracker | 表示每個節(jié)點(diǎn)最多并行跑幾個該job的reduce任務(wù)(請根據(jù)內(nèi)存情況適當(dāng)增減該參數(shù),默認(rèn)是8) |
| mapred.map.tasks.speculative.execution | 開啟map預(yù)測執(zhí)行,默認(rèn)true |
| mapred.reduce.tasks.speculative.execution | 開啟reduce預(yù)測執(zhí)行,默認(rèn)true |
Hadoop環(huán)境下系統(tǒng)變量
- 變量名列表
| HADOOP_HOME | 計算節(jié)點(diǎn)上配置的Hadoop路徑 |
| LD_LIBRARY_PATH | 計算節(jié)點(diǎn)上加載庫文件的路徑列表 |
| PWD | 當(dāng)前工作目錄 |
| dfs_block_size | 當(dāng)前設(shè)置的HDFS文件塊大小 |
| map_input_file | mapper正在處理的輸入文件路徑 |
| mapred_job_id | 作業(yè)ID |
| mapred_job_name | 作業(yè)名 |
| mapred_tip_id | 當(dāng)前任務(wù)的第幾次重試 |
| mapred_task_id | 任務(wù)ID |
| mapred_task_is_map | 當(dāng)前任務(wù)是否為map |
| mapred_output_dir | 計算輸出路徑 |
| mapred_map_tasks | 計算的map任務(wù)數(shù) |
| mapred_reduce_tasks | 計算的reduce任務(wù)數(shù) |
https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#Configured+Parameters
- 應(yīng)用示例:
Shell版
Python版
import osinput_file = os.environ['mapreduce_map_input_file'] #do something elseReferences
Hadoop Streaming相關(guān)官方文檔:https://hadoop.apache.org/docs/r3.1.2/hadoop-streaming/HadoopStreaming.html
Hadoop Streaming入門:http://icejoywoo.github.io/2015/09/28/introduction-to-hadoop-streaming.html
Hadoop排序工具用法小結(jié):http://www.dreamingfish123.info/?p=1102
Hadoop壓縮選項(xiàng)權(quán)衡:https://www.slideshare.net/Hadoop_Summit/singh-kamat-june27425pmroom210c
轉(zhuǎn)載于:https://www.cnblogs.com/jeromeblog/p/11464693.html
總結(jié)
以上是生活随笔為你收集整理的Hadoop常用操作汇总的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Newlife.Cube登录登出
- 下一篇: 【Django】基于Django架构网站