基于Java、Kafka、ElasticSearch的搜索框架的设计与实现
Jkes是一個基于Java、Kafka、ElasticSearch的搜索框架。Jkes提供了注解驅動的JPA風格的對象/文檔映射,使用rest api用于文檔搜索。
項目主頁:https://github.com/chaokunyang/jkes
安裝
可以參考jkes-integration-test項目快速掌握jkes框架的使用方法。jkes-integration-test是我們用來測試功能完整性的一個Spring Boot Application。
安裝jkes-index-connector和jkes-delete-connector到Kafka Connect類路徑
安裝 Smart Chinese Analysis Plugin
配置
引入jkes-spring-data-jpa依賴
添加配置
提供JkesProperties Bean
這里可以很靈活,如果使用Spring Boot,可以使用@ConfigurationProperties提供配置
增加索引管理端點 因為我們不知道客戶端使用的哪種web技術,所以索引端點需要在客戶端添加。比如在Spring MVC中,可以按照如下方式添加索引端點
快速開始
索引API
使用com.timeyang.jkes.core.annotation包下相關注解標記實體
@lombok.Data@Entity@Documentpublic class Person extends AuditedEntity {// @Id will be identified automatically// @Field(type = FieldType.Long)@Id@GeneratedValue(strategy = GenerationType.IDENTITY) ? ?private Long id; ? ?@MultiFields(mainField = @Field(type = FieldType.Text),otherFields = { ? ? ? ? ? ? ? ? ? ?@InnerField(suffix = "raw", type = FieldType.Keyword), ? ? ? ? ? ? ? ? ? ?@InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")}) ? ?private String name; ? ?@Field(type = FieldType.Keyword) ? ?private String gender; ? ?@Field(type = FieldType.Integer) ? ?private Integer age; ? ?// don't add @Field to test whether ignored// @Field(type = FieldType.Text)private String description; ? ?@Field(type = FieldType.Object) ? ?@ManyToOne(fetch = FetchType.EAGER) ? ?@JoinColumn(name = "group_id") ? ?private PersonGroup personGroup;}@lombok.Data@Entity@Document(type = "person_group", alias = "person_group_alias")public class PersonGroup extends AuditedEntity {@Id@GeneratedValue(strategy = GenerationType.IDENTITY) ? ?private Long id; ? ?private String name; ? ?private String interests; ? ?@OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "personGroup", orphanRemoval = true) ? ?private List<Person> persons; ? ?private String description; ? ?@DocumentId@Field(type = FieldType.Long) ? ?public Long getId() { ? ? ? ?return id;} ? ?@MultiFields(mainField = @Field(type = FieldType.Text),otherFields = { ? ? ? ? ? ? ? ? ? ?@InnerField(suffix = "raw", type = FieldType.Keyword), ? ? ? ? ? ? ? ? ? ?@InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")}) ? ?public String getName() { ? ? ? ?return name;} ? ?@Field(type = FieldType.Text) ? ?public String getInterests() { ? ? ? ?return interests;} ? ?@Field(type = FieldType.Nested) ? ?public List<Person> getPersons() { ? ? ? ?return persons;} ? ?/*** 不加Field注解,測試序列化時是否忽略*/public String getDescription() { ? ? ? ?return description;} }當更新實體時,文檔會被自動索引到ElasticSearch;刪除實體時,文檔會自動從ElasticSearch刪除。
搜索API
啟動搜索服務jkes-search-service,搜索服務是一個Spring Boot Application,提供rest搜索api,默認運行在9000端口。
URI query
Nested query
match query
bool query
Source filtering
prefix
wildcard
regexp
Jkes工作原理
索引工作原理:
應用啟動時,Jkes掃描所有標注@Document注解的實體,為它們構建元數據。
基于構建的元數據,創建index和mappingJson格式的配置,然后通過ElasticSearch Java Rest Client將創建/更新index配置。
為每個文檔創建/更新Kafka ElasticSearch Connector,用于創建/更新文檔
為整個項目啟動/更新Jkes Deleter Connector,用于刪除文檔
攔截數據操作方法。將* save(*)方法返回的數據包裝為SaveEvent保存到EventContainer;使用(* delete*(..)方法的參數,生成一個DeleteEvent/DeleteAllEvent保存到EventContainer。
攔截事務。在事務提交后使用JkesKafkaProducer發送SaveEvent中的實體到Kafka,Kafka會使用我們提供的JkesJsonSerializer序列化指定的數據,然后發送到Kafka。
與SaveEvent不同,DeleteEvent會直接被序列化,然后發送到Kafka,而不是只發送一份數據
與SaveEvent和DeleteEvent不同,DeleteAllEvent不會發送數據到Kafka,而是直接通過ElasticSearch Java Rest Client刪除相應的index,然后重建該索引,重啟Kafka ElasticSearch Connector
查詢工作原理:
查詢服務通過rest api提供
我們沒有直接使用ElasticSearch進行查詢,因為我們需要在后續版本使用機器學習進行搜索排序,而直接與ElasticSearch進行耦合,會增加搜索排序API的接入難度
查詢服務是一個Spring Boot Application,使用docker打包為鏡像
查詢服務提供多版本API,用于API進化和兼容
查詢服務解析json請求,進行一些預處理后,使用ElasticSearch Java Rest Client轉發到ElasticSearch,將得到的響應進行解析,進一步處理后返回到客戶端。
為了便于客戶端人員開發,查詢服務提供了一個查詢UI界面,開發人員可以在這個頁面得到預期結果后再把json請求體復制到程序中。
流程圖
模塊介紹
jkes-core
jkes-core是整個jkes的核心部分。主要包括以下功能:
annotation包提供了jkes的核心注解
elasticsearch包封裝了elasticsearch相關的操作,如為所有的文檔創建/更新索引,更新mapping
kafka包提供了Kafka 生產者,Kafka Json Serializer,Kafka Connect Client
metadata包提供了核心的注解元數據的構建與結構化模型
event包提供了事件模型與容器
exception包提供了常見的Jkes異常
http包基于Apache Http Client封裝了常見的http json請求
support包暴露了Jkes核心配置支持
util包提供了一些工具類,便于開發。如:Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils
jkes-boot
jkes-boot用于與一些第三方開源框架進行集成。
當前,我們通過jkes-spring-data-jpa,提供了與spring data jpa的集成。通過使用Spring的AOP機制,對Repository方法進行攔截,生成SaveEvent/DeleteEvent/DeleteAllEvent保存到EventContainer。通過使用我們提供的SearchPlatformTransactionManager,對常用的事務管理器(如JpaTransactionManager)進行包裝,提供事務攔截功能。
在后續版本,我們會提供與更多框架的集成。
jkes-spring-data-jpa說明:
ContextSupport類用于從bean工廠獲取Repository Bean
@EnableJkes讓客戶端能夠輕松開啟Jkes的功能,提供了與Spring一致的配置模型
EventSupport處理事件的細節,在保存和刪除數據時生成相應事件存放到EventContainer,在事務提交和回滾時處理相應的事件
SearchPlatformTransactionManager包裝了客戶端的事務管理器,在事務提交和回滾時加入了回調hook
audit包提供了一個簡單的AuditedEntity父類,方便添加審計功能,版本信息可用于結合ElasticSearch的版本機制保證不會索引過期文檔數據
exception包封裝了常見異常
intercept包提供了AOP切點和切面
index包提供了全量索引功能。當前,我們提供了基于線程池的索引機制和基于ForkJoin的索引機制。在后續版本,我們會重構代碼,增加基于阻塞隊列的生產者-消費者模式,提供并發性能
jkes-services
jkes-services主要用來提供一些服務。 目前,jkes-services提供了以下服務:
jkes-delete-connector
jkes-delete-connector是一個Kafka Connector,用于從kafka集群獲取索引刪除事件(DeleteEvent),然后使用Jest Client刪除ElasticSearch中相應的文檔。
借助于Kafka Connect的rest admin api,我們輕松地實現了多租戶平臺上的文檔刪除功能。只要為每個項目啟動一個jkes-delete-connector,就可以自動處理該項目的文檔刪除工作。避免了每啟動一個新的項目,我們都得手動啟動一個Kafka Consumer來處理該項目的文檔刪除工作。盡管可以通過正則訂閱來減少這樣的工作,但是還是非常不靈活
jkes-search-service
jkes-search-service是一個restful的搜索服務,提供了多版本的rest query api。查詢服務提供多版本API,用于API進化和兼容
jkes-search-service目前支持URI風格的搜索和JSON請求體風格的搜索。
我們沒有直接使用ElasticSearch進行查詢,因為我們需要在后續版本使用機器學習進行搜索排序,而直接與ElasticSearch進行耦合,會增加搜索排序的接入難度
查詢服務是一個Spring Boot Application,使用docker打包為鏡像
查詢服務解析json請求,進行一些預處理后,使用ElasticSearch Java Rest Client轉發到ElasticSearch,將得到的響應進行解析,進一步處理后返回到客戶端。
為了便于客戶端人員開發,查詢服務提供了一個查詢UI界面,開發人員可以在這個頁面得到預期結果后再把json請求體復制到程序中。
后續,我們將會基于zookeeper構建索引集群,提供集群索引管理功能
jkes-integration-test
jkes-integration-test是一個基于Spring Boot集成測試項目,用于進行功能測試。同時測量一些常見操作的吞吐率
開發
To build a development version you'll need a recent version of Kafka. You can build jkes with Maven using the standard lifecycle phases.
Contribute
Source Code: https://github.com/chaokunyang/jkes
Issue Tracker: https://github.com/chaokunyang/jkes/issues
LICENSE
This project is licensed under Apache License 2.0.
總結
以上是生活随笔為你收集整理的基于Java、Kafka、ElasticSearch的搜索框架的设计与实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mohro是哪个国家的钱
- 下一篇: java 寻找和为定值的多个数_算法笔记