日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

使用Apache Kafka作为消息系统的发布-订阅通信中的微服务,并通过集成测试进行了验证...

發布時間:2023/12/3 windows 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用Apache Kafka作为消息系统的发布-订阅通信中的微服务,并通过集成测试进行了验证... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

發布-訂閱消息系統在任何企業體系結構中都起著重要作用,因為它可以實現可靠的集成而無需緊密耦合應用程序。 在解耦的系統之間共享數據的能力并不是一個容易解決的問題。

考慮一個企業,其中具有使用不同語言和平臺獨立構建的多個應用程序。 它需要以響應方式共享數據和流程。 我們可以使用Messaging通過使用可自定義的格式頻繁,立即,可靠且異步地傳輸數據包來實現此目的。 從根本上說,異步消息傳遞是對分布式系統問題的務實反應。 發送消息不需要兩個系統同時啟動和就緒。

發布-訂閱頻道

從簡單的角度來看,通過添加用于傳達事件通知的事件通道的概念,對這種模式的理解依賴于它對觀察者模式的擴展。 觀察者模式描述了將觀察者與他們的主題脫鉤的必要性,這樣,無論有多少觀察者,主題都可以輕松地向所有感興趣的觀察者提供事件通知。

每個訂閱者都需要一次被通知特定事件,但不應重復通知同一事件。 在通知所有訂戶之前,不能認為事件已消耗。 但是,一旦所有訂戶都得到通知,該事件就可以視為已消耗,應該從通道中消失[2]。

代理,隊列,主題和訂閱

代理消息傳遞支持真實時間解耦系統的場景,在這種情況下,不能保證消息生產者或使用者的可用性。 對于代理消息傳遞,隊列是保留由生產者創建的消息的代理,并且消費者在準備就緒時可以在其中檢索消息。

隊列提供了最簡單的郵件傳遞選項。 隊列中的消息是按照先進先出(FIFO)進行組織的,并且每個消息都應由單個使用者處理。 但是,主題和訂閱構成了一種發布/訂閱模式,允許N個消費者處理同一條消息。

可以將單個消息添加到主題,并且對于滿足的每個訂閱規則,將消息的副本添加到該訂閱。 在這種情況下,每個訂閱都會成為隊列,消費者可以在其中單獨處理訂閱上的消息。

行業領先者正在使用的可靠且成熟的項目之一是Apache Kafka,它為我們提供了每秒處理大量消息的能力,而不是傳統的消息傳遞系統,該系統在傳統場景中非常有用,但效率和價值卻不高在處理大數據場景中。

除消息傳遞外,Apache Kafka還可以應用于流處理,網站活動跟蹤,日志聚合,指標,基于時間的消息存儲,提交日志和事件源。 在下一節中,我們將深入介紹Apache Kafka的組件和特征。

卡夫卡

Kafka是一個分布式的發布-訂閱消息系統,通過其設計,分區和復制的提交日志服務,其本質上是快速,可伸縮的并且分布式的。 它與傳統消息傳遞系統的不同之處在于,它非常易于擴展,提供高吞吐量,支持多訂戶并在故障期間自動平衡使用者,并具有允許實時應用程序或ETL將其用作批處理消耗的能力磁盤上的持久消息數。

組件[1]

  • 生產者 –生產者是將消息發布給Kafka經紀人的任何應用程序/程序。
  • 使用者 -使用者是使用來自Kafka經紀人的消息的應用程序。 這些使用者可以是簡單的應用程序,實時流處理引擎等。
  • 主題和分區 – Apache Kafka支持消息主題的概念,這些主題允許對消息進行分類。 它使我們能夠為不同類型的消息創建不同的主題,并讓不同的消費者使用消息。 此外,Apache Kafka允許在一個Topic中創建多個分區,以允許并行使用消息,因為我們可以同時從不同的分區中消費不同的使用者。 每個分區都有一個領導節點,負責處理來自消費者/生產者對該分區的讀/寫請求。
  • 代理 – Kafka代理通常是指安裝了Kafka的計算機。 但是,可以在非生產設置中在一臺計算機上設置多個代理。 Kafka經紀人負責管理消息日志并接受生產者/消費者的請求。 卡夫卡經紀人是無國籍的。 這意味著消費者必須保持已經消費了多少。 消費者自己維護它,經紀人不會做任何事情。
  • 存儲 – Kafka具有非常簡單的存儲布局。 主題的每個分區都對應一個邏輯日志。 從物理上講,日志是作為一組大小相等的段文件實現的。 每次生產者將消息發布到分區時,代理都將消息簡單地附加到最后一個段文件。 在發布了可配置數量的消息或經過一定時間后,段文件將刷新到磁盤。 消息在清除后會暴露給使用者。
  • 集群 – Kafka集群是Kafka經紀人的集合。 集群中的所有Kafka經紀人共同工作,以管理已配置的消息及其副本。

動物園管理員

ZooKeeper用于管理和協調Kafka經紀人。 每個Kafka經紀人都使用ZooKeeper與其他Kafka經紀人進行協調。 ZooKeeper服務會通知生產者和消費者有關Kafka系統中是否存在新代理或代理失敗的信息。 從Zookeeper收到的有關經紀人存在或失敗的通知中,生產者和消費者做出決定并開始與其他經紀人協調工作。 同樣,它負責為分區選擇新的領導者。

案例分析

稍作調整后,專注于練習。 因此,我們的案例研究使用Apache Kafka 2.3.1作為消息系統,在發布-訂閱上下文中模擬了使用Spring Boot微框架v2.1.8.RELEASE構建的兩個微服務之間的通信。 為了驗證我們的研究,我們將設置并執行一個集成測試,該測試重點在于使用JUnit 4/5測試框架在端到端場景中集成應用程序的不同層。

生產者API是一個模塊,用于實現業務實體服務的操作,以協調和協調與企業,機構和實體組有關的經濟信息。 消費者API是同一解決方案中的另一個模塊,旨在集中所有業務實體統計信息,并接收來自不同來源的數據輸入。

為了簡單起見,API使用H2內存數據庫。 項目結構由三個模塊組成。 Producer和Consumer這兩個主要模塊都具有Common模塊的依賴性,在Common模塊中,它與系統的其余部分共享諸如錯誤處理和輔助類之類的內容。

可從GitHub存儲庫訪問該示例; 要下載它,請點擊此鏈接 。

讓我們開始吧。

將Spring Kafka與Apache Kafka消息系統集成

用于Apache Kafka的Spring項目將核心Spring概念應用于基于Kafka的消息傳遞解決方案的開發。 它提供了一個“模板”作為發送消息的高級抽象。 它還通過@KafkaListener批注和“偵聽器容器”為消息驅動的POJO提供支持。 這些庫促進了依賴注入和聲明性[3]的使用。

生產者API

我們需要兩個步驟來配置生產者。 第一個是config類,其中定義生產者Map對象,生產者工廠和Kafka模板。 當我們將消息生成器設置為在Kafka代理中發布時,第二種尊重服務類。

生產者配置

在配置類中,在application.properties中設置了常數“ bootstrapServers” (即Kafka服務器)。 使用@Value(“ $ {spring.kafka.bootstrap-servers}”)批注表示受影響參數的默認值表達式。

要創建Kafka生產者,我們定義一些屬性,這些屬性將傳遞給Kafka生產者的構造函數。 在“ producerconfigs ” @Bean中,我們將BOOTSTRAP_SERVERS_CONFIG屬性設置為我們先前在application.properties中定義的代理地址列表。 BOOTSTRAP_SERVERS_CONFIG值是主機/端口對的逗號分隔列表,生產者用來建立與Kafka群集的初始連接。

package com.BusinessEntityManagementSystem;import ...@Configuration public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);return props;}@Beanpublic ProducerFactory<String, BusinessEntity> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, BusinessEntity> kafkaTemplate() {return new KafkaTemplate<String, BusinessEntity>(producerFactory());} }

KEY_SERIALIZER_CLASS_CONFIG是用于Kafka記錄鍵的Kafka序列化器類,該類實現了Kafka序列化器接口。 注意,我們將其設置為StringSerializer.class作為消息ID。 VALUE_SERIALIZER_CLASS_CONFIG是一個Kafka序列化程序類,我們將其設置為JsonSerializer.class作為消息主體。

要創建消息,首先,我們需要配置一個ProducerFactory,該工廠設置創建Kafka Producer實例的策略。 然后,我們需要一個KafkaTemplate,它包裝一個Producer實例,并提供使用數據傳輸對象“ BusinessEntity ”將消息發送到Kafka主題的便捷方法。

生產者服務

在Kafka Producer服務類中, @ Service注釋表示帶注釋的類是“服務”。 在此類中,我們實現了將消息發送到Kafka代理的方法,在application.properties中預定義的標頭上聲明了topic屬性。

package com.BusinessEntityManagementSystem.kafka;import ...@Service public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, BusinessEntity> kafkaTemplate;@Value("${statistics.kafka.topic}")String kafkaTopic;public void send(BusinessEntity payload) {Message<BusinessEntity> message = MessageBuilder.withPayload(payload).setHeader(KafkaHeaders.TOPIC, kafkaTopic).build();kafkaTemplate.send(message);} }

消費者API

在Consumer中,我們需要添加適當的Deserializer,該Deserializer可以將JSON byte []轉換為Java Object。 要設置它,我們需要使用config和帶有@components注釋的類,當使用基于注釋的配置和類路徑掃描時,它們將自動檢測此類以進行依賴項注入。

消費者配置

同樣,當我們指定KEY_SERIALIZER_CLASS_CONFIG,VALUE_SERIALIZER_CLASS_CONFIG來序列化生產者發布的消息時,我們還需要通知Spring Kafka有關反序列化的常量值,例如KEY_DESERIALIZER_CLASS_CONFIG和VALUE_DESERIALIZER_CLASS_CONFIG。 除了上面引用的常量之外,我們還指定了GROUP_ID_CONFIG和AUTO_OFFSET_RESET_CONFIG作為最早的常量,從而允許使用者讀取代理中最后插入的消息。

要啟用Kafka偵聽器,我們使用@EnableKafka批注。 這注釋了由AbstractListenerContainerFactory在后臺創建的端點。 KafkaListenerContainerFactory負責為特定端點創建偵聽器容器。 它可以檢測容器中任何受Spring管理的bean上的KafkaListener批注。

作為典型的實現, ConcurrentKafkaListenerContainerFactory提供了基礎MessageListenerContainer支持的必要配置選項。

package com.BusinessStatisticsUnitFiles;import ...@Configuration @EnableKafka public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, "statistics-BusinessStatisticsUnitFiles-group");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");return props;}@Beanpublic ConsumerFactory<String, BusinessEntity> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs(),new StringDeserializer(),new JsonDeserializer<>(BusinessEntity.class, false));}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, BusinessEntity> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, BusinessEntity> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;} }

在消費者工廠,我們可以禁用標題的使用。 現在,這可以通過將新JsonDeserializer <>(BusinessEntity.class,false))中的第二個參數設置為false來實現; 。 這使使用者可以信任來自任何程序包的消息。

消費者“服務”

為了使用消息,有必要像上面一樣配置ConsumerFactory和KafkaListenerContainerFactory。 一旦這些bean在Spring bean工廠中可用,就可以使用@KafkaListener注釋配置基于POJO的使用者。

在用@KafkaListener注釋的類中, @ KafkaHandler還需要將方法標記為Kafka消息偵聽器的目標。 重要的是要了解,當消息到達時,選擇的方法取決于有效負載類型。 該類型與單個非注釋參數匹配,或與@Payload注釋的參數匹配。 絕不能有歧義-系統必須能夠根據有效載荷類型選擇恰好一種方法。

package com.BusinessStatisticsUnitFiles.kafka;import ...@Component public class KafkaConsumer {@AutowiredIBusinessEntityRepository businessEntityRepository;private static final Logger LOG = LoggerFactory.getLogger(BusinessEntity.class);@KafkaListener(topics = "${statistics.kafka.topic.create.entity}", groupId = "statistics-BusinessEntityManagementSystem-group")@KafkaHandlerpublic void receiveCreatedEntity(@Payload BusinessEntity data,@Headers MessageHeaders headers) {businessEntityRepository.save(RetrieveConsumerFromReceivedProducerObject.Binding(new BusinessEntityModel(), data));} }

@Payload批注將方法參數綁定到消息的有效負載。 它還可以用于將有效負載與方法調用相關聯。 有效負載可以通過MessageConverter傳遞,以將其從具有特定MIME類型的序列化形式轉換為與目標方法參數匹配的Object。 用@Payload注釋的類是“ BusinessEntity” DTO。

Spring Boot還支持使用偵聽器中的@Headers批注檢索一個或多個消息頭。 可以為一個主題實現多個偵聽器,每個偵聽器具有不同的組ID。 此外,一個消費者可以收聽來自各種主題的消息。

您可能已經注意到,我們創建的主題建筑只有一個分區。 但是,對于具有多個分區的主題, @ KafkaListener可以顯式訂閱具有初始偏移量的主題的特定分區。

Application.properties

最后但并非最不重要的一點是,在我們的配置中,我們指定一些與生產者和消費者之間的通信行為有關的值。

生產者/消費者

在每個Producer和Consumer API上,我們都使用spring.kafka.bootstrap-servers = localhost:9092定義了我們希望微服務連接的Kafka集群。 同樣,有必要定義主題名稱以產生和接收消息,密鑰以及組ID。

... ## Application.properties Kafka config spring.kafka.bootstrap-servers=localhost:9092 statistics.kafka.topic=test statistics.kafka.key=test statistics.kafka.topic.create.entity=test spring.kafka.producer.group-id=statistics-BusinessStatisticsUnitFiles-group spring.kafka.template.default-topic=test ...

準備Kafka和Zookeeper進行集成測試

下面定義的步驟演示了如何在Windows 10操作系統上運行和測試Kafka。

下載帶有嵌入式Zookeeper的Kafka

  • 下載Kafka二進制文件 。 這篇文章基于Kafka 2.3.1,因此我們假設您正在下載Scala 2.12的2.3.1版本。
  • 解壓縮kafka_2.12-2.3.1.tgz文件。
  • 設置zookeeper.properties

    為了使其正常工作,我們需要更改Zookeeper數據目錄的位置。
    打開kafka \ config \ zookeeper.properties文件,然后將Zookeeper數據/ log目錄位置配置更改為有效的Windows目錄位置。

    dataDir=C:\\kafka\\zookeeper-logs

    設置server.properties

    我們還需要對Kafka配置進行一些更改。 打開kafka \ config \ server.properties并將主題默認設置為1。 我們將運行一個單節點Kafka。 另外,為防止Kafka創建不必要的偏移量,我們將副本指定為1。我們在Windows環境中使用最新的Kafka 2.3.1版本遇到此問題。 這導致Kafka停止,因為內存不足,無法處理在啟動服務器的初始階段自動創建的大量數據。

    ############################# Log Basics #############################log.dirs=C:\\kafka\\kafka-logs####################### Internal Topic Settings #####################offsets.topic.replication.factor=1 offsets.topic.num.partitions = 1 min.insync.replicas=1 default.replication.factor = 1 ...

    要完成Kafka配置,請將Kafka bin \ windows目錄添加到PATH環境變量中。

    創建和執行集成測試

    顧名思義,集成測試專注于集成應用程序的不同層,其中不涉及任何模擬。 集成測試需要啟動一個容器來執行測試用例。 因此,為此需要一些額外的設置,但是使用spring boot時,使用一些注釋和庫可以很容易地完成這些步驟。

    測試班

    第一個注釋@RunWith(SpringRunner.class)用于在Spring Boot測試功能和JUnit之間建立橋梁。 SpringRunner.class在測試中全面支持Spring上下文加載和Bean的依賴項注入。 @SpringBootTest通過SpringApplication創建ApplicationContext測試,這些測試將在我們的測試中使用。 自嵌入式服務器以來,它將引導整個容器,并創建一個Web環境。

    在我們的測試中,我們模仿的是真實的Web環境,將其設置為RANDOM_PORT也會加載WebServerApplicationContext。 嵌入式服務器將啟動并在隨機端口上進行偵聽。

    @RunWith(SpringRunner.class) @SpringBootTest(classes = {BusinessEntityManagementApplication.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) class BusinessEntityIntegrationTest {@LocalServerPortprivate int port;@AutowiredTestRestTemplate restTemplate;HttpHeaders headers = new HttpHeaders();

    @LocalServerPort批注為我們提供了在運行時分配的注入的HTTP端口。 這是@Value("${local.server.port}")的便捷替代方法。

    要訪問Spring應用程序中的第三方REST服務,我們使用Spring RestTemplate或TestRestTemplate ,這是一種適合集成測試的便捷替代方法,方法是將其注入我們的測試類中。 通過在項目中使用spring-boot-starter-test依賴項,我們可以在運行時訪問“ TestRestTemplate”類。

    測試方法

    在我們的方法測試中,我們使用“ junit-json-params ”,這是一個Junit 5庫,提供注釋以從JSON字符串或參數化測試中的文件加載數據。 我們還使用@ParameterizedTest注釋對方法進行了注釋,以補充下面的庫。 它用于表示帶注釋的方法是參數化測試方法。 該方法不得為私有或靜態。 他們還必須通過@ArgumentsSource或相應的組合批注指定至少一個ArgumentsProvider 。

    我們的@ArgumentsSource是@ArgumentsSource中的JSON文件@JsonFileSource(resources =“ /business-entity-test-param.json”)。 @JsonFileSource允許您使用類路徑中的JSON文件。 它支持單個對象,對象數組和JSON原語。

    從文件中檢索的JSON對象綁定到方法參數“對象”,該方法將其轉換為POJO對象,在本例中為我們的實體模型。

    @ParameterizedTest @JsonFileSource(resources = "/business-entity-test-param.json") @DisplayName("create business entity with json parameter") void createBusinessEntity(JsonObject object) throws IOException, URISyntaxException {BusinessEntityModel businessEntityModel;businessEntityModel = new BusinessEntityModel();ObjectMapper mapper = new ObjectMapper();businessEntityModel = mapper.readValue(object.toString(), BusinessEntityModel.class);HttpEntity<BusinessEntityModel> request = new HttpEntity<>(businessEntityModel, headers);try {ResponseEntity<String> response = this.restTemplate.postForEntity(createURLWithPort("/api/businessEntityManagementSystem/v1/businessEntity"), request, String.class);assertAll(() -> assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.CREATED.value()),() -> assertThat(response.getHeaders().getLocation().getPath()).contains("/v1"));}catch(HttpClientErrorException ex) {assertAll(() -> Assert.assertEquals(HttpStatus.BAD_REQUEST.value(), ex.getRawStatusCode()),() -> Assert.assertEquals(true, ex.getResponseBodyAsString().contains("Missing request header")));} }

    在安排并執行操作之后,我們斷言對其余API的調用是否返回了所需的結果。

    運行集成測試

    在我們的開發環境中,我們需要確保我們的Kafka和Zookeeper在兩個不同的控制臺中啟動并運行,如圖所示

    Kafka需要Zookeeper,因此我們將首先使用以下命令啟動Zookeeper。

    c:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

    它應該啟動Zookeeper服務器。 最小化命令窗口,并讓Zookeeper在該窗口中運行。 啟動一個新的命令窗口,并使用以下命令啟動Kafka Broker。

    c:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

    接下來,我們將按照我們的想法運行Consumer API,或者將其部署在任何兼容的Web服務器中。

    最后,我們可以將測試類作為JUnit測試執行。 它將按正常方式啟動服務器并部署API。 然后它將執行測試。 您可以在“ JUnit”選項卡中驗證測試。

    結論

    在本文中,我們看到了如何使用發布-訂閱模式以可響應的方式在兩個不同的微服務之間使用可定制格式頻繁,立即,可靠和異步地共享數據,并通過在不同層中的不同層進行集成測試來對其進行驗證。端到端場景。

    參考文獻

    [1] Kafka 2.3文檔 ;


    [2] Gregor Hohpe,Bobby Woolf,企業集成模式設計,構建和部署消息解決方案,2003年;


    [3] 適用于Apache Kafka 2.3.3的Spring 。

    翻譯自: https://www.javacodegeeks.com/2019/11/microservices-in-publish-subscribe-communication-using-apache-kafka-as-a-messaging-systems-and-validated-through-integration-test.html

    總結

    以上是生活随笔為你收集整理的使用Apache Kafka作为消息系统的发布-订阅通信中的微服务,并通过集成测试进行了验证...的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。