日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

oracle表增量同步到hive分区表

發布時間:2024/1/8 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 oracle表增量同步到hive分区表 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文以shell腳本,通過傳參的的形式,將服務器ip,oracle的庫名表名以及作為分區字段的字段名稱,hive的庫名,表名作為參數傳入,這樣可以做到靈活變更,提高通用性與方便性,通過定時器調度此腳本即可。

腳本包含三步:

? ?一:通過sqoop將oracle數據導入到hive臨時表,臨時表需創建,無分區表

? ?二:將hive臨時表數據insert到hive正式表,以傳入的分區字段作為分區,此腳本中分區有 年xxxx,月xxxx-xx,日xxxx-xx-xx

? ?三:因為我有用到impala,所以在第三步加上了impala刷新操作,如不刷新,impala將識別不到新增hive數據? ?

#!/bin/bash
#

# import table from oracle into hive

nargs=$#
echo "argument num: $nargs "

today=`date +%Y-%m-%d`
one_day=`date +%Y-%m-%d -d'-1 day'`
coll_db=''
coll_tab=''
coll_host_ip=''
coll_host_port=1521
coll_tab_username=''
coll_tab_passwd=''

hive_db=''
hive_tab=''
#hive_tab_cols=''
hive_map_cols=''

start_dt=''
end_dt=''

pt_col=''?

/usr/bin/kinit -kt /opt/yarn.keytab yarn

# argument parse
argParse()
{
?? ?echo "argument num: $nargs "

?? ?for ag in $@
?? ?do
?? ?#?? ?echo $ag
?? ??? ?arg_key=${ag%=*}
?? ??? ?arg_val=${ag#*=}
#?? ??? ?echo "${ag%=*}--- ${ag#*=}"?? ??? ?
#?? ??? ?echo "$arg_key---- $arg_val"
?? ??? ?case ${arg_key} in
?? ??? ??? ?"coll_host_ip") ?? ?coll_host_ip=$arg_val?? ?;;
?? ??? ??? ?"coll_db") ?? ??? ?coll_db=$arg_val?? ?;;
?? ??? ??? ?"coll_tab") ?? ??? ?coll_tab=$arg_val?? ?;;
?? ??? ??? ?"hive_db") ?? ??? ?hive_db=$arg_val?? ?;;
?? ??? ??? ?"hive_tab") ?? ??? ?hive_tab=$arg_val?? ?;;
?? ??? ??? ?#"hive_tab_cols") ?? ?hive_tab_cols=$arg_val?? ?;;
?? ??? ??? ?"hive_map_cols") ?? ?hive_map_cols=$arg_val?? ?;;
?? ??? ??? ?"start_dt") ?? ??? ?start_dt=$arg_val?? ?;;
?? ??? ??? ?"end_dt") ?? ??? ?end_dt=$arg_val?? ?;;
?? ??? ??? ?"pt_col") ?? ??? ?pt_col=$arg_val?? ?;;
?? ??? ?esac
?? ?done
}

# parse the arguments key:value paire
argParse $@
# pring argument
printArgs()
{
?? ?echo "coll_host_ip:$coll_host_ip"
?? ?echo "coll_db:$coll_db"
?? ?echo "coll_tab:$coll_tab"
?? ?echo "hive_db:$hive_db"
?? ?echo "hive_tab:$hive_tab"
?? ?#echo "hive_tab_cols:$hive_tab_cols"
?? ?echo "hive_map_cols:$hive_map_cols"
?? ?echo "start_dt:$start_dt"
?? ?echo "end_dt:$end_dt"
?? ?echo "pt_col:$pt_col"
? ? ? ? echo "one_day:$one_day"
? ? ? ? echo "today:$today"
}

# print the value of argument
printArgs

# sqoop import table of mysql to hive tmp table
sqoopImpTempTab()
{
?? ?echo ?"@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ [`date +\"%F %T\"`] sqoop import start @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ "
? ? ? ? if [ -n "${hive_map_cols}" ]
? ? ? ? then
?? ??? ?sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true -Dmapreduce.job.queuename=bf_yarn_pool.production \
? ? ? ? ? ? ? ? ? ? ? ? --connect jdbc:oracle:thin:@$coll_host_ip:$coll_host_port/$coll_db \
? ? ? ? ? ? ? ? ? ? ? ? --table $coll_tab --username $coll_tab_username --password $coll_tab_passwd \
? ? ? ? ? ? ? ? ? ? ? ? --delete-target-dir \
? ? ? ? ? ? ?? ??? ?--hive-import --hive-overwrite --hive-database tmp_${hive_db} --hive-table tmp_${hive_tab} --hive-drop-import-delims -m 1 \
? ? ? ? ? ? ?? ??? ?--where "${pt_col} >=to_date('${one_day}','yyyy-mm-dd') and ${pt_col}<to_date('${today}','yyyy-mm-dd')" --fields-terminated-by '\001' \
?? ??? ??? ?--split-by ${hive_map_cols} \
? ? ? ? ? ? ?? ??? ?--null-string '\\N' --null-non-string '\\N'?
? ? ? ? else
?? ??? ?sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true -Dmapreduce.job.queuename=bf_yarn_pool.production \
? ? ? ? ? ? ? ? ? ? ? ? --connect jdbc:oracle:thin:@$coll_host_ip:$coll_host_port/$coll_db \
? ? ? ? ?? ? ? ? ? ?--table $coll_tab --username $coll_tab_username --password $coll_tab_passwd \
?? ??? ??? ?--delete-target-dir \
? ? ? ? ? ? ? ? ? ? ? ? --hive-import --hive-overwrite --hive-database tmp_${hive_db} --hive-table tmp_${hive_tab} --hive-drop-import-delims -m 1 \
? ? ? ? ? ? ? ? ? ? ? ? --where "${pt_col} >=to_date('${one_day}','yyyy-mm-dd') and ${pt_col}<to_date('${today}','yyyy-mm-dd')" --fields-terminated-by '\001' \
? ? ? ? ? ? ? ? ? ? ? ? --null-string '\\N' --null-non-string '\\N' #&> /dev/null?

? ? ? ? fi
? ? ? ? RET=$?
? ? ? ? if [ $RET -eq 0 ]; then
? ? ? ? ? ? ? ? echo "`date +\"%F %T\"` [INFO] sqoop import database:$hive_db table:temp_$hive_tab successfully."
? ? ? ? else
? ? ? ? ? ? ? ? echo "`date +\"%F %T\"` [ERROR] sqoop import database:$hive_db table:temp_$hive_tab error."
? ? ? ? ? ? ? ? exit 5
? ? ? ? fi
? ? ? ? echo "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ [`date +\"%F %T\"`] sqoop import end @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ "

}
sqoopImpTempTab

# store tmp table to hive rcfile table
hiveStoreAsRc()
{

?? ?#pt_col=create_time
? ? ? ? echo '---------------------------------------------------------- hive store as rcfile start ?-------------------------------------------------------------------------------------'

?? ?hsql="use ${hive_db};set hive.exec.dynamic.partition=true;set hive.exec.dynamic.partition.mode=nonstrict;
? ? ? ? ? ? ? ? set mapreduce.map.memory.mb=15000;set mapreduce.reduce.memory.mb=15000; set hive.merge.mapredfiles=true;set hive.exec.max.created.files=100000;
? ? ? ? ? ? ? ? SET hive.exec.max.dynamic.partitions=100000;SET hive.exec.max.dynamic.partitions.pernode=100000;from tmp_${hive_db}.tmp_${hive_tab} \
?? ??? ?INSERT OVERWRITE TABLE ${hive_db}.${hive_tab} PARTITION(pk_year,pk_month,pk_day) select *,substr(${pt_col}, 1, 4),substr(${pt_col}, 1, 7),substr(${pt_col}, 1, 10) \
?? ??? ?where ${pt_col} >='${one_day}' and ${pt_col}<'${today}' "

?? ?#phsql=$hsql" INSERT INTO TABLE ${hive_db}.${hive_tab} PARTITION(partition_key='${sdt:0:7}') select $hive_tab_cols where $pt_col > '${sdt:0:7}';"?? ?
?? ?echo "#################### hsql: $hsql"
?? ?#hive -S -e "${hsql}"
?? ?beeline --hiveconf mapreduce.job.queuename=bf_yarn_pool.production --silent=true --showHeader=false --showWarnings=false -u 'jdbc:hive2://localhost:10000/default;' -n yarn -p *******? -e "${hsql}"?? ?

?? ?RET=$?
?? ?if [ $RET -eq 0 ]; then
? ? ? ? ?? ?echo "`date +\"%F %T\"` [INFO] ${hive_db}.${hive_tab} ?store successfully."?
? ? ? ??? ? ?? ?#exit 0
?? ?else
? ? ? ??? ??? ?echo "`date +\"%F %T\"` [ERROR] ${hive_db}.${hive_tab} ?store failure."?
? ? ? ??? ??? ?exit 5
?? ?fi
? ? ? ? echo '---------------------------------------------------------- hive store as rcfile end ?-------------------------------------------------------------------------------------'
}

hiveStoreAsRc

# impala table refresh
impTabRefrsh()
{
?? ?echo '********************************************************** impala refresh table start *****************************************************************************************'?? ?
? ? ? ? beeline --silent=true --showHeader=false --showWarnings=false -u 'jdbc:hive2://localhost:21050/default;' -n yarn -p ******* -e "refresh ${hive_db}.${hive_tab}"

?? ?RET=$?

?? ?if [ ${RET} -eq 0 ]; then
?? ??? ?echo "`date +\"%F %T\"` [INFO] impala:refresh ${hive_db}.${hive_tab} success!"
?? ??? ?exit 0
?? ?else ? ?
?? ??? ?echo "`date +\"%F %T\"` [ERROR] impala:refresh ${hive_db}.${hive_tab} failure!"
?? ??? ?exit 5
?? ?fi?? ?
?? ?echo '********************************************************** impala refresh table end *****************************************************************************************'?? ?
}

impTabRefrsh?
?

總結

以上是生活随笔為你收集整理的oracle表增量同步到hive分区表的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。