日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop常用操作汇总

發(fā)布時間:2024/7/5 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop常用操作汇总 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Hadoop Streaming示例程序(wordcount)

run_hadoop_word_counter.sh

$HADOOP_BIN streaming \-input "${INPUT}" \-output "${OUT_DIR}" \-cacheArchive "${TOOL_DIR}/python2.7.2.tgz""#." \-file "mapper_word_counter.py" \ -file "reducer_word_counter.py" \-file "filter_word_counter.py" \-mapper "./python2.7.2/bin/python mapper_word_counter.py" \-combiner "./python2.7.2/bin/python reducer_word_counter.py" \-reducer "./python2.7.2/bin/python reducer_word_counter.py" \-jobconf abaci.job.base.environment="centos6u3_hadoop" \-jobconf mapred.job.priority="NORMAL" \-jobconf mapred.job.name="${TASK_NAME}" \-jobconf mapred.map.tasks="${MAP_NUM}" \-jobconf mapred.reduce.tasks="${REDUCE_NUM}" \-jobconf mapred.map.memory.limit="1000" \-jobconf mapred.reduce.memory.limit="1000" \-jobconf mapred.job.map.capacity="3000" \-jobconf mapred.job.reduce.capacity="2500" \-jobconf mapred.job.keep.files.hours=12 \-jobconf mapred.max.map.failures.percent=1 \-jobconf mapred.reduce.tasks.speculative.execution="false"

mapper_word_counter.py

import sys for line in sys.stdin:fields = line.strip().split('\t')try:cnt = 1 dateval = fields[1]sys.stdout.write('%s\t%d\n' %(dateval, cnt))except Exception as exp:sys.stderr.write("exp:%s, %s" %(str(exp), line))

reducer_word_counter.py

import sys word_pre = None counter_pre = 0 for line in sys.stdin:try:word, cnt = line.strip().split('\t') cnt = int(cnt)except Exception as exp:sys.stderr.write('Exp:%s,line:%s' %(str(exp), line.strip()))continueif word == word_pre:counter_pre += cnt else:if word_pre:print('%s\t%d' %(word_pre, counter_pre))word_pre = wordcounter_pre = cnt if word_pre:print('%s\t%d' %(word_pre, counter_pre))

純文本輸入格式

  • 每個mapper輸入若干行
    -inputformat "org.apache.hadoop.mapred.TextInputFormat"
  • 指定每個mapper輸入的行數(shù)
    -inputformat "org.apache.hadoop.mapred.lib.NLineInputFormat" -jobconf mapred.line.input.format.linespermap="5"

文件分發(fā)方式:

-file將客戶端本地文件打成jar包上傳到HDFS然后分發(fā)到計算節(jié)點(diǎn);
-cacheFile將HDFS文件分發(fā)到計算節(jié)點(diǎn);
-cacheArchive將HDFS壓縮文件分發(fā)到計算節(jié)點(diǎn)并解壓;

分桶&排序

Hadoop默認(rèn)會把map輸出行中遇到的第一個分隔符(默認(rèn)為\t)前面的部分作為key,后面的作為value,如果輸出行中沒有指定的分隔符,則整行作為key,value被設(shè)置為空字符串。mapper輸出的key經(jīng)過partition分發(fā)到不同的reduce里。

  • 應(yīng)用示例
${HADOOP_BIN} streaming \-input "${INPUT}" \-output "${OUT_DIR}" \-mapper cat \-reducer cat \-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \-jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \-jobconf stream.num.map.output.key.fields=4 \-jobconf stream.map.output.field.separator=. \-jobconf map.output.key.field.separator=. \-jobconf mapred.text.key.partitioner.options=-k1,2 \-jobconf mapred.text.key.comparator.options="-k3,3 -k4nr" \-jobconf stream.reduce.output.field.separator=. \-jobconf stream.num.reduce.output.key.fields=4 \-jobconf mapred.reduce.tasks=5

說明:

  • 設(shè)定mapper輸出的key
    stream.map.output.field.separator 設(shè)置map輸出的字段分隔符
    stream.num.map.output.key.fields 設(shè)置map輸出的前幾個字段作為key
  • 設(shè)定根據(jù)key進(jìn)行分桶的規(guī)則
    org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner partition類
    map.output.key.field.separator 設(shè)置key內(nèi)的字段分隔符(KeyFieldBasedPartitioner和KeyFieldBasedComparator所特有)
    num.key.fields.for.partition 設(shè)置key內(nèi)前幾個字段用來做partition
    mapred.text.key.partitioner.options 可單獨(dú)指定key中哪些字段做partition,和num.key.fields.for.partition一起使用以num.key.fields.for.partition為準(zhǔn)
  • 設(shè)定根據(jù)key進(jìn)行排序的規(guī)則
    KeyFieldBasedComparator 可靈活設(shè)置的高級比較器,默認(rèn)使用Text的基于字典序或者通過-n來基于數(shù)字比較
    mapred.text.key.comparator.options 設(shè)置key中需要比較的字段或字節(jié)范圍
  • 設(shè)定reducer輸出的key
    stream.reduce.output.field.separator 設(shè)置reduce輸出的字段分隔符
    stream.num.reduce.output.key.fields 設(shè)置reduce輸出的前幾個字段作為key

多路輸出

Hadoop支持多路輸出,可以將MapReduce的處理數(shù)據(jù)輸出到多個part-xxxxx-X文件中(X是A-Z共26個字母中的一個)。程序需要在maper(正對僅有mapper的MR任務(wù))/reducer(針對包含reducer的任務(wù))程序中將輸出形式由<key,value>變?yōu)?lt;key, value#X>,以便輸出特定后綴的文件中。其中#X僅僅用做指定輸出文件后綴, 不會出現(xiàn)在輸出內(nèi)容中。
啟動腳本中需要指定
-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat
或者
-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleSequenceFileOutputFormat

  • 應(yīng)用示例
    run_hadoop.sh
${HADOOP_BIN} streaming \-input "${INPUT}" \-output "${OUT_DIR}" \-cacheArchive "${TOOL_DIR}/python2.7.2.tgz""#." \-file "mapper_worker.sh" \-file "reducer_worker.py" \-mapper "sh mapper_worker.sh" \-reducer "python2.7.2/bin/python reducer_worker.py" \-inputformat "org.apache.hadoop.mapred.TextInputFormat" \-outputformat "org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat" \-jobconf mapred.job.priority="NORMAL" \-jobconf mapred.job.name="${TASK_NAME}" \-jobconf mapred.map.tasks="${MAP_NUM}" \-jobconf mapred.reduce.tasks="${REDUCE_NUM}" \-jobconf mapred.max.split.size=134217728 \-jobconf mapred.map.memory.limit="800" \-jobconf mapred.reduce.memory.limit="500" \-jobconf mapred.job.map.capacity="3500" \-jobconf mapred.job.reduce.capacity="2000" \-jobconf mapred.job.keep.files.hours=12 \-jobconf mapred.max.map.failures.percent=1 \-jobconf mapred.reduce.tasks.speculative.execution="false"

reducer_worder.py

for line in sys.stdin:record = line.strip()fields = record.split('\t')if len(fields) != 7:continuevcpurl, playurl, title, poster, duration, pubtime, accept = fieldsduration = int(duration)pubtime = int(pubtime)accept = int(accept)if duration < 60:sys.stdout.write('%s#A\n' %(record))elif duration < 300:sys.stdout.write('%s#B\n' %(record))else:sys.stdout.write('%s#C\n' %(record))

本地調(diào)試

為避免在啟動MR任務(wù)后才發(fā)現(xiàn)程序bug,最好提前在本地模擬MR的運(yùn)行流程,驗(yàn)證結(jié)果是否符合預(yù)期

cat inputfile | ./mapper_task.sh | sort -t$'\t' -k1,1 | ./reducer.sh

壓縮輸出

Hadoop默認(rèn)支持gzip壓縮, streaming作業(yè)中指定以下參數(shù)即可使輸出以gzip形式壓縮.

-D mapreduce.output.fileoutputformat.compress=true -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec

Hadoop 是可自行讀取gzip壓縮的數(shù)據(jù),無需特殊指明輸入是 Gzip 壓縮。Gzip 的特點(diǎn)是壓縮比較高,Hadoop 原生支持,缺點(diǎn)是壓縮效率并不是很高,壓縮比和效率不可兼得,需要考慮其他壓縮方式。

Hadoop常用配置項(xiàng)

配置名說明
abaci.job.base.environmentcentos6u3_hadoop 如果系統(tǒng)環(huán)境需要升級,可以指定為 centos6u3_hadoop 支持更高版本的 glibc
stream.memory.limit單個map/reduce最高使用內(nèi)存,默認(rèn)800M
mapred.map.memory.limit單個map最高使用內(nèi)存,優(yōu)先級高于stream.memory.limit
mapred.reduce.memory.limit單個reduce最高使用內(nèi)存,優(yōu)先級高于stream.memory.limit
mapred.map.capacity.per.tasktracker每臺機(jī)器最多同時啟動map個數(shù)
mapred.reduce.capacity.per.tasktracker每臺機(jī)器最多同時啟動reduce個數(shù)
mapred.job.map.capacitymap并發(fā)數(shù)目
mapred.job.reduce.capacityreduce并發(fā)數(shù)目
abaci.job.map.max.capacitymap并發(fā)限制,默認(rèn)10000
abaci.job.reduce.max.capacityreduce并發(fā)限制,默認(rèn)3000
mapred.map.tasksmap數(shù)目
mapred.reduce.tasksreduce數(shù)目
mapred.job.reuse.jvm.num.tasks1表示不reuse,-1表示無限r(nóng)euse,其他數(shù)值表示每個jvm reuse次數(shù)。reuse的時候,map結(jié)束時不會釋放內(nèi)存
mapred.compress.map.output指定map的輸出是否壓縮。有助于減小數(shù)據(jù)量,減小io壓力,但壓縮和解壓有cpu成本,需要慎重選擇壓縮算法
mapred.map.output.compression.codecmap輸出的壓縮算法
mapred.output.compressreduce輸出是否壓縮
mapred.output.compression.codec控制mapred的輸出的壓縮的方式
io.compression.codecs壓縮算法
mapred.max.map.failures.percent容忍map錯誤百分比,默認(rèn)為0
mapred.max.reduce.failures.percent容忍reduce錯誤百分比,默認(rèn)為0
stream.map.output.field.separatormap輸出分隔符,默認(rèn)Tab
stream.reduce.output.field.separatorreduce輸出分隔符,默認(rèn)Tab
mapred.textoutputformat.separator設(shè)置TextOutputFormat的輸出key,value分隔符,默認(rèn)Tab
mapred.textoutputformat.ignoreseparator設(shè)置為true后,當(dāng)只有key沒有value會去掉自動補(bǔ)上的Tab
mapred.min.split.size指定map最小處理數(shù)據(jù)量,單位B
mapred.max.split.size指定map最多處理數(shù)據(jù)量,單位B,同時設(shè)置inputformat=org.apache.hadoop.mapred.CombineTextInputFormat
mapred.combine.input.format.local.only是否只合并本節(jié)點(diǎn),默認(rèn)true,設(shè)置為false可以跨節(jié)點(diǎn)合并數(shù)據(jù)
abaci.job.map.cpu.percentmap消耗cpu占比,參數(shù)默認(rèn)值40(表示1個cpu的40%,即0.4個cpu)
abaci.job.reduce.cpu.percentreduce消耗cpu占比,參數(shù)默認(rèn)值40(表示1個cpu的40%,即0.4個cpu)
mapred.map.capacity.per.tasktracker表示每個節(jié)點(diǎn)最多并行跑幾個該job的map任務(wù)(請根據(jù)內(nèi)存情況適當(dāng)增減該參數(shù),默認(rèn)是8)
mapred.reduce.capacity.per.tasktracker表示每個節(jié)點(diǎn)最多并行跑幾個該job的reduce任務(wù)(請根據(jù)內(nèi)存情況適當(dāng)增減該參數(shù),默認(rèn)是8)
mapred.map.tasks.speculative.execution開啟map預(yù)測執(zhí)行,默認(rèn)true
mapred.reduce.tasks.speculative.execution開啟reduce預(yù)測執(zhí)行,默認(rèn)true

Hadoop環(huán)境下系統(tǒng)變量

  • 變量名列表
變量名變量說明
HADOOP_HOME計算節(jié)點(diǎn)上配置的Hadoop路徑
LD_LIBRARY_PATH計算節(jié)點(diǎn)上加載庫文件的路徑列表
PWD當(dāng)前工作目錄
dfs_block_size當(dāng)前設(shè)置的HDFS文件塊大小
map_input_filemapper正在處理的輸入文件路徑
mapred_job_id作業(yè)ID
mapred_job_name作業(yè)名
mapred_tip_id當(dāng)前任務(wù)的第幾次重試
mapred_task_id任務(wù)ID
mapred_task_is_map當(dāng)前任務(wù)是否為map
mapred_output_dir計算輸出路徑
mapred_map_tasks計算的map任務(wù)數(shù)
mapred_reduce_tasks計算的reduce任務(wù)數(shù)

https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#Configured+Parameters

  • 應(yīng)用示例:
    Shell版
#!/bin/bashset -o pipefail HOST="localhost" PORT=$((1000 + ${mapred_task_partition}))awk '{print $2}' \| ./access_remote_data ${HOST} ${PORT} outdata.gzhdfs_outfile=${mapred_work_output_dir}/${mapred_task_partition}.pack cat outdata.gz \| gzip -d \| python ../postprocess.py| ${HADOOP_HOME}/bin/hadoop fs -D hadoop.job.ugi="username,pwd" -copyFromLocal - ${hdfs_outfile}

Python版

import osinput_file = os.environ['mapreduce_map_input_file'] #do something else

References

Hadoop Streaming相關(guān)官方文檔:https://hadoop.apache.org/docs/r3.1.2/hadoop-streaming/HadoopStreaming.html
Hadoop Streaming入門:http://icejoywoo.github.io/2015/09/28/introduction-to-hadoop-streaming.html
Hadoop排序工具用法小結(jié):http://www.dreamingfish123.info/?p=1102
Hadoop壓縮選項(xiàng)權(quán)衡:https://www.slideshare.net/Hadoop_Summit/singh-kamat-june27425pmroom210c

轉(zhuǎn)載于:https://www.cnblogs.com/jeromeblog/p/11464693.html

總結(jié)

以上是生活随笔為你收集整理的Hadoop常用操作汇总的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。