Kernel Memory 入门系列:自定义处理流程
Kernel Memory 入門系列:自定義處理流程
在整個文檔預處理的流程中,涉及到很多的處理步驟,例如:文本提取,文本分片,向量化和存儲。這些步驟是Kernel Memory中的默認提供的處理方法,如果有一些其他的需求,也可以進行過程的自定義。
自定義Handler
在Kernel Memory中,可以通過自定義Handler的方式來實現自定義的處理流程。自定義Handler需要實現IPipelineStepHandler接口,該接口定義如下:
public interface IPipelineStepHandler
{
string StepName { get; }
Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken = default);
}
其中,StepName是自定義Handler的名稱,用于在Pipeline中指定該步驟,InvokeAsync方法是自定義Handler的執行方法。在InvokeAsync方法中,可以對DataPipeline中的數據進行修改,從而實現自定義的處理。
主要的實現邏輯在InvokeAsync方法中,其中DataPipeline是主要的數據結構,它包含了整個文檔處理的流程中的所有數據。
如果想要得到當前處理流程中的文件,可以通過DataPipeline.Files屬性獲取。
public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken = default)
{
foreach (DataPipeline.FileDetails file in pipeline.Files)
{
Console.WriteLine(file.Name);
}
return (true, pipeline);
}
實現的過程中建議為Handler注入IPipelineOrchestrator, 通過IPipelineOrchestrator可以獲取到當前的Memory的大部分基礎組件和文件管理的方法。
例如,如果想獲取文件的內容,就可以IPipelineOrchestrator.ReadTextFileAsync方法:
IPipelineOrchestrator orchestrator;
var fileContent = await orchestrator.ReadTextFileAsync(pipeline, file.Name, cancellationToken);
如果想要存儲生成的文件內容,就可以使用IPipelineOrchestrator.WriteTextFileAsync方法:
IPipelineOrchestrator orchestrator;
await orchestrator.WriteTextFileAsync(pipeline, file.Name, fileContent, cancellationToken);
除了文本內容,還可以通過IPipelineOrchestrator.ReadFileAsync和IPipelineOrchestrator.WriteFileAsync方法來讀取和存儲二進制文件。
除此之外,還可以通過IPipelineOrchestrator 獲取 TextGenerator 、EmbeddingGenerators、MemoryDbs等基礎組件,搭配使用實現更多豐富的流程。
例如使用TextGenerator文本生成服務,可以構建自己的提示詞方法,為當前的文檔生成摘要、提煉關鍵詞等。
生成的文本,首先通過IPipelineOrchestrator.WriteFileAsync進行內容的存儲, 然后將文件信息存放到file.GeneratedFiles中,這樣就可以在后續的處理流程中使用了。
var generatedFile = ...;
await orchestrator.WriteFileAsync(pipeline, generatedFile.Name, generatedFile.Content, cancellationToken);
file.GeneratedFiles.Add(new DataPipeline.GeneratedFileDetails
{
Id = Guid.NewGuid().ToString("N"),
ParentId = file.Id,
Name = generatedFile.Name,
Size = generatedFile.Length,
MimeType = generatedFile.Type,
ArtifactType = DataPipeline.ArtifactTypes.SyntheticData,
Tags = pipeline.Tags,
});
另外,其中的File本身存在一組方法,可以用來判斷該文件是否已經被當前流程處理過了,以避免重復處理:
file.MarkProcessedBy(this); // 標記當前文件已經被當前Handler處理過了
file.AlreadyProcessedBy(this); // 判斷當前文件是否已經被當前Handler處理過了
注冊Handler
完成Handler邏輯的編寫后,就可以將Handler注冊到Memory中進行使用了。
在構建Memory后,通過AddHandler方法即可完成注冊:
var memory = new KernelMemoryBuilder()
// ...
.Build<MemoryServerless>();
memory.AddHandler(new MyHandler(memory.Orchestrator));
另外也可以在 MemoryBuilder 階段,針對Orchestrator進行Handler的注冊:
var memoryBuilder = new KernelMemoryBuilder();
var orchestrator = memoryBuilder.GetOrchestrator();
var myHandler = new MyHandler(orchestrator);
await orchestrator.AddHandlerAsync(myHandler);
自定義處理流程
注冊完成Handler后,就可以在自定義的Pipeline中使用了。
一種方式是在memory.ImportDocumentAsync的時候,指定 Steps:
await memory.ImportDocumentAsync("sample-Wikipedia-Moon.txt", steps: new[] { "my_step" });
另一種是圍繞著Orchestrator進行Pipeline的構建:
var pipeline = orchestrator
.PrepareNewDocumentUpload(index: "tests", documentId: "inProcessTest", new TagCollection { { "testName", "example3" } })
.AddUploadFile("file1", "file1-Wikipedia-Carbon.txt", "file1-Wikipedia-Carbon.txt")
.AddUploadFile("file2", "file2-Wikipedia-Moon.txt", "file2-Wikipedia-Moon.txt")
.Then("extract")
.Then("partition")
.Then("summarize")
.Then("gen_embeddings")
.Then("save_records")
.Build();
await orchestrator.RunPipelineAsync(pipeline, cancellationToken);
以上就完成了自定義流程的實現。
參考
- InProcessMemoryWithCustomHandler
- ServerlessCustomPipeline
總結
以上是生活随笔為你收集整理的Kernel Memory 入门系列:自定义处理流程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 兵线的移动速度
- 下一篇: 服务网格 Service Mesh