oracle表增量同步到hive分区表
本文以shell腳本,通過傳參的的形式,將服務器ip,oracle的庫名表名以及作為分區字段的字段名稱,hive的庫名,表名作為參數傳入,這樣可以做到靈活變更,提高通用性與方便性,通過定時器調度此腳本即可。
腳本包含三步:
? ?一:通過sqoop將oracle數據導入到hive臨時表,臨時表需創建,無分區表
? ?二:將hive臨時表數據insert到hive正式表,以傳入的分區字段作為分區,此腳本中分區有 年xxxx,月xxxx-xx,日xxxx-xx-xx
? ?三:因為我有用到impala,所以在第三步加上了impala刷新操作,如不刷新,impala將識別不到新增hive數據? ?
#!/bin/bash
#
# import table from oracle into hive
nargs=$#
echo "argument num: $nargs "
today=`date +%Y-%m-%d`
one_day=`date +%Y-%m-%d -d'-1 day'`
coll_db=''
coll_tab=''
coll_host_ip=''
coll_host_port=1521
coll_tab_username=''
coll_tab_passwd=''
hive_db=''
hive_tab=''
#hive_tab_cols=''
hive_map_cols=''
start_dt=''
end_dt=''
pt_col=''?
/usr/bin/kinit -kt /opt/yarn.keytab yarn
# argument parse
argParse()
{
?? ?echo "argument num: $nargs "
?? ?for ag in $@
?? ?do
?? ?#?? ?echo $ag
?? ??? ?arg_key=${ag%=*}
?? ??? ?arg_val=${ag#*=}
#?? ??? ?echo "${ag%=*}--- ${ag#*=}"?? ??? ?
#?? ??? ?echo "$arg_key---- $arg_val"
?? ??? ?case ${arg_key} in
?? ??? ??? ?"coll_host_ip") ?? ?coll_host_ip=$arg_val?? ?;;
?? ??? ??? ?"coll_db") ?? ??? ?coll_db=$arg_val?? ?;;
?? ??? ??? ?"coll_tab") ?? ??? ?coll_tab=$arg_val?? ?;;
?? ??? ??? ?"hive_db") ?? ??? ?hive_db=$arg_val?? ?;;
?? ??? ??? ?"hive_tab") ?? ??? ?hive_tab=$arg_val?? ?;;
?? ??? ??? ?#"hive_tab_cols") ?? ?hive_tab_cols=$arg_val?? ?;;
?? ??? ??? ?"hive_map_cols") ?? ?hive_map_cols=$arg_val?? ?;;
?? ??? ??? ?"start_dt") ?? ??? ?start_dt=$arg_val?? ?;;
?? ??? ??? ?"end_dt") ?? ??? ?end_dt=$arg_val?? ?;;
?? ??? ??? ?"pt_col") ?? ??? ?pt_col=$arg_val?? ?;;
?? ??? ?esac
?? ?done
}
# parse the arguments key:value paire
argParse $@
# pring argument
printArgs()
{
?? ?echo "coll_host_ip:$coll_host_ip"
?? ?echo "coll_db:$coll_db"
?? ?echo "coll_tab:$coll_tab"
?? ?echo "hive_db:$hive_db"
?? ?echo "hive_tab:$hive_tab"
?? ?#echo "hive_tab_cols:$hive_tab_cols"
?? ?echo "hive_map_cols:$hive_map_cols"
?? ?echo "start_dt:$start_dt"
?? ?echo "end_dt:$end_dt"
?? ?echo "pt_col:$pt_col"
? ? ? ? echo "one_day:$one_day"
? ? ? ? echo "today:$today"
}
# print the value of argument
printArgs
# sqoop import table of mysql to hive tmp table
sqoopImpTempTab()
{
?? ?echo ?"@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ [`date +\"%F %T\"`] sqoop import start @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ "
? ? ? ? if [ -n "${hive_map_cols}" ]
? ? ? ? then
?? ??? ?sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true -Dmapreduce.job.queuename=bf_yarn_pool.production \
? ? ? ? ? ? ? ? ? ? ? ? --connect jdbc:oracle:thin:@$coll_host_ip:$coll_host_port/$coll_db \
? ? ? ? ? ? ? ? ? ? ? ? --table $coll_tab --username $coll_tab_username --password $coll_tab_passwd \
? ? ? ? ? ? ? ? ? ? ? ? --delete-target-dir \
? ? ? ? ? ? ?? ??? ?--hive-import --hive-overwrite --hive-database tmp_${hive_db} --hive-table tmp_${hive_tab} --hive-drop-import-delims -m 1 \
? ? ? ? ? ? ?? ??? ?--where "${pt_col} >=to_date('${one_day}','yyyy-mm-dd') and ${pt_col}<to_date('${today}','yyyy-mm-dd')" --fields-terminated-by '\001' \
?? ??? ??? ?--split-by ${hive_map_cols} \
? ? ? ? ? ? ?? ??? ?--null-string '\\N' --null-non-string '\\N'?
? ? ? ? else
?? ??? ?sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true -Dmapreduce.job.queuename=bf_yarn_pool.production \
? ? ? ? ? ? ? ? ? ? ? ? --connect jdbc:oracle:thin:@$coll_host_ip:$coll_host_port/$coll_db \
? ? ? ? ?? ? ? ? ? ?--table $coll_tab --username $coll_tab_username --password $coll_tab_passwd \
?? ??? ??? ?--delete-target-dir \
? ? ? ? ? ? ? ? ? ? ? ? --hive-import --hive-overwrite --hive-database tmp_${hive_db} --hive-table tmp_${hive_tab} --hive-drop-import-delims -m 1 \
? ? ? ? ? ? ? ? ? ? ? ? --where "${pt_col} >=to_date('${one_day}','yyyy-mm-dd') and ${pt_col}<to_date('${today}','yyyy-mm-dd')" --fields-terminated-by '\001' \
? ? ? ? ? ? ? ? ? ? ? ? --null-string '\\N' --null-non-string '\\N' #&> /dev/null?
? ? ? ? fi
? ? ? ? RET=$?
? ? ? ? if [ $RET -eq 0 ]; then
? ? ? ? ? ? ? ? echo "`date +\"%F %T\"` [INFO] sqoop import database:$hive_db table:temp_$hive_tab successfully."
? ? ? ? else
? ? ? ? ? ? ? ? echo "`date +\"%F %T\"` [ERROR] sqoop import database:$hive_db table:temp_$hive_tab error."
? ? ? ? ? ? ? ? exit 5
? ? ? ? fi
? ? ? ? echo "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ [`date +\"%F %T\"`] sqoop import end @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ "
}
sqoopImpTempTab
# store tmp table to hive rcfile table
hiveStoreAsRc()
{
?? ?#pt_col=create_time
? ? ? ? echo '---------------------------------------------------------- hive store as rcfile start ?-------------------------------------------------------------------------------------'
?? ?hsql="use ${hive_db};set hive.exec.dynamic.partition=true;set hive.exec.dynamic.partition.mode=nonstrict;
? ? ? ? ? ? ? ? set mapreduce.map.memory.mb=15000;set mapreduce.reduce.memory.mb=15000; set hive.merge.mapredfiles=true;set hive.exec.max.created.files=100000;
? ? ? ? ? ? ? ? SET hive.exec.max.dynamic.partitions=100000;SET hive.exec.max.dynamic.partitions.pernode=100000;from tmp_${hive_db}.tmp_${hive_tab} \
?? ??? ?INSERT OVERWRITE TABLE ${hive_db}.${hive_tab} PARTITION(pk_year,pk_month,pk_day) select *,substr(${pt_col}, 1, 4),substr(${pt_col}, 1, 7),substr(${pt_col}, 1, 10) \
?? ??? ?where ${pt_col} >='${one_day}' and ${pt_col}<'${today}' "
?? ?#phsql=$hsql" INSERT INTO TABLE ${hive_db}.${hive_tab} PARTITION(partition_key='${sdt:0:7}') select $hive_tab_cols where $pt_col > '${sdt:0:7}';"?? ?
?? ?echo "#################### hsql: $hsql"
?? ?#hive -S -e "${hsql}"
?? ?beeline --hiveconf mapreduce.job.queuename=bf_yarn_pool.production --silent=true --showHeader=false --showWarnings=false -u 'jdbc:hive2://localhost:10000/default;' -n yarn -p *******? -e "${hsql}"?? ?
?? ?RET=$?
?? ?if [ $RET -eq 0 ]; then
? ? ? ? ?? ?echo "`date +\"%F %T\"` [INFO] ${hive_db}.${hive_tab} ?store successfully."?
? ? ? ??? ? ?? ?#exit 0
?? ?else
? ? ? ??? ??? ?echo "`date +\"%F %T\"` [ERROR] ${hive_db}.${hive_tab} ?store failure."?
? ? ? ??? ??? ?exit 5
?? ?fi
? ? ? ? echo '---------------------------------------------------------- hive store as rcfile end ?-------------------------------------------------------------------------------------'
}
hiveStoreAsRc
# impala table refresh
impTabRefrsh()
{
?? ?echo '********************************************************** impala refresh table start *****************************************************************************************'?? ?
? ? ? ? beeline --silent=true --showHeader=false --showWarnings=false -u 'jdbc:hive2://localhost:21050/default;' -n yarn -p ******* -e "refresh ${hive_db}.${hive_tab}"
?? ?RET=$?
?? ?if [ ${RET} -eq 0 ]; then
?? ??? ?echo "`date +\"%F %T\"` [INFO] impala:refresh ${hive_db}.${hive_tab} success!"
?? ??? ?exit 0
?? ?else ? ?
?? ??? ?echo "`date +\"%F %T\"` [ERROR] impala:refresh ${hive_db}.${hive_tab} failure!"
?? ??? ?exit 5
?? ?fi?? ?
?? ?echo '********************************************************** impala refresh table end *****************************************************************************************'?? ?
}
impTabRefrsh?
?
總結
以上是生活随笔為你收集整理的oracle表增量同步到hive分区表的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: *基于类平衡自我训练的无监督域自适应用于
- 下一篇: 渗透测试-----ARP攻击