
Kettle cannot connect to ES7: connecting Kettle 8.2 to Elasticsearch 7

Published: 2025/4/5

Download link for the prebuilt plugin: https://download.csdn.net/download/wyazyf/11286050

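I. Download

1. Download the Kettle 8.2 release source package (version 8.2) from https://github.com/pentaho/pentaho-kettle/releases?after=7.1.0.23-R. Its location on the page is shown in the figure below.

2. Download the Kettle 8.2 release build of the tool from https://community.hitachivantara.com/docs/DOC-1009855. Its location on the page is shown in the figure below.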

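II. Modify

1. Extract the source package. The extracted directory is named pentaho-kettle-8.2.0.0-R.

2. In the pentaho-kettle-8.2.0.0-R directory, edit the pom file as follows. (This fixes the problem of some packages not being found at compile time.)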

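<repositories>
  <repository>
    <id>pentaho-public</id>
    <name>Pentaho Public</name>
    <url>https://nexus.pentaho.org/repository/omni/</url>
    <releases>
      <enabled>true</enabled>
      <updatePolicy>daily</updatePolicy>
    </releases>
    <snapshots>
      <enabled>true</enabled>
      <updatePolicy>interval:15</updatePolicy>
    </snapshots>
  </repository>
  <repository>
    <id>pentaho-public1</id>
    <name>/</name>
    <url>http://oss.sonatype.org/content/groups/public/</url>
  </repository>
  <repository>
    <id>pentaho-public2</id>
    <name>/</name>
    <url>https://nexus.pentaho.org/repository/proxy-public-release/</url>
  </repository>
  <repository>
    <id>pentaho-public3</id>
    <name>/</name>
    <url>https://nexus.pentaho.org/repository/proxy-public-snapshot/</url>
  </repository>
</repositories>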

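3. In the pentaho-kettle-8.2.0.0-R\plugins directory, edit the pom file so that only the elasticsearch-bulk-insert module is built. (This speeds up compilation.)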

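<profile>
  <id>highdeps</id>
  <activation>
    <property>
      <name>!skipDefault</name>
    </property>
  </activation>
  <modules>
    <module>elasticsearch-bulk-insert</module>
  </modules>
</profile>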

4. In the C:\Users\DELL\Desktop\pentaho-kettle-8.2.0.0-R\plugins\elasticsearch-bulk-insert\core directory, edit the pom file. (Add the ES7 REST client, comment out the transport client, and change the ES version number.)

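<properties>
  <pdi.version>8.2.0.0-342</pdi.version>
  <build.revision>${project.version}</build.revision>
  <timestamp>${maven.build.timestamp}</timestamp>
  <build.description>${project.description}</build.description>
  <maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
  <elasticsearch.version>7.2.0</elasticsearch.version>
</properties>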

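<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>7.2.0</version>
</dependency>
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>${elasticsearch.version}</version>
  <scope>compile</scope>
</dependency>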
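
The REST high-level client added in this step talks to Elasticsearch over HTTP, unlike the transport client it replaces. Before rebuilding the plugin, it can help to confirm that the HTTP endpoint is reachable from the machine that runs Kettle. The following is a minimal sketch using only the JDK. The class name EsHttpProbe, the host localhost and the port 9200 (Elasticsearch's default HTTP port) are assumptions for illustration, not values taken from this article.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

// Illustrative helper (hypothetical name): checks whether anything answers HTTP at host:port.
class EsHttpProbe {

  // Build the root URL of the REST endpoint, e.g. http://localhost:9200/
  static String rootUrl( String host, int port ) {
    return "http://" + host + ":" + port + "/";
  }

  // Send a GET to the root URL and return the HTTP status code.
  static int probe( String host, int port ) throws IOException {
    HttpURLConnection conn = (HttpURLConnection) new URL( rootUrl( host, port ) ).openConnection();
    conn.setRequestMethod( "GET" );
    conn.setConnectTimeout( 3000 );
    conn.setReadTimeout( 3000 );
    try {
      return conn.getResponseCode();
    } finally {
      conn.disconnect();
    }
  }

  public static void main( String[] args ) {
    String host = args.length > 0 ? args[0] : "localhost"; // assumed host
    int port = args.length > 1 ? Integer.parseInt( args[1] ) : 9200; // assumed HTTP port
    try {
      System.out.println( rootUrl( host, port ) + " answered with HTTP " + probe( host, port ) );
    } catch ( IOException e ) {
      System.out.println( "No HTTP answer from " + rootUrl( host, port ) + ": " + e.getMessage() );
    }
  }
}
```

Any HTTP status, even an authentication error, shows that the port speaks HTTP. A connection failure suggests the address or port entered in the step's Servers tab needs checking.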

5. In the C:\Users\DELL\Desktop\pentaho-kettle-8.2.0.0-R\plugins\elasticsearch-bulk-insert\core\src\main\java\org\pentaho\di\trans\steps\elasticsearchbulk directory, replace the contents of ElasticSearchBulk.java with the following:

package org.pentaho.di.trans.steps.elasticsearchbulk;

import java.io.IOException;

import java.net.UnknownHostException;

import java.util.ArrayList;

import java.util.Date;

import java.util.List;

import java.util.Map;

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;

import org.apache.http.HttpHost;

import org.elasticsearch.action.DocWriteRequest.OpType;

import org.elasticsearch.action.bulk.BulkItemResponse;

import org.elasticsearch.action.bulk.BulkRequest;

import org.elasticsearch.action.bulk.BulkResponse;

import org.elasticsearch.action.index.IndexRequest;

import org.elasticsearch.client.RequestOptions;

import org.elasticsearch.client.RestClient;

import org.elasticsearch.client.RestHighLevelClient;

import org.elasticsearch.client.transport.NoNodeAvailableException;

import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.common.xcontent.XContentBuilder;

import org.elasticsearch.common.xcontent.XContentFactory;

import org.elasticsearch.common.xcontent.XContentType;

import org.pentaho.di.core.exception.KettleException;

import org.pentaho.di.core.exception.KettleStepException;

import org.pentaho.di.core.row.RowDataUtil;

import org.pentaho.di.core.row.RowMetaInterface;

import org.pentaho.di.core.row.ValueMetaInterface;

import org.pentaho.di.i18n.BaseMessages;

import org.pentaho.di.trans.Trans;

import org.pentaho.di.trans.TransMeta;

import org.pentaho.di.trans.step.BaseStep;

import org.pentaho.di.trans.step.StepDataInterface;

import org.pentaho.di.trans.step.StepInterface;

import org.pentaho.di.trans.step.StepMeta;

import org.pentaho.di.trans.step.StepMetaInterface;

import org.pentaho.di.trans.steps.elasticsearchbulk.ElasticSearchBulkMeta.Server;

/**

* Does bulk insert of data into ElasticSearch

*

* @author webdetails

* @since 16-02-2011

*/

public class ElasticSearchBulk extends BaseStep implements StepInterface {

private static final String INSERT_ERROR_CODE = null;

private static Class<?> PKG = ElasticSearchBulkMeta.class; // for i18n

private ElasticSearchBulkMeta meta;

private ElasticSearchBulkData data;

// private Client client;

private RestHighLevelClient client;

private String index;

private String type;

// BulkRequestBuilder currentRequest;

BulkRequest currentRequest = new BulkRequest();

private int batchSize = 2;

private boolean isJsonInsert = false;

private int jsonFieldIdx = 0;

private String idOutFieldName = null;

private Integer idFieldIndex = null;

private Long timeout = null;

private TimeUnit timeoutUnit = TimeUnit.MILLISECONDS;

private int numberOfErrors = 0;

// private List<IndexRequestBuilder> requestsBuffer;

private List<IndexRequest> requestsBuffer;

private boolean stopOnError = true;

private boolean useOutput = true;

private Map<String, String> columnsToJson;

private boolean hasFields;

private IndexRequest.OpType opType = org.elasticsearch.action.DocWriteRequest.OpType.CREATE;

public ElasticSearchBulk( StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,

Trans trans ) {

super( stepMeta, stepDataInterface, copyNr, transMeta, trans );

}

public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException {

Object[] rowData = getRow();

if ( rowData == null ) {

if ( currentRequest != null && currentRequest.numberOfActions() > 0 ) {

processBatch( false );

}

setOutputDone();

return false;

}

if ( first ) {

first = false;

setupData();

// currentRequest = client.prepareBulk();
// requestsBuffer = new ArrayList<IndexRequestBuilder>( this.batchSize );
// try {
//   client.bulk( currentRequest, RequestOptions.DEFAULT );
// } catch ( IOException e1 ) {
//   rejectAllRows( e1.getLocalizedMessage() );
//   String msg = BaseMessages.getString( PKG, "ElasticSearchBulk.Log.Exception", e1.getLocalizedMessage() );
//   logError( msg );
//   throw new KettleStepException( msg, e1 );
// }

requestsBuffer = new ArrayList<>( this.batchSize );

initFieldIndexes();

}

try {

data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;

return indexRow( data.inputRowMeta, rowData ) || !stopOnError;

} catch ( KettleStepException e ) {

throw e;

} catch ( Exception e ) {

rejectAllRows( e.getLocalizedMessage() );

String msg = BaseMessages.getString( PKG, "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage() );

logError( msg );

throw new KettleStepException( msg, e );

}

}

/**

* Initialize this.data

*

* @throws KettleStepException

*/

private void setupData() throws KettleStepException {

data.nextBufferRowIdx = 0;

data.inputRowMeta = getInputRowMeta().clone(); // only available after first getRow();

data.inputRowBuffer = new Object[batchSize][];

data.outputRowMeta = data.inputRowMeta.clone();

meta.getFields( data.outputRowMeta, getStepname(), null, null, this, repository, metaStore );

}

private void initFieldIndexes() throws KettleStepException {

if ( isJsonInsert ) {

Integer idx = getFieldIdx( data.inputRowMeta, environmentSubstitute( meta.getJsonField() ) );

if ( idx != null ) {

jsonFieldIdx = idx.intValue();

} else {

throw new KettleStepException( BaseMessages.getString( PKG, "ElasticSearchBulk.Error.NoJsonField" ) );

}

}

idOutFieldName = environmentSubstitute( meta.getIdOutField() );

if ( StringUtils.isNotBlank( meta.getIdInField() ) ) {

idFieldIndex = getFieldIdx( data.inputRowMeta, environmentSubstitute( meta.getIdInField() ) );

if ( idFieldIndex == null ) {

throw new KettleStepException( BaseMessages.getString( PKG, "ElasticSearchBulk.Error.InvalidIdField" ) );

}

} else {

idFieldIndex = null;

}

}

private static Integer getFieldIdx( RowMetaInterface rowMeta, String fieldName ) {

if ( fieldName == null ) {

return null;

}

for ( int i = 0; i < rowMeta.size(); i++ ) {

String name = rowMeta.getValueMeta( i ).getName();

if ( fieldName.equals( name ) ) {

return i;

}

}

return null;

}

/**

* @param rowMeta The metadata for the row to be indexed

* @param row     The data for the row to be indexed

*/

private boolean indexRow( RowMetaInterface rowMeta, Object[] row ) throws KettleStepException {

try {

// IndexRequestBuilder requestBuilder = client.prepareIndex( index, type );
// requestBuilder.setOpType( this.opType );

IndexRequest indexRequest = new IndexRequest( index );
indexRequest.type( type );
indexRequest.opType( this.opType );

if ( idFieldIndex != null ) {
  // requestBuilder.setId( "" + row[idFieldIndex] ); // "" just in case field isn't string
  indexRequest.id( "" + row[idFieldIndex] );
}

if ( isJsonInsert ) {
  // addSourceFromJsonString( row, requestBuilder );
  addSourceFromJsonString( row, indexRequest );
} else {
  // addSourceFromRowFields( requestBuilder, rowMeta, row );
  addSourceFromRowFields( indexRequest, rowMeta, row );
}

// currentRequest = new BulkRequest();
// currentRequest.add( requestBuilder );
// requestsBuffer.add( requestBuilder );

currentRequest.add( indexRequest );
requestsBuffer.add( indexRequest );

if ( currentRequest.numberOfActions() >= batchSize ) {

return processBatch( true );

} else {

return true;

}

} catch ( KettleStepException e ) {

throw e;

} catch ( NoNodeAvailableException e ) {

throw new KettleStepException( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Error.NoNodesFound" ) );

} catch ( Exception e ) {

throw new KettleStepException( BaseMessages.getString( PKG, "ElasticSearchBulk.Log.Exception", e

.getLocalizedMessage() ), e );

}

}

// /**
//  * @param row
//  * @param requestBuilder
//  */
// private void addSourceFromJsonString( Object[] row, IndexRequestBuilder requestBuilder ) throws KettleStepException {
//   Object jsonString = row[jsonFieldIdx];
//   if ( jsonString instanceof byte[] ) {
//     requestBuilder.setSource( (byte[]) jsonString, XContentType.JSON );
//   } else if ( jsonString instanceof String ) {
//     requestBuilder.setSource( (String) jsonString, XContentType.JSON );
//   } else {
//     throw new KettleStepException( BaseMessages.getString( "ElasticSearchBulk.Error.NoJsonFieldFormat" ) );
//   }
// }

/**

* @param row

* @param indexRequest

*/

private void addSourceFromJsonString( Object[] row, IndexRequest indexRequest ) throws KettleStepException {

Object jsonString = row[jsonFieldIdx];

if ( jsonString instanceof byte[] ) {

indexRequest.source( (byte[]) jsonString, XContentType.JSON );

} else if ( jsonString instanceof String ) {

indexRequest.source( (String) jsonString, XContentType.JSON );

} else {

throw new KettleStepException( BaseMessages.getString( "ElasticSearchBulk.Error.NoJsonFieldFormat" ) );

}

}

// /**
//  * @param requestBuilder
//  * @param rowMeta
//  * @param row
//  * @throws IOException
//  */
// private void addSourceFromRowFields( IndexRequestBuilder requestBuilder, RowMetaInterface rowMeta, Object[] row )
//     throws IOException {
//   XContentBuilder jsonBuilder = XContentFactory.jsonBuilder().startObject();
//
//   for ( int i = 0; i < rowMeta.size(); i++ ) {
//     if ( idFieldIndex != null && i == idFieldIndex ) { // skip id
//       continue;
//     }
//
//     ValueMetaInterface valueMeta = rowMeta.getValueMeta( i );
//     String name = hasFields ? columnsToJson.get( valueMeta.getName() ) : valueMeta.getName();
//     Object value = row[i];
//     if ( value instanceof Date && value.getClass() != Date.class ) {
//       Date subDate = (Date) value;
//       // create a genuine Date object, or jsonBuilder will not recognize it
//       value = new Date( subDate.getTime() );
//     }
//     if ( StringUtils.isNotBlank( name ) ) {
//       jsonBuilder.field( name, value );
//     }
//   }
//
//   jsonBuilder.endObject();
//   requestBuilder.setSource( jsonBuilder );
// }

/**

* @param indexRequest

* @param rowMeta

* @param row

* @throws IOException

*/

private void addSourceFromRowFields( IndexRequest indexRequest, RowMetaInterface rowMeta, Object[] row )

throws IOException {

XContentBuilder jsonBuilder = XContentFactory.jsonBuilder().startObject();

for ( int i = 0; i < rowMeta.size(); i++ ) {

if ( idFieldIndex != null && i == idFieldIndex ) { // skip id

continue;

}

ValueMetaInterface valueMeta = rowMeta.getValueMeta( i );

String name = hasFields ? columnsToJson.get( valueMeta.getName() ) : valueMeta.getName();

Object value = row[i];

if ( value instanceof Date && value.getClass() != Date.class ) {

Date subDate = (Date) value;

// create a genuine Date object, or jsonBuilder will not recognize it

value = new Date( subDate.getTime() );

}

if ( StringUtils.isNotBlank( name ) ) {

jsonBuilder.field( name, value );

}

}

jsonBuilder.endObject();

indexRequest.source( jsonBuilder );

}

public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {

meta = (ElasticSearchBulkMeta) smi;

data = (ElasticSearchBulkData) sdi;

if ( super.init( smi, sdi ) ) {

try {

numberOfErrors = 0;

initFromMeta();

initClient();

return true;

} catch ( Exception e ) {

logError( BaseMessages.getString( PKG, "ElasticSearchBulk.Log.ErrorOccurredDuringStepInitialize" )

+ e.getMessage() );

}

return true;

}

return false;

}

private void initFromMeta() {

index = environmentSubstitute( meta.getIndex() );

type = environmentSubstitute( meta.getType() );

batchSize = meta.getBatchSizeInt( this );

try {

timeout = Long.parseLong( environmentSubstitute( meta.getTimeOut() ) );

} catch ( NumberFormatException e ) {

timeout = null;

}

timeoutUnit = meta.getTimeoutUnit();

isJsonInsert = meta.isJsonInsert();

useOutput = meta.isUseOutput();

stopOnError = meta.isStopOnError();

columnsToJson = meta.getFieldsMap();

this.hasFields = columnsToJson.size() > 0;

this.opType =

StringUtils.isNotBlank( meta.getIdInField() ) && meta.isOverWriteIfSameId() ? OpType.INDEX : OpType.CREATE;

}

private boolean processBatch( boolean makeNew ) throws KettleStepException {

BulkResponse response = null;

// ActionFuture<BulkResponse> actionFuture = currentRequest.execute();

try{

response = client.bulk(currentRequest, RequestOptions.DEFAULT);

} catch (IOException e1) {

rejectAllRows( e1.getLocalizedMessage() );

String msg = BaseMessages.getString( PKG, "ElasticSearchBulk.Log.Exception", e1.getLocalizedMessage() );

logError( msg );

throw new KettleStepException( msg, e1 );

}

boolean responseOk = false;

// try {
//   if ( timeout != null && timeoutUnit != null ) {
//     response = actionFuture.actionGet( timeout, timeoutUnit );
//   } else {
//     response = actionFuture.actionGet();
//   }
// } catch ( ElasticsearchException e ) {
//   String msg = BaseMessages.getString( PKG, "ElasticSearchBulk.Error.BatchExecuteFail", e.getLocalizedMessage() );
//   if ( e instanceof ElasticsearchTimeoutException ) {
//     msg = BaseMessages.getString( PKG, "ElasticSearchBulk.Error.Timeout" );
//   }
//   logError( msg );
//   rejectAllRows( msg );
// }

if ( response != null ) {

responseOk = handleResponse( response );

requestsBuffer.clear();

} else { // have to assume all failed

numberOfErrors += currentRequest.numberOfActions();

setErrors( numberOfErrors );

}

if ( makeNew ) {

// currentRequest = client.prepareBulk();

// Start a fresh, empty bulk request for the next batch. Calling client.bulk( currentRequest, ... )
// again here would re-send the batch that was just executed and never empty it.
currentRequest = new BulkRequest();

data.nextBufferRowIdx = 0;

data.inputRowBuffer = new Object[batchSize][];

} else {

currentRequest = null;

data.inputRowBuffer = null;

}

return responseOk;

}

/**

* @param response

* @return true if no errors

*/

private boolean handleResponse( BulkResponse response ) {

boolean hasErrors = response.hasFailures();

if ( hasErrors ) {

logError( response.buildFailureMessage() );

}

int errorsInBatch = 0;

if ( hasErrors || useOutput ) {

for ( BulkItemResponse item : response ) {

if ( item.isFailed() ) {

// log

logDetailed( item.getFailureMessage() );

errorsInBatch++;

if ( getStepMeta().isDoingErrorHandling() ) {

rejectRow( item.getItemId(), item.getFailureMessage() );

}

} else if ( useOutput ) {

if ( idOutFieldName != null ) {

addIdToRow( item.getId(), item.getItemId() );

}

echoRow( item.getItemId() );

}

}

}

numberOfErrors += errorsInBatch;

setErrors( numberOfErrors );

int linesOK = currentRequest.numberOfActions() - errorsInBatch;

if ( useOutput ) {

setLinesOutput( getLinesOutput() + linesOK );

} else {

setLinesWritten( getLinesWritten() + linesOK );

}

return !hasErrors;

}

private void addIdToRow( String id, int rowIndex ) {

data.inputRowBuffer[rowIndex] =

RowDataUtil.resizeArray( data.inputRowBuffer[rowIndex], getInputRowMeta().size() + 1 );

data.inputRowBuffer[rowIndex][getInputRowMeta().size()] = id;

}

/**

* Send input row to output

*

* @param rowIndex

*/

private void echoRow( int rowIndex ) {

try {

putRow( data.outputRowMeta, data.inputRowBuffer[rowIndex] );

} catch ( KettleStepException e ) {

logError( e.getLocalizedMessage() );

} catch ( ArrayIndexOutOfBoundsException e ) {

logError( e.getLocalizedMessage() );

}

}

/**

* Send input row to error.

*

* @param index

* @param errorMsg

*/

private void rejectRow( int index, String errorMsg ) {

try {

putError( getInputRowMeta(), data.inputRowBuffer[index], 1, errorMsg, null, INSERT_ERROR_CODE );

} catch ( KettleStepException e ) {

logError( e.getLocalizedMessage() );

} catch ( ArrayIndexOutOfBoundsException e ) {

logError( e.getLocalizedMessage() );

}

}

private void rejectAllRows( String errorMsg ) {

for ( int i = 0; i < data.nextBufferRowIdx; i++ ) {

rejectRow( i, errorMsg );

}

}

private void initClient() throws UnknownHostException {

Settings.Builder settingsBuilder = Settings.builder();

settingsBuilder.put( Settings.Builder.EMPTY_SETTINGS );

meta.getSettingsMap().entrySet().stream().forEach( ( s ) -> settingsBuilder.put( s.getKey(),

environmentSubstitute( s.getValue() ) ) );

// PreBuiltTransportClient tClient = new PreBuiltTransportClient( settingsBuilder.build() );
//
// for ( Server server : meta.getServers() ) {
//   tClient.addTransportAddress( new TransportAddress(
//       InetAddress.getByName( environmentSubstitute( server.getAddress() ) ),
//       server.getPort() ) );
// }
//
// client = tClient;

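// Build one REST client over all configured servers. Creating a new client per server
// inside the loop would keep only the last one and leave the others unclosed.
List<HttpHost> hosts = new ArrayList<>();

for ( Server server : meta.getServers() ) {
  hosts.add( new HttpHost( environmentSubstitute( server.getAddress() ), Integer.valueOf( server.getPort() ), "http" ) );
}

client = new RestHighLevelClient( RestClient.builder( hosts.toArray( new HttpHost[0] ) ) );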

/** With the upgrade to elasticsearch 6.3.0, removed the NodeBuilder,
 *  which was removed from the elasticsearch 5.0 API, see:
 *  https://www.elastic.co/guide/en/elasticsearch/reference/5.0/breaking_50_java_api_changes
 *  .html#_nodebuilder_removed
 */

}

private void disposeClient() throws IOException{

if ( client != null ) {

client.close();

}

}

public void dispose( StepMetaInterface smi, StepDataInterface sdi ) {

meta = (ElasticSearchBulkMeta) smi;

data = (ElasticSearchBulkData) sdi;

try {

disposeClient();

} catch ( Exception e ) {

logError( e.getLocalizedMessage(), e );

}

super.dispose( smi, sdi );

}

}
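
The batching done by processRow, indexRow and processBatch can be sketched independently of Kettle and Elasticsearch. The following is a minimal illustration, not the plugin's code: BatchBuffer and its flusher callback are hypothetical names, and the callback stands in for client.bulk(...). It shows the point that matters when working with BulkRequest: after a batch has been sent, a new empty batch has to be started, otherwise the same rows are sent again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, for illustration only: collects items and hands them
// to a callback in batches of a fixed size.
class BatchBuffer<T> {
  private final int batchSize;
  private final Consumer<List<T>> flusher; // stands in for client.bulk(...)
  private List<T> current = new ArrayList<>();

  BatchBuffer( int batchSize, Consumer<List<T>> flusher ) {
    if ( batchSize < 1 ) {
      throw new IllegalArgumentException( "batchSize must be at least 1" );
    }
    this.batchSize = batchSize;
    this.flusher = flusher;
  }

  // Add one item and send the batch as soon as it is full.
  void add( T item ) {
    current.add( item );
    if ( current.size() >= batchSize ) {
      flush();
    }
  }

  // Send whatever is buffered, then start a new, empty batch.
  void flush() {
    if ( current.isEmpty() ) {
      return;
    }
    flusher.accept( current );
    current = new ArrayList<>(); // new list, so a batch that was sent is never sent again
  }

  public static void main( String[] args ) {
    List<List<Integer>> sent = new ArrayList<>();
    BatchBuffer<Integer> buffer = new BatchBuffer<>( 2, sent::add );
    for ( int i = 1; i <= 5; i++ ) {
      buffer.add( i );
    }
    buffer.flush(); // end of input: send the partial last batch
    System.out.println( sent ); // prints [[1, 2], [3, 4], [5]]
  }
}
```

The final flush() plays the role of the processBatch( false ) call that processRow makes when getRow() returns null.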

6. In the C:\Users\DELL\Desktop\pentaho-kettle-8.2.0.0-R\plugins\elasticsearch-bulk-insert\core\src\main\java\org\pentaho\di\ui\trans\steps\elasticsearchbulk directory, replace the contents of ElasticSearchBulkDialog.java with the following:

package org.pentaho.di.ui.trans.steps.elasticsearchbulk;

import java.util.Map;

import org.apache.commons.lang.StringUtils;

import org.apache.http.HttpHost;

import org.eclipse.swt.SWT;

import org.eclipse.swt.custom.CTabFolder;

import org.eclipse.swt.custom.CTabItem;

import org.eclipse.swt.events.FocusListener;

import org.eclipse.swt.events.ModifyEvent;

import org.eclipse.swt.events.ModifyListener;

import org.eclipse.swt.events.SelectionAdapter;

import org.eclipse.swt.events.SelectionEvent;

import org.eclipse.swt.events.SelectionListener;

import org.eclipse.swt.events.ShellAdapter;

import org.eclipse.swt.events.ShellEvent;

import org.eclipse.swt.layout.FormAttachment;

import org.eclipse.swt.layout.FormData;

import org.eclipse.swt.layout.FormLayout;

import org.eclipse.swt.widgets.Button;

import org.eclipse.swt.widgets.Composite;

import org.eclipse.swt.widgets.Control;

import org.eclipse.swt.widgets.Display;

import org.eclipse.swt.widgets.Event;

import org.eclipse.swt.widgets.Group;

import org.eclipse.swt.widgets.Label;

import org.eclipse.swt.widgets.Listener;

import org.eclipse.swt.widgets.MessageBox;

import org.eclipse.swt.widgets.Shell;

import org.eclipse.swt.widgets.Text;

import org.elasticsearch.action.admin.indices.get.GetIndexRequest;

import org.elasticsearch.client.RequestOptions;

import org.elasticsearch.client.RestClient;

import org.elasticsearch.client.RestHighLevelClient;

import org.elasticsearch.client.core.MainResponse;

import org.elasticsearch.client.transport.NoNodeAvailableException;

import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.discovery.MasterNotDiscoveredException;

import org.pentaho.di.core.Const;

import org.pentaho.di.core.Props;

import org.pentaho.di.core.exception.KettleException;

import org.pentaho.di.core.row.RowMetaInterface;

import org.pentaho.di.i18n.BaseMessages;

import org.pentaho.di.trans.TransMeta;

import org.pentaho.di.trans.step.BaseStepMeta;

import org.pentaho.di.trans.step.StepDialogInterface;

import org.pentaho.di.trans.steps.elasticsearchbulk.ElasticSearchBulkMeta;

import org.pentaho.di.trans.steps.elasticsearchbulk.ElasticSearchBulkMeta.Server;

import org.pentaho.di.ui.core.dialog.ErrorDialog;

import org.pentaho.di.ui.core.widget.ColumnInfo;

import org.pentaho.di.ui.core.widget.LabelComboVar;

import org.pentaho.di.ui.core.widget.LabelTextVar;

import org.pentaho.di.ui.core.widget.TableView;

import org.pentaho.di.ui.core.widget.TextVar;

import org.pentaho.di.ui.trans.step.BaseStepDialog;

public class ElasticSearchBulkDialog extends BaseStepDialog implements StepDialogInterface {

private ElasticSearchBulkMeta model;

private static Class<?> PKG = ElasticSearchBulkMeta.class;

private CTabFolder wTabFolder;

private FormData fdTabFolder;

private CTabItem wGeneralTab;

private Composite wGeneralComp;

private FormData fdGeneralComp;

private Label wlBatchSize;

private TextVar wBatchSize;

private LabelTextVar wIdOutField;

private Group wIndexGroup;

private FormData fdIndexGroup;

private Group wSettingsGroup;

private FormData fdSettingsGroup;

private String[] fieldNames;

private CTabItem wFieldsTab;

private LabelTextVar wIndex;

private LabelTextVar wType;

private ModifyListener lsMod;

private Button wIsJson;

private Label wlIsJson;

private Label wlUseOutput;

private Button wUseOutput;

private LabelComboVar wJsonField;

private TableView wFields;

private CTabItem wServersTab;

private TableView wServers;

private CTabItem wSettingsTab;

private TableView wSettings;

private LabelTimeComposite wTimeOut;

private Label wlStopOnError;

private Button wStopOnError;

private Button wTest;

private Button wTestCl;

private LabelComboVar wIdInField;

private Button wIsOverwrite;

private Label wlIsOverwrite;

public ElasticSearchBulkDialog( Shell parent, Object in, TransMeta transMeta, String sname ) {

super( parent, (BaseStepMeta) in, transMeta, sname );

model = (ElasticSearchBulkMeta) in;

}

public String open() {

Shell parent = getParent();

Display display = parent.getDisplay();

shell = new Shell( parent, SWT.DIALOG_TRIM | SWT.RESIZE | SWT.MAX | SWT.MIN );

props.setLook( shell );

setShellImage( shell, model );

lsMod = new ModifyListener() {

public void modifyText( ModifyEvent e ) {

model.setChanged();

}

};

changed = model.hasChanged();

FormLayout formLayout = new FormLayout();

formLayout.marginWidth = Const.FORM_MARGIN;

formLayout.marginHeight = Const.FORM_MARGIN;

shell.setLayout( formLayout );

shell.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.DialogTitle" ) );

int middle = props.getMiddlePct();

int margin = Const.MARGIN;

// Stepname line

wlStepname = new Label( shell, SWT.RIGHT );

wlStepname.setText( BaseMessages.getString( PKG, "System.Label.StepName" ) );

props.setLook( wlStepname );

fdlStepname = new FormData();

fdlStepname.left = new FormAttachment( 0, 0 );

fdlStepname.top = new FormAttachment( 0, margin );

fdlStepname.right = new FormAttachment( middle, -margin );

wlStepname.setLayoutData( fdlStepname );

wStepname = new Text( shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER );

wStepname.setText( stepname );

props.setLook( wStepname );

wStepname.addModifyListener( lsMod );

fdStepname = new FormData();

fdStepname.left = new FormAttachment( middle, 0 );

fdStepname.top = new FormAttachment( 0, margin );

fdStepname.right = new FormAttachment( 100, 0 );

wStepname.setLayoutData( fdStepname );

wTabFolder = new CTabFolder( shell, SWT.BORDER );

props.setLook( wTabFolder, Props.WIDGET_STYLE_TAB );

// GENERAL TAB

addGeneralTab();

// Servers TAB

addServersTab();

// Fields TAB

addFieldsTab();

// Settings TAB

addSettingsTab();

//

// BUTTONS //

// //

wOK = new Button( shell, SWT.PUSH );

wOK.setText( BaseMessages.getString( PKG, "System.Button.OK" ) );

wCancel = new Button( shell, SWT.PUSH );

wCancel.setText( BaseMessages.getString( PKG, "System.Button.Cancel" ) );

setButtonPositions( new Button[]{wOK, wCancel}, margin, null );

fdTabFolder = new FormData();

fdTabFolder.left = new FormAttachment( 0, 0 );

fdTabFolder.top = new FormAttachment( wStepname, margin );

fdTabFolder.right = new FormAttachment( 100, 0 );

fdTabFolder.bottom = new FormAttachment( wOK, -margin );

wTabFolder.setLayoutData( fdTabFolder );

// //

// Std Listeners //

//

addStandardListeners();

wTabFolder.setSelection( 0 );

// Set the shell size, based upon previous time...

setSize();

getData( model );

model.setChanged( changed );

shell.open();

while ( !shell.isDisposed() ) {

if ( !display.readAndDispatch() ) {

display.sleep();

}

}

return stepname;

}

private void addStandardListeners() {

// Add listeners

lsOK = new Listener() {

public void handleEvent( Event e ) {

ok();

}

};

lsCancel = new Listener() {

public void handleEvent( Event e ) {

cancel();

}

};

lsMod = new ModifyListener() {

public void modifyText( ModifyEvent event ) {

model.setChanged();

}

};

wOK.addListener( SWT.Selection, lsOK );

wCancel.addListener( SWT.Selection, lsCancel );

lsDef = new SelectionAdapter() {

public void widgetDefaultSelected( SelectionEvent e ) {

ok();

}

};

wStepname.addSelectionListener( lsDef );

// window close

shell.addShellListener( new ShellAdapter() {

public void shellClosed( ShellEvent e ) {

cancel();

}

} );

}

/**

*/

private void addGeneralTab() {

wGeneralTab = new CTabItem( wTabFolder, SWT.NONE );

wGeneralTab.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.General.Tab" ) );

wGeneralComp = new Composite( wTabFolder, SWT.NONE );

props.setLook( wGeneralComp );

FormLayout generalLayout = new FormLayout();

generalLayout.marginWidth = 3;

generalLayout.marginHeight = 3;

wGeneralComp.setLayout( generalLayout );

// Index GROUP

fillIndexGroup( wGeneralComp );

// Options GROUP

fillOptionsGroup( wGeneralComp );

fdGeneralComp = new FormData();

fdGeneralComp.left = new FormAttachment( 0, 0 );

fdGeneralComp.top = new FormAttachment( wStepname, Const.MARGIN );

fdGeneralComp.right = new FormAttachment( 100, 0 );

fdGeneralComp.bottom = new FormAttachment( 100, 0 );

wGeneralComp.setLayoutData( fdGeneralComp );

wGeneralComp.layout();

wGeneralTab.setControl( wGeneralComp );

}

private void fillIndexGroup( Composite parentTab ) {

wIndexGroup = new Group( parentTab, SWT.SHADOW_NONE );

props.setLook( wIndexGroup );

wIndexGroup.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.IndexGroup.Label" ) );

FormLayout indexGroupLayout = new FormLayout();

indexGroupLayout.marginWidth = 10;

indexGroupLayout.marginHeight = 10;

wIndexGroup.setLayout( indexGroupLayout );

// Index

wIndex = new LabelTextVar( transMeta, wIndexGroup, BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Index"

+ ".Label" ), BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Index.Tooltip" ) );

wIndex.addModifyListener( lsMod );

// Type

wType =

new LabelTextVar( transMeta, wIndexGroup, BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Type"

+ ".Label" ),

BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Type.Tooltip" ) );

wType.addModifyListener( lsMod );

// Test button

wTest = new Button( wIndexGroup, SWT.PUSH );

wTest.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestIndex.Label" ) );

wTest.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestIndex.Tooltip" ) );

wTest.addListener( SWT.Selection, new Listener() {

public void handleEvent( Event arg0 ) {

test( TestType.INDEX );

}

} );

Control[] connectionControls = new Control[]{wIndex, wType};

placeControls( wIndexGroup, connectionControls );

BaseStepDialog.positionBottomButtons( wIndexGroup, new Button[]{wTest}, Const.MARGIN, wType );

fdIndexGroup = new FormData();

fdIndexGroup.left = new FormAttachment( 0, Const.MARGIN );

fdIndexGroup.top = new FormAttachment( wStepname, Const.MARGIN );

fdIndexGroup.right = new FormAttachment( 100, -Const.MARGIN );

wIndexGroup.setLayoutData( fdIndexGroup );

}

private void addServersTab() {

wServersTab = new CTabItem( wTabFolder, SWT.NONE );

wServersTab.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ServersTab.TabTitle" ) );

FormLayout serversLayout = new FormLayout();

serversLayout.marginWidth = Const.FORM_MARGIN;

serversLayout.marginHeight = Const.FORM_MARGIN;

Composite wServersComp = new Composite( wTabFolder, SWT.NONE );

wServersComp.setLayout( serversLayout );

props.setLook( wServersComp );

// Test button

wTestCl = new Button( wServersComp, SWT.PUSH );

wTestCl.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestCluster.Label" ) );

wTestCl.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestCluster.Tooltip" ) );

wTestCl.addListener( SWT.Selection, new Listener() {

public void handleEvent( Event arg0 ) {

test( TestType.CLUSTER );

}

} );

setButtonPositions( new Button[]{wTestCl}, Const.MARGIN, null );

ColumnInfo[] columnsMeta = new ColumnInfo[2];

columnsMeta[0] =

new ColumnInfo( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ServersTab.Address.Column" ),

ColumnInfo.COLUMN_TYPE_TEXT, false );

columnsMeta[0].setUsingVariables( true );

columnsMeta[1] =

new ColumnInfo( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ServersTab.Port.Column" ),

ColumnInfo.COLUMN_TYPE_TEXT, true );

wServers =

new TableView( transMeta, wServersComp, SWT.BORDER | SWT.FULL_SELECTION | SWT.MULTI, columnsMeta, 1, lsMod,

props );

FormData fdServers = new FormData();

fdServers.left = new FormAttachment( 0, Const.MARGIN );

fdServers.top = new FormAttachment( 0, Const.MARGIN );

fdServers.right = new FormAttachment( 100, -Const.MARGIN );

fdServers.bottom = new FormAttachment( wTestCl, -Const.MARGIN );

wServers.setLayoutData( fdServers );

FormData fdServersComp = new FormData();

fdServersComp.left = new FormAttachment( 0, 0 );

fdServersComp.top = new FormAttachment( 0, 0 );

fdServersComp.right = new FormAttachment( 100, 0 );

fdServersComp.bottom = new FormAttachment( 100, 0 );

wServersComp.setLayoutData( fdServersComp );

wServersComp.layout();

wServersTab.setControl( wServersComp );

}

private void addSettingsTab() {

wSettingsTab = new CTabItem( wTabFolder, SWT.NONE );

wSettingsTab.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.SettingsTab.TabTitle" ) );

FormLayout serversLayout = new FormLayout();

serversLayout.marginWidth = Const.FORM_MARGIN;

serversLayout.marginHeight = Const.FORM_MARGIN;

Composite wSettingsComp = new Composite( wTabFolder, SWT.NONE );

wSettingsComp.setLayout( serversLayout );

props.setLook( wSettingsComp );

ColumnInfo[] columnsMeta = new ColumnInfo[2];

columnsMeta[0] =

new ColumnInfo( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.SettingsTab.Property.Column" ),

ColumnInfo.COLUMN_TYPE_TEXT, false );

columnsMeta[1] =

new ColumnInfo( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.SettingsTab.Value.Column" ),

ColumnInfo.COLUMN_TYPE_TEXT, false );

columnsMeta[1].setUsingVariables( true );

wSettings =

new TableView( transMeta, wSettingsComp, SWT.BORDER | SWT.FULL_SELECTION | SWT.MULTI, columnsMeta, 1, lsMod,

props );

FormData fdServers = new FormData();

fdServers.left = new FormAttachment( 0, Const.MARGIN );

fdServers.top = new FormAttachment( 0, Const.MARGIN );

fdServers.right = new FormAttachment( 100, -Const.MARGIN );

fdServers.bottom = new FormAttachment( 100, -Const.MARGIN );

wSettings.setLayoutData( fdServers );

FormData fdServersComp = new FormData();

fdServersComp.left = new FormAttachment( 0, 0 );

fdServersComp.top = new FormAttachment( 0, 0 );

fdServersComp.right = new FormAttachment( 100, 0 );

fdServersComp.bottom = new FormAttachment( 100, 0 );

wSettingsComp.setLayoutData( fdServersComp );

wSettingsComp.layout();

wSettingsTab.setControl( wSettingsComp );

}

private void addFieldsTab() {

wFieldsTab = new CTabItem( wTabFolder, SWT.NONE );

wFieldsTab.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.FieldsTab.TabTitle" ) );

FormLayout fieldsLayout = new FormLayout();

fieldsLayout.marginWidth = Const.FORM_MARGIN;

fieldsLayout.marginHeight = Const.FORM_MARGIN;

Composite wFieldsComp = new Composite( wTabFolder, SWT.NONE );

wFieldsComp.setLayout( fieldsLayout );

props.setLook( wFieldsComp );

wGet = new Button( wFieldsComp, SWT.PUSH );

wGet.setText( BaseMessages.getString( PKG, "System.Button.GetFields" ) );

wGet.setToolTipText( BaseMessages.getString( PKG, "System.Tooltip.GetFields" ) );

lsGet = new Listener() {

public void handleEvent( Event e ) {

getPreviousFields( wFields );

}

};

wGet.addListener( SWT.Selection, lsGet );

setButtonPositions( new Button[]{wGet}, Const.MARGIN, null );

final int fieldsRowCount = model.getFields().size();

String[] names = this.fieldNames != null ? this.fieldNames : new String[]{""};

ColumnInfo[] columnsMeta = new ColumnInfo[2];

columnsMeta[0] =

new ColumnInfo( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.NameColumn.Column" ),

ColumnInfo.COLUMN_TYPE_CCOMBO, names, false );

columnsMeta[1] =

new ColumnInfo( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TargetNameColumn.Column" ),

ColumnInfo.COLUMN_TYPE_TEXT, false );

wFields =

new TableView( transMeta, wFieldsComp, SWT.BORDER | SWT.FULL_SELECTION | SWT.MULTI, columnsMeta,

fieldsRowCount,

lsMod, props );

FormData fdFields = new FormData();

fdFields.left = new FormAttachment( 0, Const.MARGIN );

fdFields.top = new FormAttachment( 0, Const.MARGIN );

fdFields.right = new FormAttachment( 100, -Const.MARGIN );

fdFields.bottom = new FormAttachment( wGet, -Const.MARGIN );

wFields.setLayoutData( fdFields );

FormData fdFieldsComp = new FormData();

fdFieldsComp.left = new FormAttachment( 0, 0 );

fdFieldsComp.top = new FormAttachment( 0, 0 );

fdFieldsComp.right = new FormAttachment( 100, 0 );

fdFieldsComp.bottom = new FormAttachment( 100, 0 );

wFieldsComp.setLayoutData( fdFieldsComp );

wFieldsComp.layout();

wFieldsTab.setControl( wFieldsComp );

}

private void fillOptionsGroup( Composite parentTab ) {

int margin = Const.MARGIN;

wSettingsGroup = new Group( parentTab, SWT.SHADOW_NONE );

props.setLook( wSettingsGroup );

wSettingsGroup.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.SettingsGroup.Label" ) );

FormLayout settingGroupLayout = new FormLayout();

settingGroupLayout.marginWidth = 10;

settingGroupLayout.marginHeight = 10;

wSettingsGroup.setLayout( settingGroupLayout );

// Timeout

wTimeOut =

new LabelTimeComposite( wSettingsGroup, BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TimeOut"

+ ".Label" ),

BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TimeOut.Tooltip" ) );

props.setLook( wTimeOut );

wTimeOut.addModifyListener( lsMod );

// BatchSize

wlBatchSize = new Label( wSettingsGroup, SWT.RIGHT );

wlBatchSize.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.BatchSize.Label" ) );

props.setLook( wlBatchSize );

wBatchSize = new TextVar( transMeta, wSettingsGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );

props.setLook( wBatchSize );

wBatchSize.addModifyListener( lsMod );

// Stop on error

wlStopOnError = new Label( wSettingsGroup, SWT.RIGHT );

wlStopOnError.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.StopOnError.Label" ) );

wStopOnError = new Button( wSettingsGroup, SWT.CHECK | SWT.RIGHT );

wStopOnError.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.StopOnError.Tooltip" ) );

wStopOnError.addSelectionListener( new SelectionListener() {

public void widgetDefaultSelected( SelectionEvent arg0 ) {

widgetSelected( arg0 );

}

public void widgetSelected( SelectionEvent arg0 ) {

model.setChanged();

}

} );

// ID input

wIdInField =

new LabelComboVar( transMeta, wSettingsGroup, BaseMessages.getString( PKG,

"ElasticSearchBulkDialog.IdField.Label" ), BaseMessages.getString( PKG,

"ElasticSearchBulkDialog.IdField.Tooltip" ) );

props.setLook( wIdInField );

wIdInField.getComboWidget().setEditable( true );

wIdInField.addModifyListener( lsMod );

wIdInField.addFocusListener( new FocusListener() {

public void focusLost( org.eclipse.swt.events.FocusEvent e ) {

}

public void focusGained( org.eclipse.swt.events.FocusEvent e ) {

getPreviousFields( wIdInField );

}

} );

getPreviousFields( wIdInField );

wlIsOverwrite = new Label( wSettingsGroup, SWT.RIGHT );

wlIsOverwrite.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Overwrite.Label" ) );

wIsOverwrite = new Button( wSettingsGroup, SWT.CHECK | SWT.RIGHT );

wIsOverwrite.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Overwrite.Tooltip" ) );

wIsOverwrite.addSelectionListener( new SelectionListener() {

public void widgetDefaultSelected( SelectionEvent arg0 ) {

widgetSelected( arg0 );

}

public void widgetSelected( SelectionEvent arg0 ) {

model.setChanged();

}

} );

// Output rows

wlUseOutput = new Label( wSettingsGroup, SWT.RIGHT );

wlUseOutput.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.UseOutput.Label" ) );

wUseOutput = new Button( wSettingsGroup, SWT.CHECK | SWT.RIGHT );

wUseOutput.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.UseOutput.Tooltip" ) );

wUseOutput.addSelectionListener( new SelectionListener() {

public void widgetDefaultSelected( SelectionEvent arg0 ) {

widgetSelected( arg0 );

}

public void widgetSelected( SelectionEvent arg0 ) {

wIdOutField.setEnabled( wUseOutput.getSelection() );

model.setChanged();

}

} );

// ID out field

wIdOutField =

new LabelTextVar( transMeta, wSettingsGroup, BaseMessages.getString( PKG,

"ElasticSearchBulkDialog.IdOutField.Label" ), BaseMessages.getString( PKG,

"ElasticSearchBulkDialog.IdOutField.Tooltip" ) );

props.setLook( wIdOutField );

wIdOutField.setEnabled( wUseOutput.getSelection() );

wIdOutField.addModifyListener( lsMod );

// use json

wlIsJson = new Label( wSettingsGroup, SWT.RIGHT );

wlIsJson.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.IsJson.Label" ) );

wIsJson = new Button( wSettingsGroup, SWT.CHECK | SWT.RIGHT );

wIsJson.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.IsJson.Tooltip" ) );

wIsJson.addSelectionListener( new SelectionListener() {

public void widgetDefaultSelected( SelectionEvent arg0 ) {

widgetSelected( arg0 );

}

public void widgetSelected( SelectionEvent arg0 ) {

wJsonField.setEnabled( wIsJson.getSelection() );

wFields.setEnabled( !wIsJson.getSelection() );

wFields.setVisible( !wIsJson.getSelection() );

wGet.setEnabled( !wIsJson.getSelection() );

model.setChanged();

}

} );

// Json field

wJsonField =

new LabelComboVar( transMeta, wSettingsGroup, BaseMessages.getString( PKG,

"ElasticSearchBulkDialog.JsonField.Label" ), BaseMessages.getString( PKG,

"ElasticSearchBulkDialog.JsonField.Tooltip" ) );

wJsonField.getComboWidget().setEditable( true );

props.setLook( wJsonField );

wJsonField.addModifyListener( lsMod );

wJsonField.addFocusListener( new FocusListener() {

public void focusLost( org.eclipse.swt.events.FocusEvent e ) {

}

public void focusGained( org.eclipse.swt.events.FocusEvent e ) {

getPreviousFields( wJsonField );

}

} );

getPreviousFields( wJsonField );

wJsonField.setEnabled( wIsJson.getSelection() );

Control[] settingsControls = new Control[]{wlBatchSize, wBatchSize, wlStopOnError, wStopOnError, wTimeOut,

wIdInField, wlIsOverwrite, wIsOverwrite, wlUseOutput, wUseOutput, wIdOutField, wlIsJson, wIsJson,

wJsonField};

placeControls( wSettingsGroup, settingsControls );

fdSettingsGroup = new FormData();

fdSettingsGroup.left = new FormAttachment( 0, margin );

fdSettingsGroup.top = new FormAttachment( wIndexGroup, margin );

fdSettingsGroup.right = new FormAttachment( 100, -margin );

wSettingsGroup.setLayoutData( fdSettingsGroup );

}

private void getPreviousFields( LabelComboVar combo ) {

String value = combo.getText();

combo.removeAll();

combo.setItems( getInputFieldNames() );

if ( value != null ) {

combo.setText( value );

}

}

private String[] getInputFieldNames() {

if ( this.fieldNames == null ) {

try {

RowMetaInterface r = transMeta.getPrevStepFields( stepname );

if ( r != null ) {

fieldNames = r.getFieldNames();

}

} catch ( KettleException ke ) {

new ErrorDialog( shell, BaseMessages.getString( PKG, "ElasticSearchBulkDialog.FailedToGetFields.DialogTitle" ),

BaseMessages.getString( PKG, "ElasticSearchBulkDialog.FailedToGetFields.DialogMessage" ), ke );

return new String[0];

}

}

return fieldNames;

}

private void getPreviousFields( TableView table ) {

try {

RowMetaInterface r = transMeta.getPrevStepFields( stepname );

if ( r != null ) {

BaseStepDialog.getFieldsFromPrevious( r, table, 1, new int[]{1, 2}, null, 0, 0, null );

}

} catch ( KettleException ke ) {

new ErrorDialog( shell, BaseMessages.getString( PKG, "System.Dialog.GetFieldsFailed.Title" ), BaseMessages

.getString( PKG, "System.Dialog.GetFieldsFailed.Message" ), ke );

}

}

private void placeControls( Group group, Control[] controls ) {

Control previousAbove = group;

Control previousLeft = group;

for ( Control control : controls ) {

if ( control instanceof Label ) {

addLabelAfter( control, previousAbove );

previousLeft = control;

} else {

addWidgetAfter( control, previousAbove, previousLeft );

previousAbove = control;

previousLeft = group;

}

}

}

private void addWidgetAfter( Control widget, Control widgetAbove, Control widgetLeft ) {

props.setLook( widget );

FormData fData = new FormData();

fData.left = new FormAttachment( widgetLeft, Const.MARGIN );

fData.top = new FormAttachment( widgetAbove, Const.MARGIN );

fData.right = new FormAttachment( 100, -Const.MARGIN );

widget.setLayoutData( fData );

}

private void addLabelAfter( Control widget, Control widgetAbove ) {

props.setLook( widget );

FormData fData = new FormData();

fData.top = new FormAttachment( widgetAbove, Const.MARGIN );

fData.right = new FormAttachment( Const.MIDDLE_PCT, -Const.MARGIN );

widget.setLayoutData( fData );

}

/**

* Read the data from the ElasticSearchBulkMeta object and show it in this dialog.

*

* @param in The ElasticSearchBulkMeta object to obtain the data from.

*/

public void getData( ElasticSearchBulkMeta in ) {

wIndex.setText( Const.NVL( in.getIndex(), "" ) );

wType.setText( Const.NVL( in.getType(), "" ) );

wBatchSize.setText( Const.NVL( in.getBatchSize(), "" + ElasticSearchBulkMeta.DEFAULT_BATCH_SIZE ) );

wStopOnError.setSelection( in.isStopOnError() );

wTimeOut.setText( Const.NVL( in.getTimeOut(), "" ) );

wTimeOut.setTimeUnit( in.getTimeoutUnit() );

wIdInField.setText( Const.NVL( in.getIdInField(), "" ) );

wIsOverwrite.setSelection( in.isOverWriteIfSameId() );

wIsJson.setSelection( in.isJsonInsert() );

wJsonField.setText( Const.NVL( in.getJsonField(), "" ) );

wJsonField.setEnabled( wIsJson.getSelection() ); // listener not working here

wUseOutput.setSelection( in.isUseOutput() );

wIdOutField.setText( Const.NVL( in.getIdOutField(), "" ) );

wIdOutField.setEnabled( wUseOutput.getSelection() ); // listener not working here

// Fields

mapToTableView( model.getFieldsMap(), wFields );

// Servers

for ( ElasticSearchBulkMeta.Server server : model.getServers() ) {

wServers.add( server.address, "" + server.port );

}

wServers.removeEmptyRows();

wServers.setRowNums();

// Settings

mapToTableView( model.getSettingsMap(), wSettings );

wStepname.selectAll();

wStepname.setFocus();

}

private void mapToTableView( Map<String, String> map, TableView table ) {

for ( String key : map.keySet() ) {

table.add( key, map.get( key ) );

}

table.removeEmptyRows();

table.setRowNums();

}

private void cancel() {

stepname = null;

model.setChanged( changed );

dispose();

}

private void ok() {

try {

toModel( model );

} catch ( KettleException e ) {

new ErrorDialog( shell, BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ErrorValidateData.DialogTitle" ),

BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ErrorValidateData.DialogMessage" ), e );

}

dispose();

}

private void toModel( ElasticSearchBulkMeta in ) throws KettleException { // copy info to ElasticSearchBulkMeta

stepname = wStepname.getText();

in.setType( wType.getText() );

in.setIndex( wIndex.getText() );

in.setBatchSize( wBatchSize.getText() );

in.setTimeOut( Const.NVL( wTimeOut.getText(), null ) );

in.setTimeoutUnit( wTimeOut.getTimeUnit() );

in.setIdInField( wIdInField.getText() );

in.setOverWriteIfSameId( StringUtils.isNotBlank( wIdInField.getText() ) && wIsOverwrite.getSelection() );

in.setStopOnError( wStopOnError.getSelection() );

in.setJsonInsert( wIsJson.getSelection() );

in.setJsonField( wIsJson.getSelection() ? wJsonField.getText() : null );

in.setIdOutField( wIdOutField.getText() );

in.setUseOutput( wUseOutput.getSelection() );

in.clearFields();

if ( !wIsJson.getSelection() ) {

for ( int i = 0; i < wFields.getItemCount(); i++ ) {

String[] row = wFields.getItem( i );

if ( StringUtils.isNotBlank( row[0] ) ) {

in.addField( row[0], row[1] );

}

}

}

in.clearServers();

for ( int i = 0; i < wServers.getItemCount(); i++ ) {

String[] row = wServers.getItem( i );

if ( StringUtils.isNotBlank( row[0] ) ) {

try {

in.addServer( row[0], Integer.parseInt( row[1] ) );

} catch ( NumberFormatException nfe ) {

in.addServer( row[0], ElasticSearchBulkMeta.DEFAULT_PORT );

}

}

}

in.clearSettings();

for ( int i = 0; i < wSettings.getItemCount(); i++ ) {

String[] row = wSettings.getItem( i );

in.addSetting( row[0], row[1] );

}

}

private enum TestType {

INDEX, CLUSTER,

}

private void test( TestType testType ) {

try {

ElasticSearchBulkMeta tempMeta = new ElasticSearchBulkMeta();

toModel( tempMeta );

if ( !tempMeta.getServers().isEmpty() ) {

Settings.Builder settingsBuilder = Settings.builder();

settingsBuilder.put( Settings.Builder.EMPTY_SETTINGS );

tempMeta.getSettingsMap().entrySet().stream().forEach( ( s ) -> settingsBuilder.put( s.getKey(), transMeta

.environmentSubstitute( s.getValue() ) ) );

// Previous TransportClient-based code, kept for reference:

// try ( PreBuiltTransportClient client = new PreBuiltTransportClient( settingsBuilder.build() ) ) {

// for ( Server server : tempMeta.getServers() ) {

// client.addTransportAddress( new TransportAddress(

// InetAddress.getByName( transMeta.environmentSubstitute( server.getAddress() ) ),

// server.getPort() ) );

// }

// AdminClient admin = rclient.admin();

// Build a single REST client over all configured servers instead of one client per server.

HttpHost[] hosts = tempMeta.getServers().stream().map( server -> new HttpHost(

transMeta.environmentSubstitute( server.getAddress() ), server.getPort(), "http" ) ).toArray( HttpHost[]::new );

// try-with-resources closes the client once the test has finished.

try ( RestHighLevelClient rclient = new RestHighLevelClient( RestClient.builder( hosts ) ) ) {

switch ( testType ) {

case INDEX:

if ( StringUtils.isBlank( tempMeta.getIndex() ) ) {

showError( BaseMessages.getString( PKG, "ElasticSearchBulk.Error.NoIndex" ) );

break;

}

// First check to see if the index exists (only needed for the index test)

// IndicesExistsRequestBuilder indicesExistBld = admin.indices().prepareExists( tempMeta.getIndex() );

// IndicesExistsResponse indicesExistResponse = indicesExistBld.execute().get();

GetIndexRequest request = new GetIndexRequest();

request.indices( tempMeta.getIndex().split( "," ) );

request.local( false );

request.humanReadable( true );

boolean exists = rclient.indices().exists( request, RequestOptions.DEFAULT );

if ( !exists ) {

showError( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Error.NoIndex" ) );

return;

}

// RecoveryRequestBuilder indicesBld = rclient.indices().prepareRecoveries( tempMeta.getIndex() );

// ActionFuture<RecoveryResponse> lafInd = indicesBld.execute();

// String shards = "" + lafInd.get().getSuccessfulShards() + "/" + lafInd.get().getTotalShards();

showMessage( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestIndex.TestOK", "true" ) );

break;

case CLUSTER:

// ClusterStateRequestBuilder clusterBld = admin.cluster().prepareState();

// ActionFuture<ClusterStateResponse> lafClu = clusterBld.execute();

// ClusterStateResponse cluResp = lafClu.actionGet();

// String name = cluResp.getClusterName().value();

// ClusterState cluState = cluResp.getState();

// int numNodes = cluState.getNodes().getSize();

// showMessage( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestCluster.TestOK", name, numNodes ) );

MainResponse response = rclient.info( RequestOptions.DEFAULT );

showMessage( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestCluster.TestOK", response.getClusterName(), response.getVersion() ) );

break;

default:

break;

}

}

}else{

showError( "Servers is null" );

}

} catch ( NoNodeAvailableException | MasterNotDiscoveredException e ) {

showError( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Error.NoNodesFound" ) );

} catch ( Exception e ) {

showError( e.getLocalizedMessage() );

}

}

private void showError( String message ) {

MessageBox mb = new MessageBox( shell, SWT.OK | SWT.ICON_ERROR );

mb.setMessage( message );

mb.setText( BaseMessages.getString( PKG, "System.Dialog.Error.Title" ) );

mb.open();

}

private void showMessage( String message ) {

MessageBox mb = new MessageBox( shell, SWT.OK | SWT.ICON_INFORMATION );

mb.setMessage( message );

mb.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Test.TestOKTitle" ) );

mb.open();

}

@Override

public String toString() {

return this.getClass().getName();

}

}
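The server-table handling in toModel above skips rows with a blank address and falls back to a default port when the port cell is not a number. The following is a minimal standalone sketch of that rule, not the plugin's code: the class ServerRowSketch and its methods are hypothetical, and the constant 9200 is an assumption (it is Elasticsearch's default HTTP port; the plugin's own ElasticSearchBulkMeta.DEFAULT_PORT may hold a different value). Unlike the dialog code, the sketch also trims whitespace.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the "blank address is skipped, bad port falls back" rule.
final class ServerRowSketch {

  // Assumption: 9200 is the default HTTP port of Elasticsearch; the plugin's
  // ElasticSearchBulkMeta.DEFAULT_PORT is not shown in the article and may differ.
  static final int ASSUMED_DEFAULT_PORT = 9200;

  // Returns the parsed port, or the assumed default when the text is null or not a number.
  static int parsePort( String text ) {
    if ( text == null ) {
      return ASSUMED_DEFAULT_PORT;
    }
    try {
      return Integer.parseInt( text.trim() );
    } catch ( NumberFormatException nfe ) {
      return ASSUMED_DEFAULT_PORT;
    }
  }

  // Converts table rows (address, port) into "host:port" strings, skipping blank addresses.
  static List<String> toHostPorts( String[][] rows ) {
    List<String> result = new ArrayList<>();
    for ( String[] row : rows ) {
      if ( row.length == 0 || row[0] == null || row[0].trim().isEmpty() ) {
        continue;
      }
      String port = row.length > 1 ? row[1] : null;
      result.add( row[0].trim() + ":" + parsePort( port ) );
    }
    return result;
  }

  public static void main( String[] args ) {
    String[][] rows = { { "localhost", "9200" }, { "es-node-2", "" }, { "", "9201" } };
    // prints [localhost:9200, es-node-2:9200]
    System.out.println( toHostPorts( rows ) );
  }
}
```

Because the dialog now connects through the REST client, the port entered in the Servers tab has to be the node's HTTP port rather than the transport port used by the old TransportClient.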

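7. Go to the pentaho-kettle-8.2.0.0-R directory, open a cmd window, and run mvn clean package -Dmaven.test.skip=true to compile and package the project. Errors will appear during the build. Check whether the directory pentaho-kettle-8.2.0.0-R\plugins\elasticsearch-bulk-insert\assemblies\plugin\target contains a package named elasticsearch-bulk-insert-plugin-8.2.0.0-342.zip. If it does, the errors can be ignored. If it does not, fix the problems reported in the build output and rerun the command until elasticsearch-bulk-insert-plugin-8.2.0.0-342.zip is produced.

8. Go to the tool's installation directory data-integration\pdi-ce-8.2.0.0-342\data-integration\plugins and delete the elasticsearch-bulk-insert-plugin folder.

9. Unzip elasticsearch-bulk-insert-plugin-8.2.0.0-342.zip and move the extracted elasticsearch-bulk-insert-plugin folder into data-integration\pdi-ce-8.2.0.0-342\data-integration\plugins.

10. Go to the tool's installation directory data-integration\pdi-ce-8.2.0.0-342\data-integration and run Spoon.bat.

11. The Kettle plugin for connecting to Elasticsearch is now ready to use.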
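The check in step 7 (does the plugin zip exist in the target directory?) can be scripted. The sketch below is only an illustration under stated assumptions: the class PluginZipCheck and its method are hypothetical names, and the source root passed on the command line is assumed to be the unpacked pentaho-kettle-8.2.0.0-R directory. The relative path and file name are the ones given in step 7.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: locates the plugin zip that the Maven build is expected to produce.
final class PluginZipCheck {

  // Builds the expected zip path below the source root, using the layout named in step 7.
  static Path expectedZip( Path sourceRoot, String version ) {
    return sourceRoot.resolve( Paths.get( "plugins", "elasticsearch-bulk-insert", "assemblies", "plugin",
      "target", "elasticsearch-bulk-insert-plugin-" + version + ".zip" ) );
  }

  public static void main( String[] args ) {
    // Assumption: the first argument is the unpacked source directory.
    Path root = Paths.get( args.length > 0 ? args[0] : "pentaho-kettle-8.2.0.0-R" );
    Path zip = expectedZip( root, "8.2.0.0-342" );
    if ( Files.isRegularFile( zip ) ) {
      System.out.println( "Found " + zip + "; remaining build errors can be ignored." );
    } else {
      System.out.println( "Missing " + zip + "; fix the reported errors and rerun the build." );
    }
  }
}
```

Running it after each build attempt gives a quick yes/no answer without browsing the target folder by hand.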

III. Notes

1. An index can have only one type.

2. The Settings tab does not need to be configured, as shown in the figure below.
