日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java es 数据批量导入_ElasticSearch—Java批量导入导出

發布時間:2023/12/13 java 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java es 数据批量导入_ElasticSearch—Java批量导入导出 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

網上找了很多例子,但我的 ES 是 2.3.5 版本,而網上的客戶端代碼最低都是 5.x 版本,沒有能直接使用的,所以自己整合了一份適用於 2.3.5 版本的。

pom文件:

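<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.3.5</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.1.35</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-io</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.1</version>
</dependency>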

代碼:

package com.topcom.tjs.create.elaticsearch;

import org.apache.commons.lang3.StringUtils;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.bootstrap.Elasticsearch;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

import java.io.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

/**
 * Command-line utility for an Elasticsearch 2.x cluster, built on the 2.x {@link TransportClient} API.
 *
 * <p>It can export the source documents of an index to a file, one JSON document per line.
 * It can also import such a file back into an index.
 *
 * @author rstyro
 */
public class ElasticSearch {

    /** Number of index requests sent per bulk request in {@link #fileToEs}. */
    private static final int BULK_SIZE = 1000;

    /** Number of hits requested per search/scroll batch in {@link #outToFile}. */
    private static final int SCROLL_PAGE_SIZE = 10000;

    /** Keep-alive, in milliseconds, requested for the scroll context between batches. */
    private static final long SCROLL_KEEP_ALIVE_MILLIS = 6000;

    public static void main(String[] args) throws Exception {
        String clustName = "tc-es";
        String indexName = "yuqing_0006_6";
        String clusttIp = "192.168.1.14";
        int clustPort = 20069;
        String filePath = "I:\\data\\data\\esdata.json";
        String typeName = "article";

        // Switch which of the two calls is commented out to export instead of import.
        //outToFile(clustName, indexName, typeName, clusttIp, clustPort, filePath);
        fileToEs(clustName, indexName, typeName, clusttIp, clustPort, filePath);
    }

    /**
     * Exports documents from an index to a file, writing each document's JSON source on its own
     * line (lines end with {@code "\r\n"}).
     *
     * <p>Only documents whose {@code type} field matches {@code "NEWS"} are exported. The file is
     * opened in append mode and written as UTF-8, so exporting twice to the same path appends a
     * second copy of the data.
     *
     * @param clustName  cluster name
     * @param indexName  index to read from
     * @param typeName   mapping type to restrict the search to, or {@code null} for all types
     * @param sourceIp   host name or IP of a cluster node
     * @param sourcePort transport-protocol port of that node
     * @param filePath   path of the output file
     * @throws Exception if the client cannot be created or a search/scroll request fails; I/O
     *                   errors on the output file are printed and not rethrown
     */
    public static void outToFile(String clustName, String indexName, String typeName, String sourceIp, int sourcePort, String filePath) throws Exception {
        TransportClient client = createClient(clustName, sourceIp, sourcePort);
        try {
            SearchRequestBuilder builder = client.prepareSearch(indexName);
            if (typeName != null) {
                builder.setTypes(typeName);
            }
            // Only this filter is applied: documents whose "type" field matches NEWS.
            // NOTE(review): confirm that exporting only NEWS documents is intended.
            builder.setQuery(QueryBuilders.matchQuery("type", "NEWS"));
            builder.setSize(SCROLL_PAGE_SIZE);
            builder.setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MILLIS));
            SearchResponse scrollResp = builder.execute().actionGet();

            // Write UTF-8 explicitly so non-ASCII text survives regardless of the platform charset.
            try (BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
                    new FileOutputStream(filePath, true), StandardCharsets.UTF_8))) {
                long count = 0;
                // Write the current batch, fetch the next one, and stop at the first empty batch.
                while (true) {
                    for (SearchHit hit : scrollResp.getHits().getHits()) {
                        String json = hit.getSourceAsString();
                        if (StringUtils.isNotEmpty(json)) {
                            out.write(json);
                            out.write("\r\n");
                            count++;
                            System.out.println("*******************" + count);
                        }
                    }
                    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
                            .setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MILLIS)).execute().actionGet();
                    if (scrollResp.getHits().getHits().length == 0) {
                        break;
                    }
                }
                System.out.println("總共寫入數據:" + count);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } finally {
            // Always release the client, even if the search or the file I/O failed.
            client.close();
        }
    }

    /**
     * Imports a UTF-8 file containing one JSON document per line into an index.
     *
     * <p>Documents are sent in bulk requests of {@link #BULK_SIZE}, followed by one final request
     * for any remainder. Blank lines are skipped. No document ID is set, so each line is indexed
     * as a new document. Per-item bulk failures are printed rather than thrown.
     *
     * @param clustName  cluster name
     * @param indexName  index to write to
     * @param typeName   mapping type of the indexed documents
     * @param sourceIp   host name or IP of a cluster node
     * @param sourcePort transport-protocol port of that node
     * @param filePath   path of the JSON-lines input file
     * @throws Exception if the client cannot be created or a bulk request fails; I/O errors on
     *                   the input file are printed and not rethrown
     */
    @SuppressWarnings("deprecation")
    public static void fileToEs(String clustName, String indexName, String typeName, String sourceIp, int sourcePort, String filePath) throws Exception {
        TransportClient client = createClient(clustName, sourceIp, sourcePort);
        try (BufferedReader br = new BufferedReader(new InputStreamReader(
                new FileInputStream(filePath), StandardCharsets.UTF_8))) {
            int count = 0;
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            String json;
            while ((json = br.readLine()) != null) {
                // A blank line (e.g. a trailing empty line) is not a document; don't send it.
                if (StringUtils.isBlank(json)) {
                    continue;
                }
                bulkRequest.add(client.prepareIndex(indexName, typeName).setSource(json));
                count++;
                if (count % BULK_SIZE == 0) {
                    System.out.println("本次提交了" + BULK_SIZE + "條");
                    submitBulk(bulkRequest);
                    // Start a fresh bulk request for the next batch.
                    bulkRequest = client.prepareBulk();
                }
            }
            // Flush the remainder; Elasticsearch rejects a bulk request with no actions.
            if (bulkRequest.numberOfActions() > 0) {
                submitBulk(bulkRequest);
            }
            System.out.println("總提交了:" + count);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Always release the client, including when the input file cannot be opened.
            client.close();
        }
    }

    /** Executes a bulk request synchronously and prints the failure message if any item failed. */
    private static void submitBulk(BulkRequestBuilder bulkRequest) {
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            System.out.println("message:" + bulkResponse.buildFailureMessage());
        }
    }

    /**
     * Creates a transport client connected to a single node of the given cluster.
     *
     * <p>A new client is built on every call; the caller is responsible for closing it.
     *
     * @param cluster value for the {@code cluster.name} setting
     * @param ip      host name or IP of the node
     * @param port    transport-protocol port of the node
     * @return a new client with that node's address added
     * @throws Exception if the host name cannot be resolved
     */
    private static TransportClient createClient(String cluster, String ip, Integer port) throws Exception {
        Settings settings = Settings.settingsBuilder().put("cluster.name", cluster).build();
        return TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip), port));
    }
}

總結

以上是生活随笔為你收集整理的java es 数据批量导入_ElasticSearch—Java批量导入导出的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。