日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Elasticsearch-Jest 配置ES集群源码解读

發(fā)布時間:2025/3/21 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Elasticsearch-Jest 配置ES集群源码解读 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

文章目錄

  • Jest Github地址
  • 搭建源碼環(huán)境
  • Jest配置ES集群
  • Jest 配置ES集群,確保應用高可用的原理探究
    • 初始化 JestClient
    • NodeChecker 源碼分析
    • 發(fā)起請求的過程
  • 遇到的問題


Jest Github地址

直接訪問 https://github.com/searchbox-io/Jest ,把源碼拉下來


搭建源碼環(huán)境

我拉了個5.3.4的版本,最新版本為6.3.1 ,大同小異

test 這個module是我自己寫的測試集群代碼,GitHub上是沒有這個的 .


Jest配置ES集群

單例Client ,有個屬性JestClient ,需要初始化。

package com.artisan.test;import com.google.gson.GsonBuilder; import io.searchbox.client.JestClient; import io.searchbox.client.JestClientFactory; import io.searchbox.client.config.HttpClientConfig;import java.util.Arrays; import java.util.concurrent.TimeUnit;public class Client {// volatile修飾,確保內(nèi)存可見private volatile static Client client = null;private static JestClient jestClient;/*** 私有構(gòu)造函數(shù)*/private Client() {initJestClient(); // 初始化JestClient}/*** 懶漢模式* double Check* @return*/public static Client getInstance() {if (client == null) {synchronized (Client.class) {if (client == null) {client = new Client();}}}return client;}/*** 獲取JestClient* @return*/public static JestClient getJestClient() {return jestClient;}private void initJestClient() {// 初始化的集群節(jié)點String[] serverUris = new String[]{"http://127.0.0.1:9200", "http://127.0.0.1:8200"};JestClientFactory factory = new JestClientFactory();// 設(shè)置HttpClientConfigfactory.setHttpClientConfig(new HttpClientConfig.Builder(Arrays.asList(serverUris)).discoveryEnabled(true) // 節(jié)點發(fā)現(xiàn),確保訪問的節(jié)點都是存活的節(jié)點,達到高可用.discoveryFrequency(2000, TimeUnit.MILLISECONDS) // NodeChecker的執(zhí)行頻率,默認10S.gson(new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss").create()).multiThreaded(true).readTimeout(10000).build());// 返回jestClientjestClient = factory.getObject();} }

測試類

package com.artisan.test;import io.searchbox.client.JestResult; import io.searchbox.core.Get;import java.io.IOException;public class JestClientTest {/*** 構(gòu)造函數(shù)*/public JestClientTest() {Client.getInstance();// 初始化Client}private static void getDocumentMyStroe(String id) {Get get = new Get.Builder("my_store", id).type("product").build();JestResult result ;try {result = Client.getJestClient().execute(get);if (result != null) System.out.println(id + ":" + result.getJsonObject());} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {Thread.sleep(5000);// 先讓NodeChecker運行,獲取存活的節(jié)點,主線程這里先休眠5秒for (int i = 0; i < Integer.MAX_VALUE; i++) {Thread.sleep(2000);getDocumentMyStroe("998");}} }

Jest 配置ES集群,確保應用高可用的原理探究

來看看關(guān)鍵點.discoveryEnabled(true) 都干了啥?

初始化 JestClient

到 JestClientFactory#getObject() 方法 中看下 ,大致說下整個方法的邏輯:

public JestClient getObject() {// 初始化 JestHttpClientJestHttpClient client = new JestHttpClient();if (httpClientConfig == null) {log.debug("There is no configuration to create http client. Going to create simple client with default values");httpClientConfig = new HttpClientConfig.Builder("http://localhost:9200").build();}client.setRequestCompressionEnabled(httpClientConfig.isRequestCompressionEnabled());// 初始化的es集群節(jié)點client.setServers(httpClientConfig.getServerList());// 設(shè)置HttpClient、AsyncClient final HttpClientConnectionManager connectionManager = getConnectionManager();final NHttpClientConnectionManager asyncConnectionManager = getAsyncConnectionManager();client.setHttpClient(createHttpClient(connectionManager));client.setAsyncClient(createAsyncHttpClient(asyncConnectionManager));// 設(shè)置自定義的GsonGson gson = httpClientConfig.getGson();if (gson == null) {log.info("Using default GSON instance");} else {log.info("Using custom GSON instance");client.setGson(gson);}// 創(chuàng)建NodeChecker并啟動Node Discovery// set discovery (should be set after setting the httpClient on jestClient)if (httpClientConfig.isDiscoveryEnabled()) {log.info("Node Discovery enabled...");if (!Strings.isNullOrEmpty(httpClientConfig.getDiscoveryFilter())) {log.info("Node Discovery filtering nodes on \"{}\"", httpClientConfig.getDiscoveryFilter());}NodeChecker nodeChecker = createNodeChecker(client, httpClientConfig);client.setNodeChecker(nodeChecker);nodeChecker.startAsync();nodeChecker.awaitRunning();} else {log.info("Node Discovery disabled...");}// 如果maxConnectionIdleTime大于0則會創(chuàng)建IdleConnectionReaper,進行Idle connection reaping (空閑線程回收)// schedule idle connection reaping if configuredif (httpClientConfig.getMaxConnectionIdleTime() > 0) {log.info("Idle connection reaping enabled...");IdleConnectionReaper reaper = new IdleConnectionReaper(httpClientConfig, new HttpReapableConnectionManager(connectionManager, asyncConnectionManager));client.setIdleConnectionReaper(reaper);reaper.startAsync();reaper.awaitRunning();} else {log.info("Idle connection reaping disabled...");}Set<HttpHost> preemptiveAuthTargetHosts = httpClientConfig.getPreemptiveAuthTargetHosts();if (!preemptiveAuthTargetHosts.isEmpty()) {log.info("Authentication cache set for preemptive authentication");client.setHttpClientContextTemplate(createPreemptiveAuthContext(preemptiveAuthTargetHosts));}return client;}

重點看下 discoveryEnable 設(shè)置為true的情況下,Jest的處理邏輯


NodeChecker 源碼分析

NodeChecker繼承了com.google.common.util.concurrent.AbstractScheduledService,

它的構(gòu)造器根據(jù)clientConfig的discoveryFrequency及discoveryFrequencyTimeUnit了fixedDelayScheduler來執(zhí)行node checker;

public NodeChecker(JestClient jestClient, ClientConfig clientConfig) {// 構(gòu)建action ,可以根據(jù)前面HttpClientConfig#discoveryFilter(String discoveryFilter) 添加Nodeaction = new NodesInfo.Builder().withHttp().addNode(clientConfig.getDiscoveryFilter()).build();this.client = jestClient;this.defaultScheme = clientConfig.getDefaultSchemeForDiscoveredNodes();// 根據(jù)discoveryFrequency(2000, TimeUnit.MILLISECONDS) 實例化一個定時任務出來 使用的Google Guava的包 this.scheduler = Scheduler.newFixedDelaySchedule(0l,clientConfig.getDiscoveryFrequency(),clientConfig.getDiscoveryFrequencyTimeUnit());// 初始化的根節(jié)點 this.bootstrapServerList = ImmutableSet.copyOf(clientConfig.getServerList());// 實例化 discoveredServerList 為空,后續(xù)使用 this.discoveredServerList = new LinkedHashSet<String>();}

實現(xiàn)了runOneIteration方法,該方法主要是發(fā)送NodesInfo請求 GET /_nodes/_all/http

@Overrideprotected void runOneIteration() throws Exception {JestResult result;try {result = client.execute(action);} catch (CouldNotConnectException cnce) {// Can't connect to this node, remove it from the listlog.error("Connect exception executing NodesInfo!", cnce);removeNodeAndUpdateServers(cnce.getHost());return;// do not elevate the exception since that will stop the scheduled calls.// throw new RuntimeException("Error executing NodesInfo!", e);} catch (Exception e) {log.error("Error executing NodesInfo!", e);client.setServers(bootstrapServerList);return;// do not elevate the exception since that will stop the scheduled calls.// throw new RuntimeException("Error executing NodesInfo!", e);} if (result.isSucceeded()) {LinkedHashSet<String> httpHosts = new LinkedHashSet<String>();JsonObject jsonMap = result.getJsonObject();JsonObject nodes = (JsonObject) jsonMap.get("nodes");if (nodes != null) {for (Entry<String, JsonElement> entry : nodes.entrySet()) {JsonObject host = entry.getValue().getAsJsonObject();JsonElement addressElement = null;if (host.has("version")) {int majorVersion = Integer.parseInt(Splitter.on('.').splitToList(host.get("version").getAsString()).get(0));if (majorVersion >= 5) {JsonObject http = host.getAsJsonObject("http");if (http != null && http.has(PUBLISH_ADDRESS_KEY_V5))addressElement = http.get(PUBLISH_ADDRESS_KEY_V5);}}if (addressElement == null) {// get as a JsonElement first as some nodes in the cluster may not have an http_addressif (host.has(PUBLISH_ADDRESS_KEY)) addressElement = host.get(PUBLISH_ADDRESS_KEY);}if (addressElement != null && !addressElement.isJsonNull()) {String httpAddress = getHttpAddress(addressElement.getAsString());if(httpAddress != null) httpHosts.add(httpAddress);}}}if (log.isDebugEnabled()) {log.debug("Discovered {} HTTP hosts: {}", httpHosts.size(), Joiner.on(',').join(httpHosts));}discoveredServerList = httpHosts;client.setServers(discoveredServerList);} else {log.warn("NodesInfo request resulted in error: {}", result.getErrorMessage());client.setServers(bootstrapServerList);}}
  • 請求成功的話 解析body,如果nodes下面有version,取第一位,判斷大于等于5的話則取http節(jié)點下面的PUBLISH_ADDRESS_KEY_V5[publish_address]屬性值,封裝成http后添加到discoveredServerList ,供請求獲取URL使用。(里面都是存活的節(jié)點),如果沒有取到,則取PUBLISH_ADDRESS_KEY[http_address]屬性值,封裝成http后添加到discoveredServerList。
  • 請求拋出CouldNotConnectException則調(diào)用removeNodeAndUpdateServers方法移除該host;如果拋出其他的Exception則將client的servers重置為bootstrapServerList


發(fā)起請求的過程

執(zhí)行的execute方法。Client.getJestClient 返回的是 JestClient接口

看下 JestHttpClient#execute

/*** @throws IOException in case of a problem or the connection was aborted during request,* or in case of a problem while reading the response stream* @throws CouldNotConnectException if an {@link HttpHostConnectException} is encountered*/@Overridepublic <T extends JestResult> T execute(Action<T> clientRequest) throws IOException {return execute(clientRequest, null);}

繼續(xù)

public <T extends JestResult> T execute(Action<T> clientRequest, RequestConfig requestConfig) throws IOException {// 獲取 HttpUriRequest HttpUriRequest request = prepareRequest(clientRequest, requestConfig);CloseableHttpResponse response = null;try {response = executeRequest(request);return deserializeResponse(response, request, clientRequest);} catch (HttpHostConnectException ex) {throw new CouldNotConnectException(ex.getHost().toURI(), ex);} finally {if (response != null) {try {response.close();} catch (IOException ex) {log.error("Exception occurred while closing response stream.", ex);}}}}

重點來了

HttpUriRequest request = prepareRequest(clientRequest, requestConfig);

繼續(xù)跟到prepareRequest

protected <T extends JestResult> HttpUriRequest prepareRequest(final Action<T> clientRequest, final RequestConfig requestConfig) {String elasticSearchRestUrl = getRequestURL(getNextServer(), clientRequest.getURI());HttpUriRequest request = constructHttpMethod(clientRequest.getRestMethodName(), elasticSearchRestUrl, clientRequest.getData(gson), requestConfig);log.debug("Request method={} url={}", clientRequest.getRestMethodName(), elasticSearchRestUrl);// add headers added to actionfor (Entry<String, Object> header : clientRequest.getHeaders().entrySet()) {request.addHeader(header.getKey(), header.getValue().toString());}return request;}

重點: getNextServer()

/*** @throws io.searchbox.client.config.exception.NoServerConfiguredException*/protected String getNextServer() {return serverPoolReference.get().getNextServer();}

繼續(xù)

總結(jié)一下:

  • JestHttpClient繼承了AbstractJestClient,它的execute及executeAsync方法都調(diào)用了prepareRequest來構(gòu)造HttpUriRequest;
  • prepareRequest方法會先調(diào)用getNextServer方法來獲取要請求的elasticSearchServer的地址;
  • 而getNextServer方法則是調(diào)用的serverPoolReference.get().getNextServer()
  • 看看 serverPoolReference 是個啥?
private final AtomicReference<ServerPool> serverPoolReference =new AtomicReference<ServerPool>(new ServerPool(ImmutableSet.<String>of()));
  • 再看看剛才NodeChecker 處理完成后調(diào)用的 client.setServers(discoveredServerList);

到 AbstractJestClient 類中看下 setServers方法

AbstractJestClient有一個serverPoolReference屬性,AtomicReference,其泛型為ServerPool;setServers方法則是創(chuàng)建新的ServerPool,然后更新serverPoolReference

ServerPool有個AtomicInteger類型的nextServerIndex,getNextServer方法則是通過nextServerIndex.getAndIncrement() % serversRing.size()來確定取的serversRing這個List的index,其實現(xiàn)的是Round Robin策略;極端情況下出現(xiàn)IndexOutOfBoundsException的話,則會重置nextServerIndex為0,然后繼續(xù)按Round Robin策略取下一個server

是不是就對上了? NodeChecker負責更新,execute則從里面取,所里取出來的都是 存活的節(jié)點。 這樣就做到了動態(tài)的發(fā)現(xiàn)。

節(jié)點上線后,自動發(fā)送到該節(jié)點,節(jié)點掛掉后,能自動移除。 全稱無需干預。

再說一點, NodeChecker有個執(zhí)行頻率, 確保這個執(zhí)行完了以后,再請求ES。 舉個例子,比如3個節(jié)點,你啟動應用的時候,正好有一個節(jié)點是掛掉的,而且正常的業(yè)務請求正好請求到了這個壞的節(jié)點上,是不是就掛了。 如果NodeChecker執(zhí)行完以后,那取出的節(jié)點肯定是都是存活的。


遇到的問題

說下背景, 老項目 升級 , 以前是 單個ES節(jié)點,所以 沒有配置 集群,且Jest版本為Jdk1.7

初始化JestClient如下

JestClientFactory factory = new JestClientFactory();factory.setHttpClientConfig(new HttpClientConfi.Builder("http://127.0.0.1:9200").gson(new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss").create()).multiThreaded(true).readTimeout(10000).build());jestClient = factory.getObject();

配置連接集群的地址,最重要的一行代碼,增加 .discoveryEnabled(true)

用的是2.4.0的版本, 升級到了5.3.4以后,去debug jest的源碼的時候,打上的斷點,總和是源碼對不起來 … 結(jié)果是 IDEA 發(fā)布的Tomcat工程路徑中 老的2.4.0的jar包還在原來的目錄下面,導致Tomcat加載了2.4.0 jar包中的類,刪除老的jar包,重新編譯測試,通過。

做了幾件事兒

  • 升級JDK到1.8
  • Jest 升級到 5.3.4
  • 依賴的Guava升級到了19.0
  • 感興趣的同學,用我上面提供的測試代碼測試即可。


    總結(jié)

    以上是生活随笔為你收集整理的Elasticsearch-Jest 配置ES集群源码解读的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。