elasticsearch-1.3.0 之索引代码粗略梳理
elasticsearch-1.3.0
發送請求
創建
索引
[root@centos ~]# curl -XPUT 172.16.136.159:9200/customer/external/1?pretty '-d { "name":"JOhn Doe"}' {"_index" : "customer","_type" : "external","_id" : "1","_version" : 1,"created" : true } [root@centos ~]# curl -XPUT 172.16.136.159:9200/customer/external/1?pretty '-d { "name":"JOhn Doe"}' {"_index" : "customer","_type" : "external","_id" : "1","_version" : 2,"created" : false }這里先跟蹤下索引的流程,netty的bootstrap暫且不管,從HttpRequestHandler的messageReceived說起
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {HttpRequest request = (HttpRequest) e.getMessage();// the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally// when reading, or using a cumalation bufferNettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel());serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, e.getChannel(), httpRequest));super.messageReceived(ctx, e); }這里的dispatchRequest啟示就是NettyHttpServerTransport
NettyHttpServerTransport
Dispatcher,static class Dispatcher implements HttpServerAdapter
public void dispatchRequest(HttpRequest request, HttpChannel channel) {server.internalDispatchRequest(request, channel); }HttpServer
public void internalDispatchRequest(final HttpRequest request, final HttpChannel channel) {if (request.rawPath().startsWith("/_plugin/")) {RestFilterChain filterChain = restController.filterChain(pluginSiteFilter);filterChain.continueProcessing(request, channel);return;}restController.dispatchRequest(request, channel); }RestController的dispatchRequest()主要是executeHandler()
try {executeHandler(request, channel); } catch (Throwable e) {executeHandler方法中不同的handler處理請求,這里的handler是RestIndexAction,繼承自
final RestHandler handler = getHandler(request); if (handler != null) {handler.handleRequest(request, channel); }在BaseRestHandler中
public final void handleRequest(RestRequest request, RestChannel channel) throws Exception {handleRequest(request, channel, usefulHeaders.length == 0 ? client : new HeadersCopyClient(client, request, usefulHeaders)); } protected abstract void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception;實現父類在BaseRestHandler的handleRequest方法,最后調用NodeClient的index方法
client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) {
NodeClient的父類AbstractClient中index的實現
NodeClient中的execute方法實現
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);transportAction.execute(request, listener);//TransportIndexAction extends TransportShardReplicationOperationAction }這里的transportAction是TransportIndexAction,
其中TransportShardReplicationOperationAction是TransportIndexAction的父類,中TransportShardReplicationOperationAction繼承自TransportAction,TransportAction中execute的實現
直接調用TransportIndexAction的doExecute
protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index APIif (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {request.beforeLocalFork(); // we fork on another thread...createIndexAction.execute(new CreateIndexRequest(request.index()).cause("auto(index api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {@Overridepublic void onResponse(CreateIndexResponse result) {innerExecute(request, listener);}@Overridepublic void onFailure(Throwable e) {if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {// we have the index, do ittry {innerExecute(request, listener);} catch (Throwable e1) {listener.onFailure(e1);}} else {listener.onFailure(e);}}});} else {innerExecute(request, listener);}}這里走 innerExecute(request, listener);
private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {super.doExecute(request, listener); }這里的super就是TransportShardReplicationOperationAction了,TransportShardReplicationOperationAction中doExecute的實現
protected void doExecute(Request request, ActionListener<Response> listener) {new AsyncShardOperationAction(request, listener).start(); }主要兩個方法,一個是獲取shard,另一個是shardOperationOnPrimary;
其中shard后邊再說,shardOperationOnPrimary在TransportIndexAction實現
走request.opType() == IndexRequest.OpType.INDEX分支,主要是indexShard.prepareIndex,indexShard.index(index)這里IndexShard是InternalIndexShard,的index實現
public ParsedDocument index(Engine.Index index) throws ElasticsearchException {writeAllowed(index.origin());index = indexingService.preIndex(index);try {if (logger.isTraceEnabled()) {logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());}engine.index(index);index.endTime(System.nanoTime());} catch (RuntimeException ex) {indexingService.failedIndex(index);throw ex;}indexingService.postIndex(index);return index.parsedDoc(); }indexingService對應ShardIndexingService, engine是InternalEngine,InternalEngine的index()
public void index(Index index) throws EngineException {final IndexWriter writer;try (InternalLock _ = readLock.acquire()) {writer = currentIndexWriter();try (Releasable r = throttle.acquireThrottle()) {innerIndex(index, writer);}dirty = true;possibleMergeNeeded = true;flushNeeded = true;} catch (OutOfMemoryError | IllegalStateException | IOException t) {maybeFailEngine(t, "index");throw new IndexFailedEngineException(shardId, index, t);}checkVersionMapRefresh(); }最終在InternalEngine的innerIndex方法中調用lunece的IndexWriter的,依據是不是存在有版本,來通過 writer.addDocuments或者updateDocument方法添加或者更新索引
添加add索引
更新update索引
if (index.docs().size() > 1) {writer.updateDocuments(index.uid(), index.docs(), index.analyzer()); } else {writer.updateDocument(index.uid(), index.docs().get(0), index.analyzer()); }最后Translog
Translog.Location translogLocation = translog.add(new Translog.Index(index));具體代碼
private void innerIndex(Index index, IndexWriter writer) throws IOException {synchronized (dirtyLock(index.uid())) {final long currentVersion;VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());if (versionValue == null) {currentVersion = loadCurrentVersionFromIndex(index.uid());} else {if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {currentVersion = Versions.NOT_FOUND; // deleted, and GC} else {currentVersion = versionValue.version();}}long updatedVersion;long expectedVersion = index.version();if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {if (index.origin() == Operation.Origin.RECOVERY) {return;} else {throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);}}updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);index.updateVersion(updatedVersion);if (currentVersion == Versions.NOT_FOUND) {// document does not exists, we can optimize for createindex.created(true);if (index.docs().size() > 1) {writer.addDocuments(index.docs(), index.analyzer());} else {writer.addDocument(index.docs().get(0), index.analyzer());}} else {if (versionValue != null) {index.created(versionValue.delete()); // we have a delete which is not GC'ed...}if (index.docs().size() > 1) {writer.updateDocuments(index.uid(), index.docs(), index.analyzer());} else {writer.updateDocument(index.uid(), index.docs().get(0), index.analyzer());}}Translog.Location translogLocation = translog.add(new Translog.Index(index));versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));indexingService.postIndexUnderLock(index);} }link
分布式搜索Elasticsearch源碼分析之二------索引過程源碼概要分析
轉載于:https://www.cnblogs.com/donganwangshi/p/4318045.html
總結
以上是生活随笔為你收集整理的elasticsearch-1.3.0 之索引代码粗略梳理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 请问考研B区学校有哪些
- 下一篇: CodeForces 351A Jef