Enterprise Search Engine Development: Connectors (Part 27)

Published: 2025/7/14

The ChangeQueue class implements the ChangeSource interface, which declares a single method for pulling the next Change object:

```java
/**
 * A source of {@link Change} objects.
 *
 * @since 2.8
 */
public interface ChangeSource {
  /**
   * @return the next change, or {@code null} if there is no change available
   */
  public Change getNextChange();
}
```

The ChangeQueue constructor initializes the blocking queue private final BlockingQueue<Change> pendingChanges, which serves as the container for Change objects:

```java
/**
 * Initializes the blocking queue pendingChanges.
 *
 * @param size
 * @param sleepInterval
 * @param introduceDelayAfterEachScan
 * @param activityLogger
 */
private ChangeQueue(int size, long sleepInterval, boolean introduceDelayAfterEachScan,
    CrawlActivityLogger activityLogger) {
  pendingChanges = new ArrayBlockingQueue<Change>(size);
  this.sleepInterval = sleepInterval;
  this.activityLogger = activityLogger;
  this.introduceDelayAfterEveryScan = introduceDelayAfterEachScan;
}
```

The introduceDelayAfterEveryScan parameter controls whether a delay is inserted once a full pass over the data has finished.
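To make the effect of the fixed capacity concrete, here is a minimal sketch (not the connector's code) of a bounded buffer built the same way. SimpleChange, BoundedChangeBuffer and the scanComplete() hook are hypothetical names, and the assumption that the producer blocks on put() and sleeps for sleepInterval when the flag is set illustrates the described behavior rather than copying the real CallBack.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for the connector's Change type.
final class SimpleChange {
  final String id;

  SimpleChange(String id) {
    this.id = id;
  }
}

// Sketch of a bounded change buffer; names and the scanComplete() hook are
// illustrative assumptions, not the connector's actual CallBack code.
final class BoundedChangeBuffer {
  private final BlockingQueue<SimpleChange> pending;
  private final long sleepIntervalMillis;
  private final boolean delayAfterEveryScan;

  BoundedChangeBuffer(int size, long sleepIntervalMillis, boolean delayAfterEveryScan) {
    // Fixed capacity: at most `size` changes can wait to be consumed.
    this.pending = new ArrayBlockingQueue<SimpleChange>(size);
    this.sleepIntervalMillis = sleepIntervalMillis;
    this.delayAfterEveryScan = delayAfterEveryScan;
  }

  // Blocking enqueue: the producer thread waits while the buffer is full.
  void submit(SimpleChange change) throws InterruptedException {
    pending.put(change);
  }

  // Non-blocking enqueue: returns false immediately if the buffer is full.
  boolean trySubmit(SimpleChange change) {
    return pending.offer(change);
  }

  // Consumer side: removes the oldest change, or returns null if none is waiting.
  SimpleChange next() {
    return pending.poll();
  }

  // Hypothetical end-of-scan hook: pause before the next pass if configured.
  void scanComplete() throws InterruptedException {
    if (delayAfterEveryScan) {
      Thread.sleep(sleepIntervalMillis);
    }
  }

  int pendingCount() {
    return pending.size();
  }
}
```

Because ArrayBlockingQueue never grows, a fast monitor cannot run arbitrarily far ahead of the consumer: once size changes are pending, put() waits until one is taken out.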

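As mentioned in the previous part, its inner class CallBack adds the submitted data to the blocking queue BlockingQueue<Change> pendingChanges.

ChangeQueue's implementation of the ChangeSource interface method then takes Change objects back out of that blocking queue: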

```java
/**
 * Gets an element from the blocking queue pendingChanges.
 * Gets the next available change from the ChangeQueue. Will wait up to
 * 1/4 second for a change to appear if none is immediately available.
 *
 * @return the next available change, or {@code null} if no changes are
 *         available
 */
public Change getNextChange() {
  try {
    return pendingChanges.poll(250L, TimeUnit.MILLISECONDS);
  } catch (InterruptedException ie) {
    return null;
  }
}
```
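The poll-with-timeout contract can be exercised in isolation with the sketch below. PollingChangeQueue and StringChangeSource are hypothetical stand-ins that use String in place of Change. One deliberate difference from the original: the catch block restores the thread's interrupt status with Thread.currentThread().interrupt(), a common idiom, whereas the original simply returns null.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Same contract as ChangeSource: null means "nothing available right now".
interface StringChangeSource {
  String getNextChange();
}

final class PollingChangeQueue implements StringChangeSource {
  private final BlockingQueue<String> pending = new ArrayBlockingQueue<>(16);

  // Producer side; returns false if the queue is full.
  boolean add(String change) {
    return pending.offer(change);
  }

  @Override
  public String getNextChange() {
    try {
      // Wait at most 250 ms, as in the original, then give up with null.
      return pending.poll(250L, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ie) {
      // Not in the original: re-set the interrupt flag so code further up
      // the stack can still see that the thread was asked to stop.
      Thread.currentThread().interrupt();
      return null;
    }
  }
}
```

Restoring the flag matters if the caller runs a loop that checks Thread.currentThread().isInterrupted() to decide when to stop; swallowing the exception hides that request.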

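The ChangeQueue object acts as a buffer for Change objects. As analyzed in the previous part, Change objects are added to it by the thread started by the monitor object DocumentSnapshotRepositoryMonitor.

So which object calls ChangeQueue's getNextChange() method to take the Change data back out?

Tracing the calls shows that the loadUpFromChangeSource method of the CheckpointAndChangeQueue class calls getNextChange(). That method wraps each Change it obtains in a CheckpointAndChange object and adds it to the member field List<CheckpointAndChange> checkpointAndChangeList.

First, let's look at the relevant fields and the constructor: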

```java
private final AtomicInteger maximumQueueSize =
    new AtomicInteger(DEFAULT_MAXIMUM_QUEUE_SIZE);
private final List<CheckpointAndChange> checkpointAndChangeList;
private final ChangeSource changeSource;
private final DocumentHandleFactory internalDocumentHandleFactory;
private final DocumentHandleFactory clientDocumentHandleFactory;
private volatile DiffingConnectorCheckpoint lastCheckpoint;
private final File persistDir; // place to persist enqueued values
private MonitorRestartState monitorPoints = new MonitorRestartState();

public CheckpointAndChangeQueue(ChangeSource changeSource, File persistDir,
    DocumentHandleFactory internalDocumentHandleFactory,
    DocumentHandleFactory clientDocumentHandleFactory) {
  this.changeSource = changeSource;
  this.checkpointAndChangeList = Collections.synchronizedList(
      new ArrayList<CheckpointAndChange>(maximumQueueSize.get()));
  this.persistDir = persistDir;
  this.internalDocumentHandleFactory = internalDocumentHandleFactory;
  this.clientDocumentHandleFactory = clientDocumentHandleFactory;
  ensurePersistDirExists();
}
```

The constructor sets the ChangeSource field changeSource (in practice, the ChangeQueue object) and creates the synchronized List<CheckpointAndChange> checkpointAndChangeList container.
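Collections.synchronizedList makes each individual call such as add or size thread-safe, but iteration is a compound operation, and the JDK documentation requires callers to synchronize on the list itself while iterating. The sketch below shows that pattern; SyncListDemo is a hypothetical name and this is an illustration, not code from CheckpointAndChangeQueue (where resume and start are additionally declared synchronized on the queue object).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SyncListDemo {
  // Wrapped the same way as checkpointAndChangeList in the constructor.
  private final List<String> entries =
      Collections.synchronizedList(new ArrayList<String>(100));

  // A single call on the wrapper is thread-safe on its own.
  void add(String entry) {
    entries.add(entry);
  }

  // Copying under the list's own lock gives a consistent snapshot.
  List<String> snapshot() {
    synchronized (entries) {
      return new ArrayList<>(entries);
    }
  }

  // Iteration must hold the list's monitor for the whole loop.
  int countWithPrefix(String prefix) {
    int n = 0;
    synchronized (entries) {
      for (String e : entries) {
        if (e.startsWith(prefix)) {
          n++;
        }
      }
    }
    return n;
  }
}
```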

Now let's return to the loadUpFromChangeSource method:

```java
/**
 * Pulls Changes from the ChangeSource and adds them to checkpointAndChangeList.
 */
private void loadUpFromChangeSource() {
  int max = maximumQueueSize.get();
  if (checkpointAndChangeList.size() < max) {
    lastCheckpoint = lastCheckpoint.nextMajor();
  }
  while (checkpointAndChangeList.size() < max) {
    Change newChange = changeSource.getNextChange();
    if (newChange == null) {
      break;
    }
    lastCheckpoint = lastCheckpoint.next();
    checkpointAndChangeList.add(new CheckpointAndChange(lastCheckpoint, newChange));
  }
}
```

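The method pulls Change objects from changeSource until either the list reaches maximumQueueSize or the source returns null, wraps each one in a CheckpointAndChange object, and adds it to the List<CheckpointAndChange> checkpointAndChangeList container. If there is room in the list, it first advances lastCheckpoint to a new major checkpoint; each change then receives the next checkpoint after that.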
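The checkpoint numbering can be traced with the simplified model below. It assumes that DiffingConnectorCheckpoint behaves like a (major, minor) pair where nextMajor() opens a new major number and next() increments the minor number; Cp, CpAndChange and LoadUpSketch are hypothetical, and a Deque stands in for the ChangeSource.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for DiffingConnectorCheckpoint: a (major, minor) pair.
final class Cp {
  final long major, minor;

  Cp(long major, long minor) {
    this.major = major;
    this.minor = minor;
  }

  Cp nextMajor() { return new Cp(major + 1, 0); } // assumed: start a new batch
  Cp next() { return new Cp(major, minor + 1); }  // assumed: next item in batch

  @Override
  public String toString() { return major + "." + minor; }
}

final class CpAndChange {
  final Cp checkpoint;
  final String change;

  CpAndChange(Cp checkpoint, String change) {
    this.checkpoint = checkpoint;
    this.change = change;
  }
}

final class LoadUpSketch {
  private final int max;
  private final Deque<String> source; // stands in for ChangeSource
  final List<CpAndChange> list = Collections.synchronizedList(new ArrayList<CpAndChange>());
  Cp lastCheckpoint = new Cp(0, 0);

  LoadUpSketch(int max, Deque<String> source) {
    this.max = max;
    this.source = source;
  }

  // Same control flow as loadUpFromChangeSource() above.
  void loadUpFromChangeSource() {
    if (list.size() < max) {
      lastCheckpoint = lastCheckpoint.nextMajor();
    }
    while (list.size() < max) {
      String newChange = source.poll(); // null means nothing available
      if (newChange == null) {
        break;
      }
      lastCheckpoint = lastCheckpoint.next();
      list.add(new CpAndChange(lastCheckpoint, newChange));
    }
  }
}
```

With max = 2 and three pending changes, one call produces checkpoints 1.1 and 1.2 and leaves the third change in the source for a later call.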

loadUpFromChangeSource is called from the resume method (resume itself is called from the constructor of the DiffingConnectorDocumentList class):

```java
/**
 * Gets the List<CheckpointAndChange> queue.
 * Returns an {@link Iterator} for currently available
 * {@link CheckpointAndChange} objects that occur after the passed in
 * checkpoint. The {@link String} form of a {@link DiffingConnectorCheckpoint}
 * passed in is produced by calling
 * {@link DiffingConnectorCheckpoint#toString()}. As a side effect, Objects
 * up to and including the object with the passed in checkpoint are removed
 * from this queue.
 *
 * @param checkpointString null means return all {@link CheckpointAndChange}
 *        objects and a non null value means to return
 *        {@link CheckpointAndChange} objects with checkpoints after the
 *        passed in value.
 * @throws IOException if error occurs while manipulating recovery state
 */
synchronized List<CheckpointAndChange> resume(String checkpointString)
    throws IOException {
  // Remove the changes that have already been completed.
  removeCompletedChanges(checkpointString);
  // Pull Changes from the ChangeSource into checkpointAndChangeList.
  loadUpFromChangeSource();
  // Update monitorPoints.
  monitorPoints.updateOnGuaranteed(checkpointAndChangeList);
  try {
    // Persist checkpointAndChangeList to a queue file;
    // each call to resume produces one new file.
    writeRecoveryState();
  } finally {
    // TODO: Enhance with mechanism that remembers
    // information about recovery files to avoid re-reading.
    // Remove redundant queue files (ones already fully consumed).
    removeExcessRecoveryState();
  }
  return getList();
}
```
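The side effect described in the Javadoc, dropping every queued object up to and including the given checkpoint, can be sketched as follows. removeCompletedChanges itself is not shown in this article, so this is only an assumed implementation: the "major.minor" checkpoint string format, QueuedEntry and ResumeSketch are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical queue entry: a (major, minor) checkpoint plus a payload.
final class QueuedEntry {
  final long major, minor;
  final String change;

  QueuedEntry(long major, long minor, String change) {
    this.major = major;
    this.minor = minor;
    this.change = change;
  }

  // True if this entry's checkpoint is at or before (m, n).
  boolean atOrBefore(long m, long n) {
    return major < m || (major == m && minor <= n);
  }
}

final class ResumeSketch {
  final List<QueuedEntry> list = Collections.synchronizedList(new ArrayList<QueuedEntry>());

  // Assumed behavior, following the resume() Javadoc: a null checkpoint keeps
  // everything; otherwise drop all entries up to and including the checkpoint.
  void removeCompletedChanges(String checkpointString) {
    if (checkpointString == null) {
      return;
    }
    String[] parts = checkpointString.split("\\.");
    if (parts.length != 2) {
      throw new IllegalArgumentException("Bad checkpoint: " + checkpointString);
    }
    long m = Long.parseLong(parts[0]);
    long n = Long.parseLong(parts[1]);
    synchronized (list) { // iteration over a synchronized list needs the lock
      Iterator<QueuedEntry> it = list.iterator();
      while (it.hasNext()) {
        if (it.next().atOrBefore(m, n)) {
          it.remove();
        }
      }
    }
  }
}
```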

After the List<CheckpointAndChange> checkpointAndChangeList container has been filled, its contents are persisted to a queue file in JSON format:

```java
/**
 * Persists the queue as JSON.
 * @throws IOException
 */
private void writeRecoveryState() throws IOException {
  // TODO(pjo): Move this method into RecoveryFile.
  File recoveryFile = new RecoveryFile(persistDir);
  FileOutputStream outStream = new FileOutputStream(recoveryFile);
  Writer writer = new OutputStreamWriter(outStream, Charsets.UTF_8);
  try {
    try {
      writeJson(writer);
    } catch (JSONException e) {
      throw IOExceptionHelper.newIOException("Failed writing recovery file.", e);
    }
    writer.flush();
    outStream.getFD().sync();
  } finally {
    writer.close();
  }
}
```
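The write, flush and getFD().sync() sequence is what makes a recovery file durable: flush() only hands buffered characters to the operating system, while FileDescriptor.sync() asks it to force them to the storage device. A minimal standalone version of the same pattern is sketched below; DurableWrite is a hypothetical name, try-with-resources replaces the explicit finally, and the JDK's StandardCharsets.UTF_8 stands in for Guava's Charsets.UTF_8.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

final class DurableWrite {
  // Writes text and forces it to disk before returning, mirroring
  // writeRecoveryState(): write, then flush, then FileDescriptor.sync().
  static void writeDurably(File target, String content) throws IOException {
    try (FileOutputStream out = new FileOutputStream(target);
         Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
      writer.write(content);
      writer.flush();     // push buffered chars from the Writer into the OS
      out.getFD().sync(); // ask the OS to persist them to the device
    } // resources close in reverse order: writer first, then out
  }
}
```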

Each queue file name contains the current system time, which is used to determine which of two files was created earlier:

```java
/**
 * A queue file whose name can be used to compare creation times.
 * A File that has some of the recovery logic.
 * Original recovery files' names contained a single nanosecond timestamp,
 * eg. recovery.10220010065599398 . These turned out to be flawed
 * because nanosecond times can go "back in time" between JVM restarts.
 * Updated recovery files' names contain a wall clock millis timestamp
 * followed by an underscore followed by a nanotimestamp, eg.
 * recovery.702522216012_10220010065599398 .
 */
static class RecoveryFile extends File {
  final static long NO_TIME_AVAIL = -1;
  long milliTimestamp = NO_TIME_AVAIL;
  long nanoTimestamp;

  long parseTime(String s) throws IOException {
    try {
      return Long.parseLong(s);
    } catch (NumberFormatException e) {
      throw new LoggingIoException("Invalid recovery filename: "
          + getAbsolutePath());
    }
  }

  /**
   * Parses the timestamps contained in the file name.
   * @throws IOException
   */
  void parseOutTimes() throws IOException {
    try {
      String basename = getName();
      if (!basename.startsWith(RECOVERY_FILE_PREFIX)) {
        throw new LoggingIoException("Invalid recovery filename: "
            + getAbsolutePath());
      } else {
        String extension = basename.substring(RECOVERY_FILE_PREFIX.length());
        if (!extension.contains("_")) { // Original name format.
          nanoTimestamp = parseTime(extension);
        } else { // Updated name format.
          String timeParts[] = extension.split("_");
          if (2 != timeParts.length) {
            throw new LoggingIoException("Invalid recovery filename: "
                + getAbsolutePath());
          }
          milliTimestamp = parseTime(timeParts[0]);
          nanoTimestamp = parseTime(timeParts[1]);
        }
      }
    } catch (IndexOutOfBoundsException e) {
      throw new LoggingIoException("Invalid recovery filename: "
          + getAbsolutePath());
    }
  }

  RecoveryFile(File persistanceDir) throws IOException {
    super(persistanceDir, RECOVERY_FILE_PREFIX + System.currentTimeMillis()
        + "_" + System.nanoTime());
    parseOutTimes();
  }

  /**
   * Constructor for an existing file, given its absolute path.
   * @param absolutePath
   * @throws IOException
   */
  RecoveryFile(String absolutePath) throws IOException {
    super(absolutePath);
    parseOutTimes();
  }

  boolean isOlder(RecoveryFile other) {
    boolean weHaveMillis = milliTimestamp != NO_TIME_AVAIL;
    boolean otherHasMillis = other.milliTimestamp != NO_TIME_AVAIL;
    boolean bothHaveMillis = weHaveMillis && otherHasMillis;
    boolean neitherHasMillis = (!weHaveMillis) && (!otherHasMillis);
    if (bothHaveMillis) {
      if (this.milliTimestamp < other.milliTimestamp) {
        return true;
      } else if (this.milliTimestamp > other.milliTimestamp) {
        return false;
      } else {
        return this.nanoTimestamp < other.nanoTimestamp;
      }
    } else if (neitherHasMillis) {
      return this.nanoTimestamp < other.nanoTimestamp;
    } else if (weHaveMillis) { // and other doesn't; we are newer.
      return false;
    } else { // other has millis; other is newer.
      return true;
    }
  }

  /** A delete method that logs failures. */
  public void logOnFailDelete() {
    boolean deleted = super.delete();
    if (!deleted) {
      LOG.severe("Failed to delete: " + getAbsolutePath());
    }
  }
  // TODO(pjo): Move more recovery logic into this class.
}
```
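The ordering rules of isOlder() can be checked without touching the file system. The sketch below parses only the base name; RecoveryName is a hypothetical class, and the prefix "recovery." is assumed from the example names in the Javadoc rather than taken from the actual RECOVERY_FILE_PREFIX constant. As in the original, a name without a millisecond part is always treated as older than one with it.

```java
import java.util.Comparator;

// Parses "recovery.<nanos>" (original) or "recovery.<millis>_<nanos>" (updated).
final class RecoveryName {
  static final String PREFIX = "recovery."; // assumed value of RECOVERY_FILE_PREFIX
  static final long NO_TIME_AVAIL = -1;
  final long millis;
  final long nanos;

  RecoveryName(String basename) {
    if (!basename.startsWith(PREFIX)) {
      throw new IllegalArgumentException("Invalid recovery filename: " + basename);
    }
    String ext = basename.substring(PREFIX.length());
    if (!ext.contains("_")) { // original format: nanos only
      millis = NO_TIME_AVAIL;
      nanos = Long.parseLong(ext);
    } else { // updated format: millis_nanos
      String[] parts = ext.split("_");
      if (parts.length != 2) {
        throw new IllegalArgumentException("Invalid recovery filename: " + basename);
      }
      millis = Long.parseLong(parts[0]);
      nanos = Long.parseLong(parts[1]);
    }
  }

  // Same decision table as RecoveryFile.isOlder().
  boolean isOlder(RecoveryName other) {
    boolean we = millis != NO_TIME_AVAIL;
    boolean they = other.millis != NO_TIME_AVAIL;
    if (we && they) {
      if (millis != other.millis) {
        return millis < other.millis; // wall-clock time decides first
      }
      return nanos < other.nanos;
    }
    if (!we && !they) {
      return nanos < other.nanos;
    }
    return !we; // exactly one has millis: the one without it is older
  }

  static final Comparator<RecoveryName> OLDEST_FIRST =
      (a, b) -> a.isOlder(b) ? -1 : (b.isOlder(a) ? 1 : 0);
}
```

Sorting with OLDEST_FIRST puts legacy names first, then updated names ordered by wall-clock millis, falling back to nanos only when the millis are equal.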

Next, let's see what its startup method, start, does:

```java
/**
 * Initialize to start processing from after the passed in checkpoint
 * or from the beginning if the passed in checkpoint is null. Part of
 * making DocumentSnapshotRepositoryMonitorManager go from "cold" to "warm".
 */
public synchronized void start(String checkpointString) throws IOException {
  LOG.info("Starting CheckpointAndChangeQueue from " + checkpointString);
  // Create the queue directory.
  ensurePersistDirExists();
  checkpointAndChangeList.clear();
  lastCheckpoint = constructLastCheckpoint(checkpointString);
  if (null == checkpointString) {
    // Delete the queue files.
    removeAllRecoveryState();
  } else {
    RecoveryFile current = removeExcessRecoveryState();
    // Load monitorPoints and the checkpointAndChangeList queue.
    loadUpFromRecoveryState(current);
    //this.monitorPoints.points.entrySet();
  }
}
```

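In short, when checkpointString is null it deletes all recovery files; otherwise it loads the CheckpointAndChange objects saved in the current recovery file (the one returned by removeExcessRecoveryState()) back into the List<CheckpointAndChange> checkpointAndChangeList container, together with the MonitorCheckpoint objects held in monitorPoints.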

```java
/**
 * Loads the queue.
 * @param file
 * @throws IOException
 */
private void loadUpFromRecoveryState(RecoveryFile file) throws IOException {
  // TODO(pjo): Move this method into RecoveryFile.
  new LoadingQueueReader().readJson(file);
}
```

CheckpointAndChangeQueue defines inner classes that load the list of CheckpointAndChange objects from the JSON file into the List<CheckpointAndChange> checkpointAndChangeList container.

First, the abstract queue reader class AbstractQueueReader:

```java
/**
 * Abstract class for loading the queue from a JSON file.
 * Reads JSON recovery files. Uses the Template Method pattern to
 * delegate what to do with the parsed objects to subclasses.
 *
 * Note: This class uses gson for streaming support.
 */
private abstract class AbstractQueueReader {
  public void readJson(File file) throws IOException {
    readJson(new BufferedReader(new InputStreamReader(
        new FileInputStream(file), Charsets.UTF_8)));
  }

  /**
   * Reads and parses the stream, calling the abstract methods to
   * take whatever action is required. The given stream will be
   * closed automatically.
   *
   * @param reader the stream to parse
   */
  @VisibleForTesting
  void readJson(Reader reader) throws IOException {
    JsonReader jsonReader = new JsonReader(reader);
    try {
      readJson(jsonReader);
    } finally {
      jsonReader.close();
    }
  }

  /**
   * Reads and parses the stream, calling the abstract methods to
   * take whatever action is required.
   */
  private void readJson(JsonReader reader) throws IOException {
    JsonParser parser = new JsonParser();
    reader.beginObject();
    while (reader.hasNext()) {
      String name = reader.nextName();
      if (name.equals(MONITOR_STATE_JSON_TAG)) {
        readMonitorPoints(parser.parse(reader));
      } else if (name.equals(QUEUE_JSON_TAG)) {
        reader.beginArray();
        while (reader.hasNext()) {
          readCheckpointAndChange(parser.parse(reader));
        }
        reader.endArray();
      } else {
        throw new IOException("Read invalid recovery file.");
      }
    }
    reader.endObject();

    reader.setLenient(true);
    String name = reader.nextString();
    if (!name.equals(SENTINAL)) {
      throw new IOException("Read invalid recovery file.");
    }
  }

  protected abstract void readMonitorPoints(JsonElement gson)
      throws IOException;

  protected abstract void readCheckpointAndChange(JsonElement gson)
      throws IOException;
}
```

The abstract methods are implemented by subclasses:

```java
/**
 * Checks that a queue file is valid.
 * Verifies that a JSON recovery file is valid JSON with a
 * trailing sentinel.
 */
private class ValidatingQueueReader extends AbstractQueueReader {
  protected void readMonitorPoints(JsonElement gson) throws IOException {
  }

  protected void readCheckpointAndChange(JsonElement gson)
      throws IOException {
  }
}

/** Loads the queue from a JSON recovery file. */
/*
 * TODO(jlacey): Change everything downstream to gson. For now, we
 * reserialize the individual gson objects and deserialize them
 * using org.json.
 */
@VisibleForTesting
class LoadingQueueReader extends AbstractQueueReader {
  /**
   * Loads the MonitorRestartState checkpoints
   * (HashMap<String, MonitorCheckpoint> points).
   */
  protected void readMonitorPoints(JsonElement gson) throws IOException {
    try {
      JSONObject json = gsonToJson(gson);
      monitorPoints = new MonitorRestartState(json);
      //monitorPoints.updateOnGuaranteed(checkpointAndChangeList)
    } catch (JSONException e) {
      throw IOExceptionHelper.newIOException(
          "Failed reading persisted JSON queue.", e);
    }
  }

  /**
   * Loads checkpointAndChangeList.
   */
  protected void readCheckpointAndChange(JsonElement gson)
      throws IOException {
    try {
      JSONObject json = gsonToJson(gson);
      checkpointAndChangeList.add(new CheckpointAndChange(json,
          internalDocumentHandleFactory, clientDocumentHandleFactory));
    } catch (JSONException e) {
      throw IOExceptionHelper.newIOException(
          "Failed reading persisted JSON queue.", e);
    }
  }

  // TODO(jlacey): This could be much more efficient, especially
  // with LOBs, if we directly transformed the objects with a little
  // recursive parser. This code is only used when recovering failed
  // batches, so I don't know if that's worth the effort.
  private JSONObject gsonToJson(JsonElement gson) throws JSONException {
    return new JSONObject(gson.toString());
  }
}
```
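AbstractQueueReader is a textbook Template Method: the base class owns the parsing loop and the trailing-sentinel check, and the two subclasses differ only in their hooks. The sketch below reproduces that structure using only the standard library, so it swaps the gson/JSON format for a hypothetical line format ("monitor ..." and "change ..." records followed by a final sentinel line); QueueReaderTemplate, ValidatingReader, LoadingReader and the sentinel value are all illustrative names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;

// The base class fixes the algorithm; subclasses only fill in the hooks.
abstract class QueueReaderTemplate {
  static final String SENTINEL = "EOF-SENTINEL"; // hypothetical marker value

  final void read(Reader in) throws IOException {
    try (BufferedReader r = new BufferedReader(in)) {
      String line;
      String last = null;
      while ((line = r.readLine()) != null) {
        if (last != null) {
          dispatch(last); // every line before the final one is a record
        }
        last = line;
      }
      // A file cut short by a crash is missing its trailing sentinel.
      if (!SENTINEL.equals(last)) {
        throw new IOException("Read invalid recovery file.");
      }
    }
  }

  private void dispatch(String line) throws IOException {
    if (line.startsWith("monitor ")) {
      readMonitorPoint(line.substring("monitor ".length()));
    } else if (line.startsWith("change ")) {
      readCheckpointAndChange(line.substring("change ".length()));
    } else {
      throw new IOException("Read invalid recovery file.");
    }
  }

  protected abstract void readMonitorPoint(String payload) throws IOException;

  protected abstract void readCheckpointAndChange(String payload) throws IOException;
}

// Like ValidatingQueueReader: no-op hooks, so only the structure is checked.
final class ValidatingReader extends QueueReaderTemplate {
  @Override protected void readMonitorPoint(String payload) {}
  @Override protected void readCheckpointAndChange(String payload) {}
}

// Like LoadingQueueReader: the hooks collect the parsed records.
final class LoadingReader extends QueueReaderTemplate {
  final List<String> monitorPoints = new ArrayList<>();
  final List<String> changes = new ArrayList<>();

  @Override protected void readMonitorPoint(String payload) { monitorPoints.add(payload); }
  @Override protected void readCheckpointAndChange(String payload) { changes.add(payload); }
}
```

As in the original, records are handed to the hooks before the sentinel is checked, so a loading reader may already have consumed part of a truncated file when the exception is thrown. Running a validating pass first avoids that; the no-op ValidatingQueueReader appears to exist for this purpose, although the code that calls it is not shown in this article.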

---------------------------------------------------------------------------

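This series, Enterprise Search Engine Development: Connectors, is my original work.

When reposting, please credit the source: cnblogs (博客园), 刺猬的溫馴

Email: chenying998179@163#com (replace # with .)

Article link: http://www.cnblogs.com/chenying99/p/3789560.html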
