javascript
从响应式Spring Data存储库流式传输实时更新
這篇文章詳細介紹了從數據庫到對該數據感興趣的任何其他組件進行流更新的幼稚實現。 更確切地說,如何更改Spring Data R2DBC存儲庫以向相關訂閱者發出事件。
對R2DBC和Spring的一點背景知識將對這篇文章有所幫助。 我以前的著作“ 使用 Microsoft SQL Server的 Spring Data R2DBC和Spring Data R2DBC進行 異步RDBMS訪問”在這方面應該有所幫助。
如前所述,這將是一個幼稚的實現。 因此,代碼將不會花哨。
為此,我劫持了SimpleR2dbcRepository以創建一個存儲庫實現,該存儲庫實現在每次保存新記錄時都會發出事件。 新事件將添加到DirectProcessor ,并發送到訂閱它的任何Publisher 。 看起來像:
class PersonRepository(entity: RelationalEntityInformation<Person, Int>,databaseClient: DatabaseClient,converter: R2dbcConverter,accessStrategy: ReactiveDataAccessStrategy ) : SimpleR2dbcRepository<Person, Int>(entity, databaseClient, converter, accessStrategy) {private val source: DirectProcessor<Person> = DirectProcessor.create<Person>()val events: Flux<Person> = sourceoverride fun <S : Person> save(objectToSave: S): Mono<S> {return super.save(objectToSave).doOnNext(source::onNext)} }來自SimpleR2dbcRepository唯一需要重寫的函數是save ( saveAll委托來save )。 doOnNext添加到原始保存調用中,該調用通過調用onNext將新事件推送到source ( DirectorProcessor )。
source被強制轉換為Flux以防止來自存儲庫外部的類添加新事件。 從技術上講,他們仍然可以添加事件,但是他們需要自己進行轉換。
您可能已經注意到,存儲庫正在加載參數并將其傳遞到SimpleR2dbcRepository 。 存儲庫的一個實例需要手動創建,因為其某些依賴項無法自動注入:
@Configuration class RepositoryConfiguration {@Beanfun personRepository(databaseClient: DatabaseClient,dataAccessStrategy: ReactiveDataAccessStrategy): PersonRepository {val entity: RelationalPersistentEntity<Person> = dataAccessStrategy.converter.mappingContext.getRequiredPersistentEntity(Person::class.java) as RelationalPersistentEntity<Person>val relationEntityInformation: MappingRelationalEntityInformation<Person, Int> =MappingRelationalEntityInformation(entity, Int::class.java)return PersonRepository(relationEntityInformation,databaseClient,dataAccessStrategy.converter,dataAccessStrategy)} }至此,一切都準備就緒,可以使用了。 以下是其工作的示例:
personRepository.events.doOnComplete { log.info("Events flux has closed") }.subscribe { log.info("From events stream - $it") } // insert people records over time MARVEL_CHARACTERS.toFlux().delayElements(Duration.of(1, SECONDS)).concatMap { personRepository.save(it) }.subscribe()哪個輸出:
29-08-2019 09:08:27.674 [reactor-tcp-nio-1] From events stream - Person(id=481, name=Spiderman, age=18) 29-08-2019 09:08:28.550 [reactor-tcp-nio-2] From events stream - Person(id=482, name=Ironman, age=48) 29-08-2019 09:08:29.555 [reactor-tcp-nio-3] From events stream - Person(id=483, name=Thor, age=1000) 29-08-2019 09:08:30.561 [reactor-tcp-nio-4] From events stream - Person(id=484, name=Hulk, age=49) 29-08-2019 09:08:31.568 [reactor-tcp-nio-5] From events stream - Person(id=485, name=Antman, age=49) 29-08-2019 09:08:32.571 [reactor-tcp-nio-6] From events stream - Person(id=486, name=Blackwidow, age=34) 29-08-2019 09:08:33.576 [reactor-tcp-nio-7] From events stream - Person(id=487, name=Starlord, age=38) 29-08-2019 09:08:34.581 [reactor-tcp-nio-8] From events stream - Person(id=488, name=Captain America, age=100) 29-08-2019 09:08:35.585 [reactor-tcp-nio-9] From events stream - Person(id=489, name=Warmachine, age=50) 29-08-2019 09:08:36.589 [reactor-tcp-nio-10] From events stream - Person(id=490, name=Wasp, age=26) 29-08-2019 09:08:37.596 [reactor-tcp-nio-11] From events stream - Person(id=491, name=Winter Soldier, age=101) 29-08-2019 09:08:38.597 [reactor-tcp-nio-12] From events stream - Person(id=492, name=Black Panther, age=42) 29-08-2019 09:08:39.604 [reactor-tcp-nio-1] From events stream - Person(id=493, name=Doctor Strange, age=42) 29-08-2019 09:08:40.609 [reactor-tcp-nio-2] From events stream - Person(id=494, name=Gamora, age=29) 29-08-2019 09:08:41.611 [reactor-tcp-nio-3] From events stream - Person(id=495, name=Groot, age=4) 29-08-2019 09:08:42.618 [reactor-tcp-nio-4] From events stream - Person(id=496, name=Hawkeye, age=47) 29-08-2019 09:08:43.620 [reactor-tcp-nio-5] From events stream - Person(id=497, name=Pepper Potts, age=44) 29-08-2019 09:08:44.627 [reactor-tcp-nio-6] From events stream - Person(id=498, name=Captain Marvel, age=59) 29-08-2019 09:08:45.631 [reactor-tcp-nio-7] From events stream - Person(id=499, name=Rocket Raccoon, age=30) 29-08-2019 09:08:46.637 [reactor-tcp-nio-8] From events stream - Person(id=500, name=Drax, age=49) 29-08-2019 09:08:47.639 [reactor-tcp-nio-9] From events stream - Person(id=501, name=Nebula, age=30)每秒保存一條記錄,該記錄與從存儲庫發出的事件相匹配。
請注意, doOnComplete事件永遠不會觸發。 源永遠不會關閉,因此永遠不會向其任何訂戶發出完成事件。
至少對于此基本實現而言,這就是全部。 我敢肯定還有很多事情可以做,但是我首先需要弄清楚該怎么做……總結一下,通過添加一些內容,您可以將插入數據庫的數據流式傳輸到對記錄感興趣的組件被添加。
翻譯自: https://www.javacodegeeks.com/2019/09/streaming-live-updates-reactive-spring-data-repository.html
總結
以上是生活随笔為你收集整理的从响应式Spring Data存储库流式传输实时更新的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux搭建php运行环境(linux
- 下一篇: 操作方法:具有多个Mongo存储库和Ko