Kernel Memory 入门系列:异步管道
Kernel Memory 入門系列:異步管道
前面所介紹的處理流程都是基于同步管道的,即文檔導(dǎo)入的時候,會等到文檔處理完成之后才會返回。
但是在實際的應(yīng)用中,文檔很多,而且文檔的處理時間也不確定,如果采用同步的方式,那么就會導(dǎo)致整個流程的處理時間過長,也會導(dǎo)致整個流程的阻塞。因此,我們需要一種異步的方式來處理這種情況。
注冊消息隊列
當(dāng)我們使用異步管道的時候,需要先注冊消息隊列,Kernel Memory中默認(rèn)提供了幾種消息隊列的實現(xiàn),包括:
- RabbitMQ
- Azure Queue
- Simple Queue (file based, for testing)
這里以Simple Queue為例,在構(gòu)建Kernel Memory的時候,可以通過WithSimpleQueuesPipeline方法來注冊Simple Queue。
var _ = new KernelMemoryBuilder(appBuilder.Services)
//...
.WithSimpleQueuesPipeline() // <- register simple queue
.Build();
默認(rèn)注冊消息隊列之后,處理流程就會以后臺異步的方式進(jìn)行處理。
后臺任務(wù)
使用了異步隊列之后,自定義的處理流程就注冊方法就需要發(fā)生一些改變。
Kernel Memory提供的方式是將所有的異步處理流程都注冊為HostedService, 也就是后臺任務(wù)。
當(dāng)注冊自定義的處理流程的時候,就需要調(diào)用AddHandlerAsHostedService方法。
builder.Services.AddHandlerAsHostedService<MyHandler>("my_step");
注冊好的后臺任務(wù)會監(jiān)聽消息隊列,當(dāng)有消息到達(dá)的時候,就會觸發(fā)對應(yīng)的處理流程。
其余的導(dǎo)入、自定義處理流程和同步管道的方式一樣。
構(gòu)建異步處理服務(wù)
通過分布式的文件存儲、向量存儲和消息隊列服務(wù),就可以將文件的導(dǎo)入和處理流程進(jìn)行分離,從而實現(xiàn)異步的處理流程。
sequenceDiagram participant Client as client participant Memory as Kernel Memory participant ContentStorage as content Storage participant Queue as message queue participant HostedService as hosted service Client->>Memory: ImportDocumentAsync Memory->>ContentStorage: Save document Memory->>Queue: Send message Memory->>Client: Return documentId Queue->>HostedService: Receive message ContentStorage->>HostedService: Get document HostedService->>HostedService: Run pipeline Client->>Memory: GetDocumentStatusAsync ContentStorage->>Memory: Get document Status Memory->>Client: Return document status整體的處理流程如下:
- 文件導(dǎo)入后,會保存到分布式的文件存儲中,同時會發(fā)送消息到消息隊列中。
- 后臺任務(wù)會監(jiān)聽消息隊列,當(dāng)有消息到達(dá)的時候,就會觸發(fā)對應(yīng)的處理流程。
- 客戶端可以通過
GetDocumentStatusAsync方法來獲取文檔的處理狀態(tài)。
由此就可以實現(xiàn)異步的處理流程。
總結(jié)
以上是生活随笔為你收集整理的Kernel Memory 入门系列:异步管道的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用华为WS5200增强版路由器-华为52
- 下一篇: 路由器常用的四种桥接方式 如何连上桥接模