日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java kettle6_Java调用Kettle6的transaction和job

發布時間:2024/10/8 java 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java kettle6_Java调用Kettle6的transaction和job 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在進行復雜數據傳輸時,特別是異構數據庫的多表數據傳輸,我們經常會用到ETL工具來完成。Kettle是一個典型的ETL工具且使用廣泛。由于Kettle功能強大且復雜,對于java開發人員來說無疑增加了項目運維的難度和復雜度。因此將Kettle的集成到Java項目中可以大大降低項目的開發難度和開發效率,同時也降低了運維復雜度。網上大多關于Kettle集成的中文資料都是基于Kettle4.0之前的。以下是根據6.0官方文檔及網上相關資料開發的幾個Demo如有錯誤還望及時指出!

1、jar包引用

文件安裝目錄的data-integration\lib文件夾下有很多jar包,可以根據實際需要進行添加。經過個人測試以下幾個是必要的:

avalon-framework-4.1.5.jar;

commons-codec-1.9.jar;

commons-collections-3.2.1.jar;

commons-io-2.1.jar;

commons-lang-2.5.jar;

commons-logging-1.1.3.jar;

commons-vfs2-2.1-20150824.jar;

guava-17.0.jar;

jug-lgpl-2.0.0.jar;

kettle-core-6.0.1.0-386.jar;

kettle-dbdialog-6.0.1.0-386.jar;

kettle-engine-6.0.1.0-386.jar;

kettle-ui-swt-6.0.1.0-386.jar;

metastore-6.0.1.0-386.jar;

ognl-2.6.9.jar;

scannotation-1.0.2.jar

maven:

pentaho-releases

http://repository.pentaho.org/artifactory/repo/

pentaho-kettle

kettle-core

6.1.0.4-225

com.verhas

license3j

1.0.7

pentaho-kettle

kettle-dbdialog

6.1.0.4-225

pentaho-kettle

kettle-engine

6.1.0.4-225

pentaho

metastore

6.1.0.4-225

org.safehaus.jug

jug

2.0.0

lgpl

2、Java創建transaction

/**

* Creates a new Transformation using input parameters such as the tablename to read from.

* @param transformationName transformation的名稱

* @param sourceDatabaseName 輸入的 database 名稱

* @param sourceTableName 要讀取的表名

* @param sourceFields 要讀取的列名

* @param targetDatabaseName 目標database名

* @param targetTableName要寫入的表名

* @param targetFields要寫入的列名(要跟讀取的列長度相同)

* @return A new transformation metadata object

* @throws KettleException In the rare case something goes wrong

*/

public static final TransMeta buildCopyTable(String transformationName,

String sourceDatabaseName, String sourceTableName,

String[] sourceFields, String targetDatabaseName,

String targetTableName, String[] targetFields,

DatabaseMeta[] databases)

throws KettleException {

EnvUtil.environmentInit();

try

{

// Create a new transformation...

TransMeta transMeta = new TransMeta();

transMeta.setName(transformationName);

// 添加數據庫連接

for (int i = 0; i < databases.length; i++) {

DatabaseMeta databaseMeta = databases[i];

transMeta.addDatabase(databaseMeta);

}

DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);

DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);

//添加注釋

String note = "Reads information from table [" + sourceTableName+ "] on database [" + sourceDBInfo + "]" + Const.CR;

note += "After that, it writes the information to table ["+ targetTableName + "] on database [" + targetDBInfo + "]";

NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);

transMeta.addNote(ni);

// 創建讀取數據源的 step...

String fromstepname = "read from [" + sourceTableName + "]";

TableInputMeta tii = new TableInputMeta();

tii.setDatabaseMeta(sourceDBInfo);

String selectSQL = "SELECT " + Const.CR;

for (int i = 0; i < sourceFields.length; i++) {

if (i > 0) selectSQL += ", "; else selectSQL += " ";

selectSQL += sourceFields[i] + Const.CR;

}

selectSQL += "FROM " + sourceTableName;

tii.setSQL(selectSQL);

PluginRegistry registry = PluginRegistry.getInstance();

String fromstepid = registry.getPluginId(tii);

StepMeta fromstep = new StepMeta(fromstepid, fromstepname,(StepMetaInterface) tii);

fromstep.setLocation(150, 100);

fromstep.setDraw(true);

fromstep.setDescription("Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]");

transMeta.addStep(fromstep);

// 添加 重命名 fields的邏輯

// Use metadata logic in SelectValues, use SelectValueInfo...

SelectValuesMeta svi = new SelectValuesMeta();

svi.allocate(0, 0, sourceFields.length);

for (int i = 0; i < sourceFields.length; i++) {

svi.getSelectName()[i] = sourceFields[i];

svi.getSelectRename()[i] = targetFields[i];

}

String selstepname = "Rename field names";

String selstepid = registry.getPluginId(svi);

StepMeta selstep = new StepMeta(selstepid, selstepname, (StepMetaInterface) svi);

selstep.setLocation(350, 100);

selstep.setDraw(true);

selstep.setDescription("Rename field names");

transMeta.addStep(selstep);

TransHopMeta shi = new TransHopMeta(fromstep, selstep);

transMeta.addTransHop(shi);

fromstep = selstep;

// 創建 寫數據的 step...

// 添加 輸出表 step...

String tostepname = "write to [" + targetTableName + "]";

TableOutputMeta toi = new TableOutputMeta();

toi.setDatabaseMeta(targetDBInfo);

toi.setTablename(targetTableName);

toi.setCommitSize(200);

toi.setTruncateTable(true);

String tostepid = registry.getPluginId(toi);

StepMeta tostep = new StepMeta(tostepid, tostepname, (StepMetaInterface) toi);

tostep.setLocation(550, 100);

tostep.setDraw(true);

tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");

transMeta.addStep(tostep);

// 添加連線...

TransHopMeta hi = new TransHopMeta(fromstep, tostep);

transMeta.addTransHop(hi);

// The transformation is complete, return it...

return transMeta;

} catch (Exception e) {

throw new KettleException("An unexpected error occurred creating the new transformation", e);

}

}

3、Java運行Kettle的transaction:

/**

* 運行轉換文件方法

* @param params 多個參數變量值

* @param ktrPath 轉換文件的路徑,后綴ktr

*/

public static void runTransfer(String[] params, String ktrPath) {

Trans trans = null;

try {

// 初始化

// 轉換元對象

KettleEnvironment.init();

EnvUtil.environmentInit();

TransMeta transMeta = new TransMeta(ktrPath);

// 轉換

trans = new Trans(transMeta);

// 執行轉換

trans.execute(params);

// 等待轉換執行結束

trans.waitUntilFinished();

// 拋出異常

if (trans.getErrors() > 0) {

throw new Exception(

"There are errors during transformation exception!(傳輸過程中發生異常)");

}

} catch (Exception e) {

e.printStackTrace();

}

}

4、Java運行Kettle的Job:

/**

* java 調用 kettle 的job

*

* @paramjobPath

*

*/

public static void runJob(String[] params, String jobPath) {

try {

KettleEnvironment.init();

//jobPath是Job腳本的路徑及名稱

JobMeta jobMeta = new JobMeta(jobPath, null);

Job job = new Job(null, jobMeta);

// 向Job 腳本傳遞參數,腳本中獲取參數值:${參數名}

// job.setVariable(paraname, paravalue);

job.setVariable("id", params[0]);

job.setVariable("dt", params[1]);

job.start();

job.waitUntilFinished();

if (job.getErrors() > 0) {

throw new Exception(

"There are errors during job exception!(執行job發生異常)");

}

} catch (Exception e) {

e.printStackTrace();

}

}

注:

1、在Kettle連接SqlServer數據庫時建議使用開源的jtds數據庫jar包,微軟官方jar包不受支持。

2、個人建議使用項目中的調度框架(如quartz、Spring的schedule等)調用transaction來實現定時執行,可以更靈活的控制我們的Job。

3、Kettle有強大的圖形化設計器,transaction的創建建議在Kettle中進行。

順便附上實現后的系統界面樣例

clipboard.png

總結

以上是生活随笔為你收集整理的java kettle6_Java调用Kettle6的transaction和job的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。