日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

oracle转sparksql工具化,不使用Sqoop流程,利用CacheManager直接完成SparkSQL数据流直接回写Oracle...

發布時間:2025/3/11 数据库 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 oracle转sparksql工具化,不使用Sqoop流程,利用CacheManager直接完成SparkSQL数据流直接回写Oracle... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

以前都是使用Sqoop來完成數據從生成的hdfs數據存儲上來抽取至oracle的數據庫:sqoop抽取語句:

sqoop export --connect "jdbc:oracle:thin:@ip:port:sid" --username 用戶名 --password 密碼 --table sid.表名 --export-dir hdfs://nameservice1/user/XXX(hdfs地址) --fields-terminated-by "\001" --null-non-string '' --null-string '' -m 10;

由于項目需求我們現在要完成在代碼中省城所需字段之后,直接回寫到oracle中,因為數據量每天都很大,用實例或者List存有很大的局限性,可能會出現內存異常等不可預料的東西,所以我通過緩存器機制來存儲數據,然后進行生成結果的臨時表直接回寫(后面做的hbase接口封裝批量提交也比較類似)

廢話不多說直接上代碼:

1、建立緩存實體

package usi.java.oracle;

/**

@author HK

@date 2011-2-15 下午06:45:57

*/

public class Cache {

private String key;

private Object value;

private long timeOut;

private boolean expired;

public Cache() {

super();

}

public Cache(String key, String value, long timeOut, boolean expired) {

this.key = key;

this.value = value;

this.timeOut = timeOut;

this.expired = expired;

}

public String getKey() {

return key;

}

public long getTimeOut() {

return timeOut;

}

public Object getValue() {

return value;

}

public void setKey(String string) {

key = string;

}

public void setTimeOut(long l) {

timeOut = l;

}

public void setValue(Object object) {

value = object;

}

public boolean isExpired() {

return expired;

}

public void setExpired(boolean b) {

expired = b;

}

}

2、建立緩存控制器

package usi.java.oracle;

import java.util.Date;

import java.util.HashMap;

/**

@author HK

@date 2011-2-15 下午09:40:00

*/

public class CacheManager {

private static HashMap cacheMap = new HashMap();

/**

This class is singleton so private constructor is used.

*/

private CacheManager() {

super();

}

/**

returns cache item from hashmap

@param key

@return Cache

*/

private synchronized static Cache getCache(String key) {

return (Cache)cacheMap.get(key);

}

/**

Looks at the hashmap if a cache item exists or not

@param key

@return Cache

*/

private synchronized static boolean hasCache(String key) {

return cacheMap.containsKey(key);

}

/**

Invalidates all cache

*/

public synchronized static void invalidateAll() {

cacheMap.clear();

}

/**

Invalidates a single cache item

@param key

*/

public synchronized static void invalidate(String key) {

cacheMap.remove(key);

}

/**

Adds new item to cache hashmap

@param key

@return Cache

*/

private synchronized static void putCache(String key, Cache object) {

cacheMap.put(key, object);

}

/**

Reads a cache item's content

@param key

@return

*/

public static Cache getContent(String key) {

if (hasCache(key)) {

Cache cache = getCache(key);

if (cacheExpired(cache)) {

cache.setExpired(true);

}

return cache;

} else {

return null;

}

}

/**

@param key

@param content

@param ttl

*/

public static void putContent(String key, Object content, long ttl) {

Cache cache = new Cache();

cache.setKey(key);

cache.setValue(content);

cache.setTimeOut(ttl + new Date().getTime());

cache.setExpired(false);

putCache(key, cache);

}

/*@modelguid {172828D6-3AB2-46C4-96E2-E72B34264031}/

private static boolean cacheExpired(Cache cache) {

if (cache == null) {

return false;

}

long milisNow = new Date().getTime();

long milisExpire = cache.getTimeOut();

if (milisExpire < 0) { // Cache never expires

return false;

} else if (milisNow >= milisExpire) {

return true;

} else {

return false;

}

}

}

3、建立需要導出數據對象

package usi.java.oracle;

public class TaskAll {

private String mme_eid;

private String mme_editor;

private String entitytype_eid;

private String project_eid;

private String resource_eid;

public String getMme_eid() {

return mme_eid;

}

public void setMme_eid(String mme_eid) {

this.mme_eid = mme_eid;

}

public String getMme_editor() {

return mme_editor;

}

public void setMme_editor(String mme_editor) {

this.mme_editor = mme_editor;

}

public String getEntitytype_eid() {

return entitytype_eid;

}

public void setEntitytype_eid(String entitytype_eid) {

this.entitytype_eid = entitytype_eid;

}

public String getProject_eid() {

return project_eid;

}

public void setProject_eid(String project_eid) {

this.project_eid = project_eid;

}

public String getResource_eid() {

return resource_eid;

}

public void setResource_eid(String resource_eid) {

this.resource_eid = resource_eid;

}

}

5、執行邏輯主體,回寫數據,批量提交

package usi.java.oracle;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

//import java.sql.ResultSet;

import java.util.List;

import org.apache.spark.SparkConf;

import org.apache.spark.SparkContext;

import org.apache.spark.sql.DataFrame;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.hive.HiveContext;

public class redict_to_171ora {

public static void main(String[] args) {

SparkConf sc = new SparkConf().setAppName("redict_to_171ora");

SparkContext jsc = new SparkContext(sc);

HiveContext hc = new HiveContext(jsc);

String hivesql1="select t.mme_eid,t.mme_editor,t.entitytype_eid,t.project_eid,t.resource_eid from usi_odso.c_taskall t limit 150000";

DataFrame redict_to_171ora= hc.sql(hivesql1);

//redict_to_171ora.registerTempTable("hivesql1");

List collect=redict_to_171ora.javaRDD().collect();

int o=0;

for (Row lists: collect){

TaskAll task=new TaskAll();

task.setMme_eid(lists.getString(0));

task.setMme_editor(lists.getString(1));

task.setEntitytype_eid(lists.getString(2));

task.setProject_eid(lists.getString(3));

task.setResource_eid(lists.getString(4));

CacheManager.putContent(o+"", task, 30000000);

o++;

/* System.out.println(lists.size());

System.out.println(lists.getString(0));

System.out.println(lists.getString(1));

System.out.println(lists.getString(2));

System.out.println(lists.getString(3));

System.out.println(lists.getString(4));*/

}

System.out.println(o);

Connection con = null;// 創建一個數據庫連接

PreparedStatement pre = null;// 創建預編譯語句對象,一般都是用這個而不用Statement

//ResultSet result = null;// 創建一個結果集對象

try

{

Class.forName("oracle.jdbc.driver.OracleDriver");// 加載Oracle驅動程序

System.out.println("開始嘗試連接數據庫!");

String url = "jdbc:oracle:" + "thin:@ip:1521:sid";// 127.0.0.1是本機地址,XE是精簡版Oracle的默認數據庫名

String user = "user";// 用戶名,系統默認的賬戶名

String password = "password";// 你安裝時選設置的密碼

con = DriverManager.getConnection(url, user, password);// 獲取連接

System.out.println("連接成功!");

String sql = "insert into c_taskall_test(mme_eid,mme_editor,entitytype_eid,project_eid,resource_eid) values(?,?,?,?,?)";// 預編譯語句,“?”代表參數

pre = con.prepareStatement(sql);// 實例化預編譯語句

for(int i=0;i

// for (Row lists: collect){

// String sql = "insert into c_taskall_test(mme_eid,mme_editor,entitytype_eid,project_eid,resource_eid) values('"+task.getMme_eid()+"','"+task.getMme_editor()+"','"+task.getEntitytype_eid()+"','"+task.getProject_eid()+"','"+task.getResource_eid()+"')";// 預編譯語句,“?”代表參數

// pre.setString(1, "三星");// 設置參數,前面的1表示參數的索引,而不是表中列名的索引

TaskAll task=(TaskAll) CacheManager.getContent(""+i).getValue();

pre.setString(1, task.getMme_eid());

pre.setString(2, task.getMme_editor());

pre.setString(3, task.getEntitytype_eid());

pre.setString(4, task.getProject_eid());

pre.setString(5, task.getResource_eid());

pre.addBatch();

if(i%20000==0){//可以設置不同的大小;如50,100,500,1000等等

pre.executeBatch();

con.commit();

pre.clearBatch();

// System.out.println("i的值"+i);

}

// result = pre.executeQuery();// 執行查詢,注意括號中不需要再加參數

}

pre.executeBatch();

con.commit();

pre.clearBatch();

// System.out.println("i的值"+i);

/* if (result != null)

result.close();*/

if (pre != null)

pre.close();

/* while (result.next())

// 當結果集不為空時

System.out.println("usernum:" + result.getString("usernum") + "flow:"

+ result.getString("flow"));*/

}

catch (Exception e)

{

e.printStackTrace();

}

finally

{

try

{

// 逐一將上面的幾個對象關閉,因為不關閉的話會影響性能、并且占用資源

// 注意關閉的順序,最后使用的最先關閉

/* if (result != null)

result.close();*/

if (pre != null)

pre.close();

if (con != null)

con.close();

//System.out.println("數據庫連接已關閉!");

}

catch (Exception e)

{

e.printStackTrace();

}

}

}

}

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的oracle转sparksql工具化,不使用Sqoop流程,利用CacheManager直接完成SparkSQL数据流直接回写Oracle...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 免费av视屏 | 日韩av手机在线免费观看 | 黄色精品视频 | 免费特级黄色片 | 国产又爽又色 | 久久精品99国产精品日本 | caoporn免费在线视频 | 午夜精品久久久久久久99热浪潮 | 国产毛片精品国产一区二区三区 | 日本少妇裸体做爰 | 色播一区二区 | 激情视频在线播放 | 天天干精品 | 成年人黄色在线观看 | 国产中文字幕在线 | 久久成人国产精品入口 | 亚洲精品中文字幕在线播放 | 日本丰满少妇做爰爽爽 | 亚洲精品久久久久久无码色欲四季 | 亚洲精品在线免费观看视频 | 中国免费毛片 | 老女人性生活视频 | 日韩三级在线免费观看 | 可以看的av网站 | 国产91精品久久久 | 国产一级爽片 | 午夜影院男女 | jizzzxxxx | 东京热无码av一区二区 | 久久这里只有精品99 | 精品一区二区三区毛片 | 亚洲高清网站 | 91黄色免费观看 | 熟妇无码乱子成人精品 | 婷婷亚洲天堂 | 热久久国产精品 | 在线香蕉视频 | 免费看国产曰批40分钟粉红裤头 | 亚洲大逼 | 天海翼一区二区 | 国产网友自拍视频 | 懂色av蜜臀av粉嫩av分享吧 | 欧美黄一级 | 久久电影一区二区 | 日韩精品短片 | 精品一区二区三区久久久 | 成人影视在线看 | 欧美日韩国产一级 | 国产在线观看中文字幕 | 老司机午夜影院 | 69精品人人 | 色啦啦视频 | 大尺度做爰呻吟62集 | 亚洲人在线观看视频 | 黄色录像毛片 | 女人高潮娇喘声mp3 乱色视频 | 午夜亚洲AV永久无码精品蜜芽 | 亚洲国产网站 | 日韩激情一区二区三区 | 免费的黄色一级片 | 伊人久久大香 | 亚洲视频大全 | 日本一区视频在线观看 | 丁香花高清在线 | 人体裸体bbb欣赏 | 色天堂在线视频 | 影音先锋国产资源 | 免费a大片| 国产精品久久久久久久久久久不卡 | 欧美三级视频在线观看 | 人人超碰在线 | 一区二区三区av | 日韩精品网站 | 人妻体体内射精一区二区 | 国产精品无码一区二区三区免费 | 日本人添下边视频免费 | 国产精品18久久久 | 亚洲欧美中文日韩在线观看 | 97精品人妻一区二区三区香蕉 | 欧美日本韩国一区二区 | 国产精品久久久久久三级 | 亚洲爱爱av | av不卡在线看 | 国产精彩视频在线观看 | 韩国伦理中文字幕 | 少妇99 | 90岁老太婆乱淫 | 欧美一区二区网站 | 成人污在线| 欧美xxxx×黑人性爽 | 日本乱子伦xxxx | 亚洲中文字幕无码不卡电影 | 欧美国产一区二区在线观看 | 欧美一区二区三区久久精品 | 住在隔壁的她动漫免费观看全集下载 | 国产精品久久久久久中文字 | 91网站免费在线观看 | 国产成人无码精品久久久久 | 内射后入在线观看一区 |