日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

elasticsearch-1.3.0 之索引代码粗略梳理

發布時間:2023/12/19 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 elasticsearch-1.3.0 之索引代码粗略梳理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

elasticsearch-1.3.0

發送請求
創建

[root@centos ~]# curl -XPUT 172.16.136.159:9200/customer?pretty {"acknowledged" : true }

索引

[root@centos ~]# curl -XPUT 172.16.136.159:9200/customer/external/1?pretty '-d { "name":"JOhn Doe"}' {"_index" : "customer","_type" : "external","_id" : "1","_version" : 1,"created" : true } [root@centos ~]# curl -XPUT 172.16.136.159:9200/customer/external/1?pretty '-d { "name":"JOhn Doe"}' {"_index" : "customer","_type" : "external","_id" : "1","_version" : 2,"created" : false }

這里先跟蹤下索引的流程,netty的bootstrap暫且不管,從HttpRequestHandler的messageReceived說起

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {HttpRequest request = (HttpRequest) e.getMessage();// the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally// when reading, or using a cumalation bufferNettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel());serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, e.getChannel(), httpRequest));super.messageReceived(ctx, e); }

這里的dispatchRequest啟示就是NettyHttpServerTransport
NettyHttpServerTransport

void dispatchRequest(HttpRequest request, HttpChannel channel) {httpServerAdapter.dispatchRequest(request, channel); }

Dispatcher,static class Dispatcher implements HttpServerAdapter

public void dispatchRequest(HttpRequest request, HttpChannel channel) {server.internalDispatchRequest(request, channel); }

HttpServer

public void internalDispatchRequest(final HttpRequest request, final HttpChannel channel) {if (request.rawPath().startsWith("/_plugin/")) {RestFilterChain filterChain = restController.filterChain(pluginSiteFilter);filterChain.continueProcessing(request, channel);return;}restController.dispatchRequest(request, channel); }

RestController的dispatchRequest()主要是executeHandler()

try {executeHandler(request, channel); } catch (Throwable e) {

executeHandler方法中不同的handler處理請求,這里的handler是RestIndexAction,繼承自

final RestHandler handler = getHandler(request); if (handler != null) {handler.handleRequest(request, channel); }

在BaseRestHandler中

public final void handleRequest(RestRequest request, RestChannel channel) throws Exception {handleRequest(request, channel, usefulHeaders.length == 0 ? client : new HeadersCopyClient(client, request, usefulHeaders)); } protected abstract void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception;

實現父類在BaseRestHandler的handleRequest方法,最后調用NodeClient的index方法
client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) {
NodeClient的父類AbstractClient中index的實現

public void index(final IndexRequest request, final ActionListener<IndexResponse> listener) {execute(IndexAction.INSTANCE, request, listener); }

NodeClient中的execute方法實現

public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);transportAction.execute(request, listener);//TransportIndexAction extends TransportShardReplicationOperationAction }

這里的transportAction是TransportIndexAction,
其中TransportShardReplicationOperationAction是TransportIndexAction的父類,中TransportShardReplicationOperationAction繼承自TransportAction,TransportAction中execute的實現

public void execute(Request request, ActionListener<Response> listener) {if (request.listenerThreaded()) {listener = new ThreadedActionListener<>(threadPool, listener, logger);}ActionRequestValidationException validationException = request.validate();if (validationException != null) {listener.onFailure(validationException);return;}try {doExecute(request, listener);} catch (Throwable e) {logger.trace("Error during transport action execution.", e);listener.onFailure(e);} }

直接調用TransportIndexAction的doExecute

protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index APIif (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {request.beforeLocalFork(); // we fork on another thread...createIndexAction.execute(new CreateIndexRequest(request.index()).cause("auto(index api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {@Overridepublic void onResponse(CreateIndexResponse result) {innerExecute(request, listener);}@Overridepublic void onFailure(Throwable e) {if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {// we have the index, do ittry {innerExecute(request, listener);} catch (Throwable e1) {listener.onFailure(e1);}} else {listener.onFailure(e);}}});} else {innerExecute(request, listener);}}

這里走 innerExecute(request, listener);

private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {super.doExecute(request, listener); }

這里的super就是TransportShardReplicationOperationAction了,TransportShardReplicationOperationAction中doExecute的實現

protected void doExecute(Request request, ActionListener<Response> listener) {new AsyncShardOperationAction(request, listener).start(); }

主要兩個方法,一個是獲取shard,另一個是shardOperationOnPrimary;
其中shard后邊再說,shardOperationOnPrimary在TransportIndexAction實現

protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {final IndexRequest request = shardRequest.request;// validate, if routing is required, that we got routingIndexMetaData indexMetaData = clusterState.metaData().index(request.index());MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());if (mappingMd != null && mappingMd.routing().required()) {if (request.routing() == null) {throw new RoutingMissingException(request.index(), request.type(), request.id());}}IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index());IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id()).routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());long version;boolean created;Engine.IndexingOperation op;if (request.opType() == IndexRequest.OpType.INDEX) {Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());if (index.parsedDoc().mappingsModified()) {mappingUpdatedAction.updateMappingOnMaster(request.index(), index.docMapper(), indexService.indexUUID());}indexShard.index(index);version = index.version();op = index;created = index.created();} else {Engine.Create create = indexShard.prepareCreate(sourceToParse,request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());if (create.parsedDoc().mappingsModified()) {mappingUpdatedAction.updateMappingOnMaster(request.index(), create.docMapper(), indexService.indexUUID());}indexShard.create(create);version = create.version();op = create;created = true;}if (request.refresh()) {try {indexShard.refresh(new Engine.Refresh("refresh_flag_index").force(false));} catch (Throwable e) {// ignore}}// update the version on the request, so it will be used for the replicasrequest.version(version);request.versionType(request.versionType().versionTypeForReplicationAndRecovery());assert request.versionType().validateVersionForWrites(request.version());IndexResponse response = new IndexResponse(request.index(), request.type(), request.id(), version, created);return new PrimaryResponse<>(shardRequest.request, response, op); }

走request.opType() == IndexRequest.OpType.INDEX分支,主要是indexShard.prepareIndex,indexShard.index(index)這里IndexShard是InternalIndexShard,的index實現

public ParsedDocument index(Engine.Index index) throws ElasticsearchException {writeAllowed(index.origin());index = indexingService.preIndex(index);try {if (logger.isTraceEnabled()) {logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());}engine.index(index);index.endTime(System.nanoTime());} catch (RuntimeException ex) {indexingService.failedIndex(index);throw ex;}indexingService.postIndex(index);return index.parsedDoc(); }

indexingService對應ShardIndexingService, engine是InternalEngine,InternalEngine的index()

public void index(Index index) throws EngineException {final IndexWriter writer;try (InternalLock _ = readLock.acquire()) {writer = currentIndexWriter();try (Releasable r = throttle.acquireThrottle()) {innerIndex(index, writer);}dirty = true;possibleMergeNeeded = true;flushNeeded = true;} catch (OutOfMemoryError | IllegalStateException | IOException t) {maybeFailEngine(t, "index");throw new IndexFailedEngineException(shardId, index, t);}checkVersionMapRefresh(); }

最終在InternalEngine的innerIndex方法中調用lunece的IndexWriter的,依據是不是存在有版本,來通過 writer.addDocuments或者updateDocument方法添加或者更新索引
添加add索引

if (index.docs().size() > 1) {writer.addDocuments(index.docs(), index.analyzer()); } else {writer.addDocument(index.docs().get(0), index.analyzer()); }

更新update索引

if (index.docs().size() > 1) {writer.updateDocuments(index.uid(), index.docs(), index.analyzer()); } else {writer.updateDocument(index.uid(), index.docs().get(0), index.analyzer()); }

最后Translog

Translog.Location translogLocation = translog.add(new Translog.Index(index));

具體代碼

private void innerIndex(Index index, IndexWriter writer) throws IOException {synchronized (dirtyLock(index.uid())) {final long currentVersion;VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());if (versionValue == null) {currentVersion = loadCurrentVersionFromIndex(index.uid());} else {if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {currentVersion = Versions.NOT_FOUND; // deleted, and GC} else {currentVersion = versionValue.version();}}long updatedVersion;long expectedVersion = index.version();if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {if (index.origin() == Operation.Origin.RECOVERY) {return;} else {throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);}}updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);index.updateVersion(updatedVersion);if (currentVersion == Versions.NOT_FOUND) {// document does not exists, we can optimize for createindex.created(true);if (index.docs().size() > 1) {writer.addDocuments(index.docs(), index.analyzer());} else {writer.addDocument(index.docs().get(0), index.analyzer());}} else {if (versionValue != null) {index.created(versionValue.delete()); // we have a delete which is not GC'ed...}if (index.docs().size() > 1) {writer.updateDocuments(index.uid(), index.docs(), index.analyzer());} else {writer.updateDocument(index.uid(), index.docs().get(0), index.analyzer());}}Translog.Location translogLocation = translog.add(new Translog.Index(index));versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));indexingService.postIndexUnderLock(index);} }

link
分布式搜索Elasticsearch源碼分析之二------索引過程源碼概要分析

轉載于:https://www.cnblogs.com/donganwangshi/p/4318045.html

總結

以上是生活随笔為你收集整理的elasticsearch-1.3.0 之索引代码粗略梳理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。