java kettle6_Java调用Kettle6的transaction和job
在進行復雜數據傳輸時,特別是異構數據庫的多表數據傳輸,我們經常會用到ETL工具來完成。Kettle是一個典型的ETL工具且使用廣泛。由于Kettle功能強大且復雜,對于Java開發人員來說無疑增加了項目運維的難度和復雜度。因此將Kettle集成到Java項目中可以大大降低項目的開發難度、提高開發效率,同時也降低運維復雜度。網上大多關于Kettle集成的中文資料都是基于Kettle 4.0之前的版本。以下是根據6.0官方文檔及網上相關資料開發的幾個Demo,如有錯誤還望及時指出!
1、jar包引用
文件安裝目錄的data-integration\lib文件夾下有很多jar包,可以根據實際需要進行添加。經過個人測試,以下幾個是必要的:
avalon-framework-4.1.5.jar;
commons-codec-1.9.jar;
commons-collections-3.2.1.jar;
commons-io-2.1.jar;
commons-lang-2.5.jar;
commons-logging-1.1.3.jar;
commons-vfs2-2.1-20150824.jar;
guava-17.0.jar;
jug-lgpl-2.0.0.jar;
kettle-core-6.0.1.0-386.jar;
kettle-dbdialog-6.0.1.0-386.jar;
kettle-engine-6.0.1.0-386.jar;
kettle-ui-swt-6.0.1.0-386.jar;
metastore-6.0.1.0-386.jar;
ognl-2.6.9.jar;
scannotation-1.0.2.jar
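maven:
<repositories>
  <repository>
    <id>pentaho-releases</id>
    <url>http://repository.pentaho.org/artifactory/repo/</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-core</artifactId>
    <version>6.1.0.4-225</version>
  </dependency>
  <dependency>
    <groupId>com.verhas</groupId>
    <artifactId>license3j</artifactId>
    <version>1.0.7</version>
  </dependency>
  <dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-dbdialog</artifactId>
    <version>6.1.0.4-225</version>
  </dependency>
  <dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-engine</artifactId>
    <version>6.1.0.4-225</version>
  </dependency>
  <dependency>
    <groupId>pentaho</groupId>
    <artifactId>metastore</artifactId>
    <version>6.1.0.4-225</version>
  </dependency>
  <dependency>
    <groupId>org.safehaus.jug</groupId>
    <artifactId>jug</artifactId>
    <version>2.0.0</version>
    <classifier>lgpl</classifier>
  </dependency>
</dependencies>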
2、Java創建transformation
/**
 * Builds the metadata for a "copy table" transformation made of three chained steps:
 * a table-input step that selects {@code sourceFields} from {@code sourceTableName},
 * a select-values step that renames each source field to the target field at the same
 * index, and a table-output step that writes to {@code targetTableName}.
 *
 * <p>The output step is configured with a commit size of 200 and with its truncate-table
 * option switched on.
 *
 * <p>NOTE(review): the step plugin ids are looked up through {@code PluginRegistry}; this
 * presumably requires the caller to have run {@code KettleEnvironment.init()} first - confirm.
 *
 * @param transformationName name given to the new transformation
 * @param sourceDatabaseName name of the connection in {@code databases} to read from
 * @param sourceTableName    table to read from
 * @param sourceFields       columns to select from the source table
 * @param targetDatabaseName name of the connection in {@code databases} to write to
 * @param targetTableName    table to write to
 * @param targetFields       output column names; {@code targetFields[i]} is the new name of
 *                           {@code sourceFields[i]}, so it needs at least as many entries as
 *                           {@code sourceFields}
 * @param databases          connections registered on the transformation before the two
 *                           database names are looked up
 * @return the assembled transformation metadata
 * @throws KettleException if an argument is unusable, a database name cannot be resolved,
 *                         or anything else fails while assembling the transformation
 */
public static final TransMeta buildCopyTable(String transformationName,
        String sourceDatabaseName, String sourceTableName,
        String[] sourceFields, String targetDatabaseName,
        String targetTableName, String[] targetFields,
        DatabaseMeta[] databases)
        throws KettleException {
    EnvUtil.environmentInit();
    try {
        // Fail with a descriptive message instead of a wrapped NullPointerException /
        // ArrayIndexOutOfBoundsException further down.
        if (sourceFields == null || targetFields == null || databases == null) {
            throw new KettleException("sourceFields, targetFields and databases must not be null");
        }
        if (targetFields.length < sourceFields.length) {
            throw new KettleException("targetFields has " + targetFields.length
                    + " entries but " + sourceFields.length + " source fields need a target name");
        }

        // Create a new transformation...
        TransMeta transMeta = new TransMeta();
        transMeta.setName(transformationName);

        // Register the database connections, then resolve the two we need by name.
        for (DatabaseMeta databaseMeta : databases) {
            transMeta.addDatabase(databaseMeta);
        }
        DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);
        if (sourceDBInfo == null) {
            throw new KettleException("Source database [" + sourceDatabaseName
                    + "] was not found among the supplied connections");
        }
        DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);
        if (targetDBInfo == null) {
            throw new KettleException("Target database [" + targetDatabaseName
                    + "] was not found among the supplied connections");
        }

        // Add a note describing what the transformation does.
        String note = "Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]" + Const.CR;
        note += "After that, it writes the information to table [" + targetTableName + "] on database [" + targetDBInfo + "]";
        transMeta.addNote(new NotePadMeta(note, 150, 10, -1, -1));

        PluginRegistry registry = PluginRegistry.getInstance();

        // Step 1: read from the source table.
        // Table and column names are concatenated into the SQL verbatim (identifiers cannot be
        // bound as statement parameters), so callers must only pass trusted names.
        TableInputMeta tii = new TableInputMeta();
        tii.setDatabaseMeta(sourceDBInfo);
        StringBuilder selectSQL = new StringBuilder("SELECT ").append(Const.CR);
        for (int i = 0; i < sourceFields.length; i++) {
            selectSQL.append(i > 0 ? ", " : " ").append(sourceFields[i]).append(Const.CR);
        }
        selectSQL.append("FROM ").append(sourceTableName);
        tii.setSQL(selectSQL.toString());

        StepMeta fromstep = new StepMeta(registry.getPluginId(tii), "read from [" + sourceTableName + "]", tii);
        fromstep.setLocation(150, 100);
        fromstep.setDraw(true);
        fromstep.setDescription("Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]");
        transMeta.addStep(fromstep);

        // Step 2: rename every source column to its target name.
        // The previous code called allocate(0, 0, n) and then wrote into getSelectName()[i],
        // i.e. into the select arrays it had just sized to zero. Size the select section
        // instead and hand the values over through the setters.
        // NOTE(review): setters are used rather than writing into the getter's array because
        // some Kettle versions appear to return a copy from getSelectName() - confirm for the
        // version on your classpath.
        int fieldCount = sourceFields.length;
        String[] renamedTo = new String[fieldCount];
        System.arraycopy(targetFields, 0, renamedTo, 0, fieldCount);
        // NOTE(review): -2 is believed to be the "leave length/precision unchanged" marker
        // used by SelectValuesMeta - confirm against its source.
        int[] keepLength = new int[fieldCount];
        int[] keepPrecision = new int[fieldCount];
        for (int i = 0; i < fieldCount; i++) {
            keepLength[i] = -2;
            keepPrecision[i] = -2;
        }
        SelectValuesMeta svi = new SelectValuesMeta();
        svi.allocate(fieldCount, 0, 0);
        svi.setSelectName(sourceFields.clone());
        svi.setSelectRename(renamedTo);
        svi.setSelectLength(keepLength);
        svi.setSelectPrecision(keepPrecision);

        StepMeta selstep = new StepMeta(registry.getPluginId(svi), "Rename field names", svi);
        selstep.setLocation(350, 100);
        selstep.setDraw(true);
        selstep.setDescription("Rename field names");
        transMeta.addStep(selstep);
        transMeta.addTransHop(new TransHopMeta(fromstep, selstep));

        // Step 3: write to the target table.
        TableOutputMeta toi = new TableOutputMeta();
        toi.setDatabaseMeta(targetDBInfo);
        toi.setTablename(targetTableName);
        toi.setCommitSize(200);
        toi.setTruncateTable(true);

        StepMeta tostep = new StepMeta(registry.getPluginId(toi), "write to [" + targetTableName + "]", toi);
        tostep.setLocation(550, 100);
        tostep.setDraw(true);
        tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");
        transMeta.addStep(tostep);

        // Connect the rename step to the output step.
        transMeta.addTransHop(new TransHopMeta(selstep, tostep));

        // The transformation is complete, return it...
        return transMeta;
    } catch (KettleException e) {
        // Already carries a specific message; do not bury it under the generic one below.
        throw e;
    } catch (Exception e) {
        throw new KettleException("An unexpected error occurred creating the new transformation", e);
    }
}
3、Java運行Kettle的transformation:
/**
 * Loads the transformation file at {@code ktrPath}, executes it and blocks until it has
 * finished.
 *
 * <p>Failures are not propagated: any exception raised while initialising, loading or
 * running the transformation, as well as a non-zero error count after the run, is reported
 * by printing a stack trace.
 *
 * @param params  argument values handed to {@code Trans.execute}
 * @param ktrPath path of the transformation file (extension {@code .ktr})
 */
public static void runTransfer(String[] params, String ktrPath) {
    try {
        // Initialise the Kettle environment before touching any metadata.
        KettleEnvironment.init();
        EnvUtil.environmentInit();

        // Load the transformation metadata and create the runnable transformation.
        TransMeta transMeta = new TransMeta(ktrPath);
        Trans trans = new Trans(transMeta);

        // Run it and wait for completion.
        trans.execute(params);
        trans.waitUntilFinished();

        // Surface step errors through the same reporting path as any other failure.
        if (trans.getErrors() > 0) {
            throw new Exception(
                    "There are errors during transformation exception!(傳輸過程中發生異常)");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
4、Java運行Kettle的Job:
/**
 * Loads the Kettle job at {@code jobPath}, passes it two variables and runs it to completion.
 *
 * <p>{@code params[0]} is set as the job variable {@code id} and {@code params[1]} as
 * {@code dt}. Failures are not propagated: any exception, an unusable {@code params} array,
 * or a non-zero error count after the run is reported by printing a stack trace.
 *
 * @param params  variable values; must contain at least two entries ({@code id}, {@code dt})
 * @param jobPath path and file name of the job definition
 */
public static void runJob(String[] params, String jobPath) {
    try {
        KettleEnvironment.init();
        // jobPath is the path and name of the job file; no repository is used.
        JobMeta jobMeta = new JobMeta(jobPath, null);
        Job job = new Job(null, jobMeta);

        // Report a clear message instead of a bare NullPointerException /
        // ArrayIndexOutOfBoundsException when the caller supplies too few values.
        if (params == null || params.length < 2) {
            throw new IllegalArgumentException(
                    "runJob expects params[0] (id) and params[1] (dt)");
        }
        // Pass variables to the job; inside the job they are referenced as ${name}.
        job.setVariable("id", params[0]);
        job.setVariable("dt", params[1]);

        job.start();
        job.waitUntilFinished();
        if (job.getErrors() > 0) {
            throw new Exception(
                    "There are errors during job exception!(執行job發生異常)");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
注:
1、在Kettle連接SqlServer數據庫時建議使用開源的jtds數據庫jar包,微軟官方jar包不受支持。
2、個人建議使用項目中的調度框架(如quartz、Spring的schedule等)調用transformation來實現定時執行,可以更靈活地控制我們的Job。
3、Kettle有強大的圖形化設計器,transformation的創建建議在Kettle中進行。
順便附上實現后的系統界面樣例
clipboard.png
總結
以上是生活随笔為你收集整理的java kettle6_Java调用Kettle6的transaction和job的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 信用卡能多办几张吗 一般2到3张就可以
- 下一篇: java 类爆炸_Java 封装与类