使用Apache Kafka作为消息系统的发布-订阅通信中的微服务,并通过集成测试进行了验证...
發(fā)布-訂閱消息系統(tǒng)在任何企業(yè)體系結(jié)構(gòu)中都起著重要作用,因?yàn)樗梢詫?shí)現(xiàn)可靠的集成而無需緊密耦合應(yīng)用程序。 在解耦的系統(tǒng)之間共享數(shù)據(jù)的能力并不是一個容易解決的問題。
考慮一個企業(yè),其中具有使用不同語言和平臺獨(dú)立構(gòu)建的多個應(yīng)用程序。 它需要以響應(yīng)方式共享數(shù)據(jù)和流程。 我們可以使用Messaging通過使用可自定義的格式頻繁,立即,可靠且異步地傳輸數(shù)據(jù)包來實(shí)現(xiàn)此目的。 從根本上說,異步消息傳遞是對分布式系統(tǒng)問題的務(wù)實(shí)反應(yīng)。 發(fā)送消息不需要兩個系統(tǒng)同時啟動和就緒。
發(fā)布-訂閱頻道
從簡單的角度來看,通過添加用于傳達(dá)事件通知的事件通道的概念,對這種模式的理解依賴于它對觀察者模式的擴(kuò)展。 觀察者模式描述了將觀察者與他們的主題脫鉤的必要性,這樣,無論有多少觀察者,主題都可以輕松地向所有感興趣的觀察者提供事件通知。
每個訂閱者都需要一次被通知特定事件,但不應(yīng)重復(fù)通知同一事件。 在通知所有訂戶之前,不能認(rèn)為事件已消耗。 但是,一旦所有訂戶都得到通知,該事件就可以視為已消耗,應(yīng)該從通道中消失[2]。
代理,隊(duì)列,主題和訂閱
代理消息傳遞支持真實(shí)時間解耦系統(tǒng)的場景,在這種情況下,不能保證消息生產(chǎn)者或使用者的可用性。 對于代理消息傳遞,隊(duì)列是保留由生產(chǎn)者創(chuàng)建的消息的代理,并且消費(fèi)者在準(zhǔn)備就緒時可以在其中檢索消息。
隊(duì)列提供了最簡單的郵件傳遞選項(xiàng)。 隊(duì)列中的消息是按照先進(jìn)先出(FIFO)進(jìn)行組織的,并且每個消息都應(yīng)由單個使用者處理。 但是,主題和訂閱構(gòu)成了一種發(fā)布/訂閱模式,允許N個消費(fèi)者處理同一條消息。
可以將單個消息添加到主題,并且對于滿足的每個訂閱規(guī)則,將消息的副本添加到該訂閱。 在這種情況下,每個訂閱都會成為隊(duì)列,消費(fèi)者可以在其中單獨(dú)處理訂閱上的消息。
行業(yè)領(lǐng)先者正在使用的可靠且成熟的項(xiàng)目之一是Apache Kafka,它為我們提供了每秒處理大量消息的能力,而不是傳統(tǒng)的消息傳遞系統(tǒng),該系統(tǒng)在傳統(tǒng)場景中非常有用,但效率和價值卻不高在處理大數(shù)據(jù)場景中。
除消息傳遞外,Apache Kafka還可以應(yīng)用于流處理,網(wǎng)站活動跟蹤,日志聚合,指標(biāo),基于時間的消息存儲,提交日志和事件源。 在下一節(jié)中,我們將深入介紹Apache Kafka的組件和特征。
卡夫卡
Kafka是一個分布式的發(fā)布-訂閱消息系統(tǒng),通過其設(shè)計(jì),分區(qū)和復(fù)制的提交日志服務(wù),其本質(zhì)上是快速,可伸縮的并且分布式的。 它與傳統(tǒng)消息傳遞系統(tǒng)的不同之處在于,它非常易于擴(kuò)展,提供高吞吐量,支持多訂戶并在故障期間自動平衡使用者,并具有允許實(shí)時應(yīng)用程序或ETL將其用作批處理消耗的能力磁盤上的持久消息數(shù)。
組件[1]
- 生產(chǎn)者 –生產(chǎn)者是將消息發(fā)布給Kafka經(jīng)紀(jì)人的任何應(yīng)用程序/程序。
- 使用者 -使用者是使用來自Kafka經(jīng)紀(jì)人的消息的應(yīng)用程序。 這些使用者可以是簡單的應(yīng)用程序,實(shí)時流處理引擎等。
- 主題和分區(qū) – Apache Kafka支持消息主題的概念,這些主題允許對消息進(jìn)行分類。 它使我們能夠?yàn)椴煌愋偷南?chuàng)建不同的主題,并讓不同的消費(fèi)者使用消息。 此外,Apache Kafka允許在一個Topic中創(chuàng)建多個分區(qū),以允許并行使用消息,因?yàn)槲覀兛梢酝瑫r從不同的分區(qū)中消費(fèi)不同的使用者。 每個分區(qū)都有一個領(lǐng)導(dǎo)節(jié)點(diǎn),負(fù)責(zé)處理來自消費(fèi)者/生產(chǎn)者對該分區(qū)的讀/寫請求。
- 代理 – Kafka代理通常是指安裝了Kafka的計(jì)算機(jī)。 但是,可以在非生產(chǎn)設(shè)置中在一臺計(jì)算機(jī)上設(shè)置多個代理。 Kafka經(jīng)紀(jì)人負(fù)責(zé)管理消息日志并接受生產(chǎn)者/消費(fèi)者的請求。 卡夫卡經(jīng)紀(jì)人是無國籍的。 這意味著消費(fèi)者必須保持已經(jīng)消費(fèi)了多少。 消費(fèi)者自己維護(hù)它,經(jīng)紀(jì)人不會做任何事情。
- 存儲 – Kafka具有非常簡單的存儲布局。 主題的每個分區(qū)都對應(yīng)一個邏輯日志。 從物理上講,日志是作為一組大小相等的段文件實(shí)現(xiàn)的。 每次生產(chǎn)者將消息發(fā)布到分區(qū)時,代理都將消息簡單地附加到最后一個段文件。 在發(fā)布了可配置數(shù)量的消息或經(jīng)過一定時間后,段文件將刷新到磁盤。 消息在清除后會暴露給使用者。
- 集群 – Kafka集群是Kafka經(jīng)紀(jì)人的集合。 集群中的所有Kafka經(jīng)紀(jì)人共同工作,以管理已配置的消息及其副本。
動物園管理員
ZooKeeper用于管理和協(xié)調(diào)Kafka經(jīng)紀(jì)人。 每個Kafka經(jīng)紀(jì)人都使用ZooKeeper與其他Kafka經(jīng)紀(jì)人進(jìn)行協(xié)調(diào)。 ZooKeeper服務(wù)會通知生產(chǎn)者和消費(fèi)者有關(guān)Kafka系統(tǒng)中是否存在新代理或代理失敗的信息。 從Zookeeper收到的有關(guān)經(jīng)紀(jì)人存在或失敗的通知中,生產(chǎn)者和消費(fèi)者做出決定并開始與其他經(jīng)紀(jì)人協(xié)調(diào)工作。 同樣,它負(fù)責(zé)為分區(qū)選擇新的領(lǐng)導(dǎo)者。
案例分析
稍作調(diào)整后,專注于練習(xí)。 因此,我們的案例研究使用Apache Kafka 2.3.1作為消息系統(tǒng),在發(fā)布-訂閱上下文中模擬了使用Spring Boot微框架v2.1.8.RELEASE構(gòu)建的兩個微服務(wù)之間的通信。 為了驗(yàn)證我們的研究,我們將設(shè)置并執(zhí)行一個集成測試,該測試重點(diǎn)在于使用JUnit 4/5測試框架在端到端場景中集成應(yīng)用程序的不同層。
生產(chǎn)者API是一個模塊,用于實(shí)現(xiàn)業(yè)務(wù)實(shí)體服務(wù)的操作,以協(xié)調(diào)和協(xié)調(diào)與企業(yè),機(jī)構(gòu)和實(shí)體組有關(guān)的經(jīng)濟(jì)信息。 消費(fèi)者API是同一解決方案中的另一個模塊,旨在集中所有業(yè)務(wù)實(shí)體統(tǒng)計(jì)信息,并接收來自不同來源的數(shù)據(jù)輸入。
為了簡單起見,API使用H2內(nèi)存數(shù)據(jù)庫。 項(xiàng)目結(jié)構(gòu)由三個模塊組成。 Producer和Consumer這兩個主要模塊都具有Common模塊的依賴性,在Common模塊中,它與系統(tǒng)的其余部分共享諸如錯誤處理和輔助類之類的內(nèi)容。
可從GitHub存儲庫訪問該示例; 要下載它,請點(diǎn)擊此鏈接 。
讓我們開始吧。
將Spring Kafka與Apache Kafka消息系統(tǒng)集成
用于Apache Kafka的Spring項(xiàng)目將核心Spring概念應(yīng)用于基于Kafka的消息傳遞解決方案的開發(fā)。 它提供了一個“模板”作為發(fā)送消息的高級抽象。 它還通過@KafkaListener批注和“偵聽器容器”為消息驅(qū)動的POJO提供支持。 這些庫促進(jìn)了依賴注入和聲明性[3]的使用。
生產(chǎn)者API
我們需要兩個步驟來配置生產(chǎn)者。 第一個是config類,其中定義生產(chǎn)者M(jìn)ap對象,生產(chǎn)者工廠和Kafka模板。 當(dāng)我們將消息生成器設(shè)置為在Kafka代理中發(fā)布時,第二種尊重服務(wù)類。
生產(chǎn)者配置
在配置類中,在application.properties中設(shè)置了常數(shù)“ bootstrapServers” (即Kafka服務(wù)器)。 使用@Value(“ $ {spring.kafka.bootstrap-servers}”)批注表示受影響參數(shù)的默認(rèn)值表達(dá)式。
要創(chuàng)建Kafka生產(chǎn)者,我們定義一些屬性,這些屬性將傳遞給Kafka生產(chǎn)者的構(gòu)造函數(shù)。 在“ producerconfigs ” @Bean中,我們將BOOTSTRAP_SERVERS_CONFIG屬性設(shè)置為我們先前在application.properties中定義的代理地址列表。 BOOTSTRAP_SERVERS_CONFIG值是主機(jī)/端口對的逗號分隔列表,生產(chǎn)者用來建立與Kafka群集的初始連接。
package com.BusinessEntityManagementSystem;import ...@Configuration public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);return props;}@Beanpublic ProducerFactory<String, BusinessEntity> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, BusinessEntity> kafkaTemplate() {return new KafkaTemplate<String, BusinessEntity>(producerFactory());} }KEY_SERIALIZER_CLASS_CONFIG是用于Kafka記錄鍵的Kafka序列化器類,該類實(shí)現(xiàn)了Kafka序列化器接口。 注意,我們將其設(shè)置為StringSerializer.class作為消息ID。 VALUE_SERIALIZER_CLASS_CONFIG是一個Kafka序列化程序類,我們將其設(shè)置為JsonSerializer.class作為消息主體。
要創(chuàng)建消息,首先,我們需要配置一個ProducerFactory,該工廠設(shè)置創(chuàng)建Kafka Producer實(shí)例的策略。 然后,我們需要一個KafkaTemplate,它包裝一個Producer實(shí)例,并提供使用數(shù)據(jù)傳輸對象“ BusinessEntity ”將消息發(fā)送到Kafka主題的便捷方法。
生產(chǎn)者服務(wù)
在Kafka Producer服務(wù)類中, @ Service注釋表示帶注釋的類是“服務(wù)”。 在此類中,我們實(shí)現(xiàn)了將消息發(fā)送到Kafka代理的方法,在application.properties中預(yù)定義的標(biāo)頭上聲明了topic屬性。
package com.BusinessEntityManagementSystem.kafka;import ...@Service public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, BusinessEntity> kafkaTemplate;@Value("${statistics.kafka.topic}")String kafkaTopic;public void send(BusinessEntity payload) {Message<BusinessEntity> message = MessageBuilder.withPayload(payload).setHeader(KafkaHeaders.TOPIC, kafkaTopic).build();kafkaTemplate.send(message);} }消費(fèi)者API
在Consumer中,我們需要添加適當(dāng)?shù)腄eserializer,該Deserializer可以將JSON byte []轉(zhuǎn)換為Java Object。 要設(shè)置它,我們需要使用config和帶有@components注釋的類,當(dāng)使用基于注釋的配置和類路徑掃描時,它們將自動檢測此類以進(jìn)行依賴項(xiàng)注入。
消費(fèi)者配置
同樣,當(dāng)我們指定KEY_SERIALIZER_CLASS_CONFIG,VALUE_SERIALIZER_CLASS_CONFIG來序列化生產(chǎn)者發(fā)布的消息時,我們還需要通知Spring Kafka有關(guān)反序列化的常量值,例如KEY_DESERIALIZER_CLASS_CONFIG和VALUE_DESERIALIZER_CLASS_CONFIG。 除了上面引用的常量之外,我們還指定了GROUP_ID_CONFIG和AUTO_OFFSET_RESET_CONFIG作為最早的常量,從而允許使用者讀取代理中最后插入的消息。
要啟用Kafka偵聽器,我們使用@EnableKafka批注。 這注釋了由AbstractListenerContainerFactory在后臺創(chuàng)建的端點(diǎn)。 KafkaListenerContainerFactory負(fù)責(zé)為特定端點(diǎn)創(chuàng)建偵聽器容器。 它可以檢測容器中任何受Spring管理的bean上的KafkaListener批注。
作為典型的實(shí)現(xiàn), ConcurrentKafkaListenerContainerFactory提供了基礎(chǔ)MessageListenerContainer支持的必要配置選項(xiàng)。
package com.BusinessStatisticsUnitFiles;import ...@Configuration @EnableKafka public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, "statistics-BusinessStatisticsUnitFiles-group");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");return props;}@Beanpublic ConsumerFactory<String, BusinessEntity> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs(),new StringDeserializer(),new JsonDeserializer<>(BusinessEntity.class, false));}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, BusinessEntity> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, BusinessEntity> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;} }在消費(fèi)者工廠,我們可以禁用標(biāo)題的使用。 現(xiàn)在,這可以通過將新JsonDeserializer <>(BusinessEntity.class,false))中的第二個參數(shù)設(shè)置為false來實(shí)現(xiàn); 。 這使使用者可以信任來自任何程序包的消息。
消費(fèi)者“服務(wù)”
為了使用消息,有必要像上面一樣配置ConsumerFactory和KafkaListenerContainerFactory。 一旦這些bean在Spring bean工廠中可用,就可以使用@KafkaListener注釋配置基于POJO的使用者。
在用@KafkaListener注釋的類中, @ KafkaHandler還需要將方法標(biāo)記為Kafka消息偵聽器的目標(biāo)。 重要的是要了解,當(dāng)消息到達(dá)時,選擇的方法取決于有效負(fù)載類型。 該類型與單個非注釋參數(shù)匹配,或與@Payload注釋的參數(shù)匹配。 絕不能有歧義-系統(tǒng)必須能夠根據(jù)有效載荷類型選擇恰好一種方法。
package com.BusinessStatisticsUnitFiles.kafka;import ...@Component public class KafkaConsumer {@AutowiredIBusinessEntityRepository businessEntityRepository;private static final Logger LOG = LoggerFactory.getLogger(BusinessEntity.class);@KafkaListener(topics = "${statistics.kafka.topic.create.entity}", groupId = "statistics-BusinessEntityManagementSystem-group")@KafkaHandlerpublic void receiveCreatedEntity(@Payload BusinessEntity data,@Headers MessageHeaders headers) {businessEntityRepository.save(RetrieveConsumerFromReceivedProducerObject.Binding(new BusinessEntityModel(), data));} }@Payload批注將方法參數(shù)綁定到消息的有效負(fù)載。 它還可以用于將有效負(fù)載與方法調(diào)用相關(guān)聯(lián)。 有效負(fù)載可以通過MessageConverter傳遞,以將其從具有特定MIME類型的序列化形式轉(zhuǎn)換為與目標(biāo)方法參數(shù)匹配的Object。 用@Payload注釋的類是“ BusinessEntity” DTO。
Spring Boot還支持使用偵聽器中的@Headers批注檢索一個或多個消息頭。 可以為一個主題實(shí)現(xiàn)多個偵聽器,每個偵聽器具有不同的組ID。 此外,一個消費(fèi)者可以收聽來自各種主題的消息。
您可能已經(jīng)注意到,我們創(chuàng)建的主題建筑只有一個分區(qū)。 但是,對于具有多個分區(qū)的主題, @ KafkaListener可以顯式訂閱具有初始偏移量的主題的特定分區(qū)。
Application.properties
最后但并非最不重要的一點(diǎn)是,在我們的配置中,我們指定一些與生產(chǎn)者和消費(fèi)者之間的通信行為有關(guān)的值。
生產(chǎn)者/消費(fèi)者
在每個Producer和Consumer API上,我們都使用spring.kafka.bootstrap-servers = localhost:9092定義了我們希望微服務(wù)連接的Kafka集群。 同樣,有必要定義主題名稱以產(chǎn)生和接收消息,密鑰以及組ID。
... ## Application.properties Kafka config spring.kafka.bootstrap-servers=localhost:9092 statistics.kafka.topic=test statistics.kafka.key=test statistics.kafka.topic.create.entity=test spring.kafka.producer.group-id=statistics-BusinessStatisticsUnitFiles-group spring.kafka.template.default-topic=test ...準(zhǔn)備Kafka和Zookeeper進(jìn)行集成測試
下面定義的步驟演示了如何在Windows 10操作系統(tǒng)上運(yùn)行和測試Kafka。
下載帶有嵌入式Zookeeper的Kafka
設(shè)置zookeeper.properties
為了使其正常工作,我們需要更改Zookeeper數(shù)據(jù)目錄的位置。
打開kafka \ config \ zookeeper.properties文件,然后將Zookeeper數(shù)據(jù)/ log目錄位置配置更改為有效的Windows目錄位置。
設(shè)置server.properties
我們還需要對Kafka配置進(jìn)行一些更改。 打開kafka \ config \ server.properties并將主題默認(rèn)設(shè)置為1。 我們將運(yùn)行一個單節(jié)點(diǎn)Kafka。 另外,為防止Kafka創(chuàng)建不必要的偏移量,我們將副本指定為1。我們在Windows環(huán)境中使用最新的Kafka 2.3.1版本遇到此問題。 這導(dǎo)致Kafka停止,因?yàn)閮?nèi)存不足,無法處理在啟動服務(wù)器的初始階段自動創(chuàng)建的大量數(shù)據(jù)。
############################# Log Basics #############################log.dirs=C:\\kafka\\kafka-logs####################### Internal Topic Settings #####################offsets.topic.replication.factor=1 offsets.topic.num.partitions = 1 min.insync.replicas=1 default.replication.factor = 1 ...要完成Kafka配置,請將Kafka bin \ windows目錄添加到PATH環(huán)境變量中。
創(chuàng)建和執(zhí)行集成測試
顧名思義,集成測試專注于集成應(yīng)用程序的不同層,其中不涉及任何模擬。 集成測試需要啟動一個容器來執(zhí)行測試用例。 因此,為此需要一些額外的設(shè)置,但是使用spring boot時,使用一些注釋和庫可以很容易地完成這些步驟。
測試班
第一個注釋@RunWith(SpringRunner.class)用于在Spring Boot測試功能和JUnit之間建立橋梁。 SpringRunner.class在測試中全面支持Spring上下文加載和Bean的依賴項(xiàng)注入。 @SpringBootTest通過SpringApplication創(chuàng)建ApplicationContext測試,這些測試將在我們的測試中使用。 自嵌入式服務(wù)器以來,它將引導(dǎo)整個容器,并創(chuàng)建一個Web環(huán)境。
在我們的測試中,我們模仿的是真實(shí)的Web環(huán)境,將其設(shè)置為RANDOM_PORT也會加載WebServerApplicationContext。 嵌入式服務(wù)器將啟動并在隨機(jī)端口上進(jìn)行偵聽。
@RunWith(SpringRunner.class) @SpringBootTest(classes = {BusinessEntityManagementApplication.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) class BusinessEntityIntegrationTest {@LocalServerPortprivate int port;@AutowiredTestRestTemplate restTemplate;HttpHeaders headers = new HttpHeaders();@LocalServerPort批注為我們提供了在運(yùn)行時分配的注入的HTTP端口。 這是@Value("${local.server.port}")的便捷替代方法。
要訪問Spring應(yīng)用程序中的第三方REST服務(wù),我們使用Spring RestTemplate或TestRestTemplate ,這是一種適合集成測試的便捷替代方法,方法是將其注入我們的測試類中。 通過在項(xiàng)目中使用spring-boot-starter-test依賴項(xiàng),我們可以在運(yùn)行時訪問“ TestRestTemplate”類。
測試方法
在我們的方法測試中,我們使用“ junit-json-params ”,這是一個Junit 5庫,提供注釋以從JSON字符串或參數(shù)化測試中的文件加載數(shù)據(jù)。 我們還使用@ParameterizedTest注釋對方法進(jìn)行了注釋,以補(bǔ)充下面的庫。 它用于表示帶注釋的方法是參數(shù)化測試方法。 該方法不得為私有或靜態(tài)。 他們還必須通過@ArgumentsSource或相應(yīng)的組合批注指定至少一個ArgumentsProvider 。
我們的@ArgumentsSource是@ArgumentsSource中的JSON文件@JsonFileSource(resources =“ /business-entity-test-param.json”)。 @JsonFileSource允許您使用類路徑中的JSON文件。 它支持單個對象,對象數(shù)組和JSON原語。
從文件中檢索的JSON對象綁定到方法參數(shù)“對象”,該方法將其轉(zhuǎn)換為POJO對象,在本例中為我們的實(shí)體模型。
@ParameterizedTest @JsonFileSource(resources = "/business-entity-test-param.json") @DisplayName("create business entity with json parameter") void createBusinessEntity(JsonObject object) throws IOException, URISyntaxException {BusinessEntityModel businessEntityModel;businessEntityModel = new BusinessEntityModel();ObjectMapper mapper = new ObjectMapper();businessEntityModel = mapper.readValue(object.toString(), BusinessEntityModel.class);HttpEntity<BusinessEntityModel> request = new HttpEntity<>(businessEntityModel, headers);try {ResponseEntity<String> response = this.restTemplate.postForEntity(createURLWithPort("/api/businessEntityManagementSystem/v1/businessEntity"), request, String.class);assertAll(() -> assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.CREATED.value()),() -> assertThat(response.getHeaders().getLocation().getPath()).contains("/v1"));}catch(HttpClientErrorException ex) {assertAll(() -> Assert.assertEquals(HttpStatus.BAD_REQUEST.value(), ex.getRawStatusCode()),() -> Assert.assertEquals(true, ex.getResponseBodyAsString().contains("Missing request header")));} }在安排并執(zhí)行操作之后,我們斷言對其余API的調(diào)用是否返回了所需的結(jié)果。
運(yùn)行集成測試
在我們的開發(fā)環(huán)境中,我們需要確保我們的Kafka和Zookeeper在兩個不同的控制臺中啟動并運(yùn)行,如圖所示
Kafka需要Zookeeper,因此我們將首先使用以下命令啟動Zookeeper。
c:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties它應(yīng)該啟動Zookeeper服務(wù)器。 最小化命令窗口,并讓Zookeeper在該窗口中運(yùn)行。 啟動一個新的命令窗口,并使用以下命令啟動Kafka Broker。
c:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties接下來,我們將按照我們的想法運(yùn)行Consumer API,或者將其部署在任何兼容的Web服務(wù)器中。
最后,我們可以將測試類作為JUnit測試執(zhí)行。 它將按正常方式啟動服務(wù)器并部署API。 然后它將執(zhí)行測試。 您可以在“ JUnit”選項(xiàng)卡中驗(yàn)證測試。
結(jié)論
在本文中,我們看到了如何使用發(fā)布-訂閱模式以可響應(yīng)的方式在兩個不同的微服務(wù)之間使用可定制格式頻繁,立即,可靠和異步地共享數(shù)據(jù),并通過在不同層中的不同層進(jìn)行集成測試來對其進(jìn)行驗(yàn)證。端到端場景。
參考文獻(xiàn)
[1] Kafka 2.3文檔 ;
[2] Gregor Hohpe,Bobby Woolf,企業(yè)集成模式設(shè)計(jì),構(gòu)建和部署消息解決方案,2003年;
[3] 適用于Apache Kafka 2.3.3的Spring 。
翻譯自: https://www.javacodegeeks.com/2019/11/microservices-in-publish-subscribe-communication-using-apache-kafka-as-a-messaging-systems-and-validated-through-integration-test.html
總結(jié)
以上是生活随笔為你收集整理的使用Apache Kafka作为消息系统的发布-订阅通信中的微服务,并通过集成测试进行了验证...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 住建云备案流程(住建云备案)
- 下一篇: 通过通用数据访问扩展AWS生态系统