日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

solr源码分析之数据导入DataImporter追溯。

發布時間:2025/4/5 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 solr源码分析之数据导入DataImporter追溯。 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  若要搜索的信息都是被存儲在數據庫里面的,但是solr不能直接搜數據庫,所以只有借助Solr組件將要搜索的信息在搜索服務器上進行索引,然后在客戶端供客戶使用。

1. SolrDispatchFilter

SolrDispatchFilter的作用:將請求的url映射到定義在solrconfig.xml中的處理器handler。

要處理的動作有:

enum Action {PASSTHROUGH, FORWARD, RETURN, RETRY, ADMIN, REMOTEQUERY, PROCESS}

PASSTHROUGH:通過webapp傳遞到Restlet。

FORWARD:跳轉重寫的url(沒有路徑前綴和核心/集合名稱)到Restlet。

RETURN:返回控制,不需要更多特定的處理,通常在設置錯誤并返回時產生。

RETRY:重試請求。當沒有發現工作的core時,設置此參數。

注:核心是指CoreContainer

SolrDispatchFilter間接繼承了javax.servlet.Filter,實現方法為doFilter():

public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain, boolean retry) throws IOException, ServletException {if (!(request instanceof HttpServletRequest)) return;AtomicReference<ServletRequest> wrappedRequest = new AtomicReference();if (!authenticateRequest(request, response, wrappedRequest)) { // the response and status code have already been sentreturn;}if (wrappedRequest.get() != null) {request = wrappedRequest.get();}if (cores.getAuthenticationPlugin() != null) {log.debug("User principal: {}", ((HttpServletRequest)request).getUserPrincipal());}// No need to even create the HttpSolrCall object if this path is excluded.if(excludePatterns != null) {String servletPath = ((HttpServletRequest) request).getServletPath();for (Pattern p : excludePatterns) {Matcher matcher = p.matcher(servletPath);if (matcher.lookingAt()) {chain.doFilter(request, response);return;}}}HttpSolrCall call = getHttpSolrCall((HttpServletRequest) request, (HttpServletResponse) response, retry);try { Action result = call.call();switch (result) {case PASSTHROUGH:chain.doFilter(request, response);break;case RETRY:doFilter(request, response, chain, true);break;case FORWARD:request.getRequestDispatcher(call.getPath()).forward(request, response);break;} } finally {call.destroy();}}

SolrDispatchFilter調用HttpSolrCall的call()方法來處理。

2. 調用HttpSolrCall處理請求

HttpSolrCall的構造函數:

public HttpSolrCall(SolrDispatchFilter solrDispatchFilter, CoreContainer cores,HttpServletRequest request, HttpServletResponse response, boolean retry) {this.solrDispatchFilter = solrDispatchFilter;this.cores = cores;this.req = request;this.response = response;this.retry = retry;this.requestType = RequestType.UNKNOWN;queryParams = SolrRequestParsers.parseQueryString(req.getQueryString());}

在call方法中完整請求處理:

/*** This method processes the request.*/public Action call() throws IOException {MDCLoggingContext.reset();MDCLoggingContext.setNode(cores);if (cores == null) {sendError(503, "Server is shutting down or failed to initialize");return RETURN;}if (solrDispatchFilter.abortErrorMessage != null) {sendError(500, solrDispatchFilter.abortErrorMessage);return RETURN;}try {init();/* Authorize the request if1. Authorization is enabled, and2. The requested resource is not a known static file*/if (cores.getAuthorizationPlugin() != null) {AuthorizationContext context = getAuthCtx();log.info(context.toString());AuthorizationResponse authResponse = cores.getAuthorizationPlugin().authorize(context);if (!(authResponse.statusCode == HttpStatus.SC_ACCEPTED) && !(authResponse.statusCode == HttpStatus.SC_OK)) {sendError(authResponse.statusCode,"Unauthorized request, Response code: " + authResponse.statusCode);return RETURN;}}HttpServletResponse resp = response;switch (action) {case ADMIN:handleAdminRequest();return RETURN;case REMOTEQUERY:remoteQuery(coreUrl + path, resp);return RETURN;case PROCESS:final Method reqMethod = Method.getMethod(req.getMethod());HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);// unless we have been explicitly told not to, do cache validation// if we fail cache validation, execute the queryif (config.getHttpCachingConfig().isNever304() ||!HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {SolrQueryResponse solrRsp = new SolrQueryResponse();/* even for HEAD requests, we need to execute the handler to* ensure we don't get an error (and to make sure the correct* QueryResponseWriter is selected and we get the correct* Content-Type)*/SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp));execute(solrRsp);HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();while (headers.hasNext()) {Map.Entry<String, String> entry = headers.next();resp.addHeader(entry.getKey(), entry.getValue());}QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq);if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);writeResponse(solrRsp, responseWriter, reqMethod);}return RETURN;default: return action;}} catch (Throwable ex) {sendError(ex);// walk the the entire cause chain to search for an ErrorThrowable t = ex;while (t != null) {if (t instanceof Error) {if (t != ex) {SolrDispatchFilter.log.error("An Error was wrapped in another exception - please report complete stacktrace on SOLR-6161", ex);}throw (Error) t;}t = t.getCause();}return RETURN;} finally {MDCLoggingContext.clear();}}

3.獲取handler

RequestHandlerBase獲取handler:

/*** Get the request handler registered to a given name.** This function is thread safe.*/public static SolrRequestHandler getRequestHandler(String handlerName, PluginBag<SolrRequestHandler> reqHandlers) {if(handlerName == null) return null;SolrRequestHandler handler = reqHandlers.get(handlerName);int idx = 0;if(handler == null) {for (; ; ) {idx = handlerName.indexOf('/', idx+1);if (idx > 0) {String firstPart = handlerName.substring(0, idx);handler = reqHandlers.get(firstPart);if (handler == null) continue;if (handler instanceof NestedRequestHandler) {return ((NestedRequestHandler) handler).getSubHandler(handlerName.substring(idx));}} else {break;}}}return handler;}

4.處理請求handleRequest

RequestHandlerBase的handleRequest()方法處理請求:

public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {numRequests.incrementAndGet();TimerContext timer = requestTimes.time();try {if(pluginInfo != null && pluginInfo.attributes.containsKey(USEPARAM)) req.getContext().put(USEPARAM,pluginInfo.attributes.get(USEPARAM));SolrPluginUtils.setDefaults(this, req, defaults, appends, invariants);req.getContext().remove(USEPARAM);rsp.setHttpCaching(httpCaching); handleRequestBody( req, rsp );// count timeoutsNamedList header = rsp.getResponseHeader();if(header != null) {Object partialResults = header.get("partialResults");boolean timedOut = partialResults == null ? false : (Boolean)partialResults;if( timedOut ) {numTimeouts.incrementAndGet();rsp.setHttpCaching(false);}}} catch (Exception e) {if (e instanceof SolrException) {SolrException se = (SolrException)e;if (se.code() == SolrException.ErrorCode.CONFLICT.code) {// TODO: should we allow this to be counted as an error (numErrors++)? } else {SolrException.log(SolrCore.log,e);}} else {SolrException.log(SolrCore.log,e);if (e instanceof SyntaxError) {e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);}}rsp.setException(e);numErrors.incrementAndGet();}finally {timer.stop();}}

5.具體請求落到各個handler的handleRequestBody()方法,以DataImportHandler為例:

@Override@SuppressWarnings("unchecked")public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)throws Exception {rsp.setHttpCaching(false);//TODO: figure out why just the first one is OK...ContentStream contentStream = null;Iterable<ContentStream> streams = req.getContentStreams();if(streams != null){for (ContentStream stream : streams) {contentStream = stream;break;}}SolrParams params = req.getParams();NamedList defaultParams = (NamedList) initArgs.get("defaults");RequestInfo requestParams = new RequestInfo(req, getParamsMap(params), contentStream);String command = requestParams.getCommand();if (DataImporter.SHOW_CONF_CMD.equals(command)) { String dataConfigFile = params.get("config");String dataConfig = params.get("dataConfig");if(dataConfigFile != null) {dataConfig = SolrWriter.getResourceAsString(req.getCore().getResourceLoader().openResource(dataConfigFile));}if(dataConfig==null) {rsp.add("status", DataImporter.MSG.NO_CONFIG_FOUND);} else {// Modify incoming request params to add wt=rawModifiableSolrParams rawParams = new ModifiableSolrParams(req.getParams());rawParams.set(CommonParams.WT, "raw");req.setParams(rawParams);ContentStreamBase content = new ContentStreamBase.StringStream(dataConfig);rsp.add(RawResponseWriter.CONTENT, content);}return;}rsp.add("initArgs", initArgs);String message = "";if (command != null) {rsp.add("command", command);}// If importer is still nullif (importer == null) {rsp.add("status", DataImporter.MSG.NO_INIT);return;}if (command != null && DataImporter.ABORT_CMD.equals(command)) {importer.runCmd(requestParams, null);} else if (importer.isBusy()) {message = DataImporter.MSG.CMD_RUNNING;} else if (command != null) {if (DataImporter.FULL_IMPORT_CMD.equals(command)|| DataImporter.DELTA_IMPORT_CMD.equals(command) ||IMPORT_CMD.equals(command)) {importer.maybeReloadConfiguration(requestParams, defaultParams);UpdateRequestProcessorChain processorChain =req.getCore().getUpdateProcessorChain(params);UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);SolrResourceLoader loader = req.getCore().getResourceLoader();DIHWriter sw = getSolrWriter(processor, loader, requestParams, req);if (requestParams.isDebug()) {if (debugEnabled) {// Synchronous request for the debug mode importer.runCmd(requestParams, sw);rsp.add("mode", "debug");rsp.add("documents", requestParams.getDebugInfo().debugDocuments);if (requestParams.getDebugInfo().debugVerboseOutput != null) {rsp.add("verbose-output", requestParams.getDebugInfo().debugVerboseOutput);}} else {message = DataImporter.MSG.DEBUG_NOT_ENABLED;}} else {// Asynchronous request for normal modeif(requestParams.getContentStream() == null && !requestParams.isSyncMode()){importer.runAsync(requestParams, sw);} else {importer.runCmd(requestParams, sw);}}} else if (DataImporter.RELOAD_CONF_CMD.equals(command)) { if(importer.maybeReloadConfiguration(requestParams, defaultParams)) {message = DataImporter.MSG.CONFIG_RELOADED;} else {message = DataImporter.MSG.CONFIG_NOT_RELOADED;}}}rsp.add("status", importer.isBusy() ? "busy" : "idle");rsp.add("importResponse", message);rsp.add("statusMessages", importer.getStatusMessages());}

?6. 導入數據操作

分全量和增量:

void runCmd(RequestInfo reqParams, DIHWriter sw) {String command = reqParams.getCommand();if (command.equals(ABORT_CMD)) {if (docBuilder != null) {docBuilder.abort();}return;}if (!importLock.tryLock()){LOG.warn("Import command failed . another import is running"); return;}try {if (FULL_IMPORT_CMD.equals(command) || IMPORT_CMD.equals(command)) {doFullImport(sw, reqParams);} else if (command.equals(DELTA_IMPORT_CMD)) {doDeltaImport(sw, reqParams);}} finally {importLock.unlock();}}

以全量為例:

public void doFullImport(DIHWriter writer, RequestInfo requestParams) {LOG.info("Starting Full Import");setStatus(Status.RUNNING_FULL_DUMP);try {DIHProperties dihPropWriter = createPropertyWriter();setIndexStartTime(dihPropWriter.getCurrentTimestamp());docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);checkWritablePersistFile(writer, dihPropWriter);docBuilder.execute();if (!requestParams.isDebug())cumulativeStatistics.add(docBuilder.importStatistics);} catch (Exception e) {SolrException.log(LOG, "Full Import failed", e);docBuilder.handleError("Full Import failed", e);} finally {setStatus(Status.IDLE);DocBuilder.INSTANCE.set(null);}}

7.?EntityProcessorWrapper處理sql的實現類SqlEntityProcessor

@Overridepublic void init(Context context) {rowcache = null;this.context = context;resolver = (VariableResolver) context.getVariableResolver();if (entityName == null) {onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));if (onError == null) onError = ABORT;entityName = context.getEntityAttribute(ConfigNameConstants.NAME);}delegate.init(context);}

初始化時實現SqlEntityProcessor的初始化

public void init(Context context) {super.init(context);dataSource = context.getDataSource();}

contextImpl

@Overridepublic DataSource getDataSource() {if (ds != null) return ds;if(epw==null) { return null; }if (epw!=null && epw.getDatasource() == null) {epw.setDatasource(dataImporter.getDataSourceInstance(epw.getEntity(), epw.getEntity().getDataSourceName(), this));}if (epw!=null && epw.getDatasource() != null && docBuilder != null && docBuilder.verboseDebug &&Context.FULL_DUMP.equals(currentProcess())) {//debug is not yet implemented properly for deltas epw.setDatasource(docBuilder.getDebugLogger().wrapDs(epw.getDatasource()));}return epw.getDatasource();}

DataImporter獲取數據庫配置:

public DataSource getDataSourceInstance(Entity key, String name, Context ctx) {Map<String,String> p = requestLevelDataSourceProps.get(name);if (p == null)p = config.getDataSources().get(name);if (p == null)p = requestLevelDataSourceProps.get(null);// for default data sourceif (p == null)p = config.getDataSources().get(null);if (p == null) throw new DataImportHandlerException(SEVERE,"No dataSource :" + name + " available for entity :" + key.getName());String type = p.get(TYPE);DataSource dataSrc = null;if (type == null) {dataSrc = new JdbcDataSource();} else {try {dataSrc = (DataSource) DocBuilder.loadClass(type, getCore()).newInstance();} catch (Exception e) {wrapAndThrow(SEVERE, e, "Invalid type for data source: " + type);}}try {Properties copyProps = new Properties();copyProps.putAll(p);Map<String, Object> map = ctx.getRequestParameters();if (map.containsKey("rows")) {int rows = Integer.parseInt((String) map.get("rows"));if (map.containsKey("start")) {rows += Integer.parseInt((String) map.get("start"));}copyProps.setProperty("maxRows", String.valueOf(rows));}dataSrc.init(ctx, copyProps);} catch (Exception e) {wrapAndThrow(SEVERE, e, "Failed to initialize DataSource: " + key.getDataSourceName());}return dataSrc;}

?

8.查詢結果

public ResultSetIterator(String query) {try {Connection c = getConnection();stmt = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);stmt.setFetchSize(batchSize);stmt.setMaxRows(maxRows);LOG.debug("Executing SQL: " + query);long start = System.nanoTime();if (stmt.execute(query)) {resultSet = stmt.getResultSet();}LOG.trace("Time taken for sql :"+ TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));colNames = readFieldNames(resultSet.getMetaData());} catch (Exception e) {wrapAndThrow(SEVERE, e, "Unable to execute query: " + query);}if (resultSet == null) {rSetIterator = new ArrayList<Map<String, Object>>().iterator();return;}rSetIterator = new Iterator<Map<String, Object>>() {@Overridepublic boolean hasNext() {return hasnext();}@Overridepublic Map<String, Object> next() {return getARow();}@Overridepublic void remove() {/* do nothing */}};}

?

solr支持數據庫的全量和增量索引建立,上述代碼介紹了全量索引的來龍去脈,增量索引和全量索引雷同,就不贅述了。

?

轉載于:https://www.cnblogs.com/davidwang456/p/4754628.html

總結

以上是生活随笔為你收集整理的solr源码分析之数据导入DataImporter追溯。的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。