java web系统拆分_Java系统中如何拆分同步和异步
很多開發(fā)人員說,將應(yīng)用程序切換到異步處理很復(fù)雜。因為他們有一個天然需要同步通信的Web應(yīng)用程序。在這篇文章中,我想介紹一種方法來達到異步通信的目的:使用一些眾所周知的庫和工具來設(shè)計他們的系統(tǒng)。 下面的例子是用Java編寫的,但我相信它更多的是基本原理,同一個應(yīng)用程序可以用任何語言來重新寫。
所需的工具和庫:
Spring Boot
RabbitMQ
1.Web應(yīng)用程序
一個用Spring MVC編寫的Web應(yīng)用程序并運行在Tomcat上。 它所做的只是將一個字符串發(fā)送到一個隊列中 (異步通信的開始) 并等待另一個隊列中的消息作為HTTP響應(yīng)發(fā)送回來。
首先,我們需要定義幾個依賴項,然后等待Spring Boot執(zhí)行所有必要的自動配置。
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-amqp
com.thedeanda
lorem
@SpringBootApplication
public class BlockingApplication{
public static void main(String[] args){
SpringApplication.run(BlockingApplication.class, args);
}
@RestController
public static class MessageController{
private final RabbitTemplate rabbitTemplate;
public MessageController(CachingConnectionFactory connectionFactory){
this.rabbitTemplate = new RabbitTemplate(connectionFactory);
}
@GetMapping("invoke")
public String sendMessage(){
Message response = rabbitTemplate.sendAndReceive("uppercase", null, request());
return new String(response.getBody());
}
private static Message request(){
Lorem LOREM = LoremIpsum.getInstance();
String name = LOREM.getFirstName() + " " + LOREM.getLastName();
return new Message(name.getBytes(), new MessageProperties());
}
}
@Bean
public CachingConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses("localhost:5672");
factory.setUsername("admin");
factory.setPassword("admin");
return factory;
}
}
2.消費端應(yīng)用程序
第二個應(yīng)用程序僅僅是一個等待消息的RabbitMQ的消費端,將拿到的字符串轉(zhuǎn)換為大寫,然后將此結(jié)果發(fā)送到輸出隊列中。
org.springframework.boot
spring-boot-starter-amqp
@SpringBootApplication
public class ServiceApplication{
public static void main(String[] args){
SpringApplication.run(ServiceApplication.class, args);
}
public static class MessageListener{
public String handleMessage(byte[] message){
Random rand = new Random();
// Obtain a number between [0 - 49] + 50 = [50 - 99]
int n = rand.nextInt(50) + 50;
String content = new String(message);
try {
Thread.sleep(n);
} catch (InterruptedException e) {
e.printStackTrace();
}
return content.toUpperCase();
}
}
@Bean
public CachingConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses("localhost:5672");
factory.setUsername("admin");
factory.setPassword("admin");
return factory;
}
@Bean
public SimpleMessageListenerContainer serviceListenerContainer(){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setConcurrentConsumers(20);
container.setMaxConcurrentConsumers(40);
container.setQueueNames("uppercase_messages");
container.setMessageListener(new MessageListenerAdapter(new MessageListener()));
return container;
}
}
3.底層如何執(zhí)行的?
程序啟動并首次調(diào)用sendMessage()方法后,我們可以看到Spring AMQP支持自動創(chuàng)建了一個新的回復(fù)隊列并等待來自我們的服務(wù)應(yīng)用程序的響應(yīng)。
2019-05-12 17:23:21.451 INFO 4574 --- [nio-8080-exec-1] .l.DirectReplyToMessageListenerContainer : Container initialized for queues: [amq.rabbitmq.reply-to]
2019-05-12 17:23:21.457 INFO 4574 --- [nio-8080-exec-1] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-VF-iqD9rLEuljIBstbCI1A identity=10e58093] started
如果我們在消費端應(yīng)用程序中查看消息,我們可以看到Spring自動傳播有關(guān)回復(fù)隊列的信息以及**相關(guān)ID,**用于將其傳遞回Web應(yīng)用程序以便能夠?qū)⒄埱蠛晚憫?yīng)配對在一起。
這就是發(fā)生魔術(shù)的地方。 當(dāng)然,如果您想使其更復(fù)雜,您可以在協(xié)作中包含更多服務(wù),然后將Web應(yīng)用程序的最終響應(yīng)放入與自動生成的隊列不同的隊列中, 該隊列只具有正確的關(guān)聯(lián)ID。 另外,不要忘記設(shè)置合理的超時。
這個解決方案還有一個很大的缺點 - 應(yīng)用程序吞吐量。 我故意這樣做,以便我可以跟進這篇文章,進一步深入調(diào)查AsyncProfiler! 但是目前,我們使用Tomcat作為主HTTP服務(wù)器,默認(rèn)為200個線程,這意味著我們的應(yīng)用程序無法同時處理200多條消息,因為我們的服務(wù)器線程正在等待RabbitMQ?回復(fù)隊列的響應(yīng),直到有消息進入或發(fā)生超時。
感謝您閱讀本文,敬請關(guān)注后續(xù)內(nèi)容! 如果您想自己嘗試一下,請查看我的GitHub存儲庫。
很多開發(fā)人員說,將應(yīng)用程序切換到異步處理很復(fù)雜。因為他們有一個天然需要同步通信的Web應(yīng)用程序。在這篇文章中,我想介紹一種方法來達到異步通信的目的:使用一些眾所周知的庫和工具來設(shè)計他們的系統(tǒng)。 下面的例子是用Java編寫的,但我相信它更多的是基本原理,同一個應(yīng)用程序可以用任何語言來重新寫。
所需的工具和庫:
- Spring Boot- RabbitMQ
## 1.Web應(yīng)用程序
一個用Spring MVC編寫的Web應(yīng)用程序并運行在Tomcat上。 它所做的只是將一個字符串發(fā)送到一個隊列中 (異步通信的開始) 并等待另一個隊列中的消息作為HTTP響應(yīng)發(fā)送回來。
首先,我們需要定義幾個依賴項,然后等待Spring Boot執(zhí)行所有必要的自動配置。
```java? ? ? ? ? ? org.springframework.boot? ? ? ? spring-boot-starter-web? ? ? ? ? ? ? ? org.springframework.boot? ? ? ? spring-boot-starter-amqp? ? ? ? ? ? ? ? com.thedeanda? ? ? ? lorem? ? ```
```java@SpringBootApplicationpublic class BlockingApplication {? ? public static void main(String[] args) {? ? ? ? SpringApplication.run(BlockingApplication.class, args);? ? }? ? @RestController? ? public static class MessageController {? ? ? ? private final RabbitTemplate rabbitTemplate;? ? ? ? public MessageController(CachingConnectionFactory connectionFactory) {? ? ? ? ? ? this.rabbitTemplate = new RabbitTemplate(connectionFactory);? ? ? ? }? ? ? ? @GetMapping("invoke")? ? ? ? public String sendMessage() {? ? ? ? ? ? Message response = rabbitTemplate.sendAndReceive("uppercase", null, request());? ? ? ? ? ? return new String(response.getBody());? ? ? ? }? ? ? ? private static Message request() {? ? ? ? ? ? Lorem LOREM = LoremIpsum.getInstance();? ? ? ? ? ? String name = LOREM.getFirstName() + " " + LOREM.getLastName();? ? ? ? ? ? return new Message(name.getBytes(), new MessageProperties());? ? ? ? }? ? }? ? @Bean? ? public CachingConnectionFactory connectionFactory() {? ? ? ? CachingConnectionFactory factory = new CachingConnectionFactory();? ? ? ? factory.setAddresses("localhost:5672");? ? ? ? factory.setUsername("admin");? ? ? ? factory.setPassword("admin");? ? ? ? return factory;? ? }}```
## 2.消費端應(yīng)用程序
第二個應(yīng)用程序僅僅是一個等待消息的RabbitMQ的消費端,將拿到的字符串轉(zhuǎn)換為大寫,然后將此結(jié)果發(fā)送到輸出隊列中。
```java? ? ? ? ? ? org.springframework.boot? ? ? ? spring-boot-starter-amqp? ? ```
```java@SpringBootApplicationpublic class ServiceApplication {? ? public static void main(String[] args) {? ? ? ? SpringApplication.run(ServiceApplication.class, args);? ? }? ? public static class MessageListener {? ? ? ? public String handleMessage(byte[] message) {? ? ? ? ? ? Random rand = new Random();? ? ? ? ? ? // Obtain a number between [0 - 49] + 50 = [50 - 99]? ? ? ? ? ? int n = rand.nextInt(50) + 50;? ? ? ? ? ? String content = new String(message);? ? ? ? ? ? try {? ? ? ? ? ? ? ? Thread.sleep(n);? ? ? ? ? ? } catch (InterruptedException e) {? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? }? ? ? ? ? ? return content.toUpperCase();? ? ? ? }? ? }? ? @Bean? ? public CachingConnectionFactory connectionFactory() {? ? ? ? CachingConnectionFactory factory = new CachingConnectionFactory();? ? ? ? factory.setAddresses("localhost:5672");? ? ? ? factory.setUsername("admin");? ? ? ? factory.setPassword("admin");? ? ? ? return factory;? ? }? ? @Bean? ? public SimpleMessageListenerContainer serviceListenerContainer() {? ? ? ? SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();? ? ? ? container.setConnectionFactory(connectionFactory());? ? ? ? container.setConcurrentConsumers(20);? ? ? ? container.setMaxConcurrentConsumers(40);? ? ? ? container.setQueueNames("uppercase_messages");? ? ? ? container.setMessageListener(new MessageListenerAdapter(new MessageListener()));? ? ? ? return container;? ? }}```
### 3.底層如何執(zhí)行的?
程序啟動并首次調(diào)用sendMessage()方法后,我們可以看到Spring AMQP支持自動創(chuàng)建了一個新的**回復(fù)隊列**并等待來自我們的服務(wù)應(yīng)用程序的響應(yīng)。
```2019-05-12 17:23:21.451? INFO 4574 --- [nio-8080-exec-1] .l.DirectReplyToMessageListenerContainer : Container initialized for queues: [amq.rabbitmq.reply-to]2019-05-12 17:23:21.457? INFO 4574 --- [nio-8080-exec-1] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-VF-iqD9rLEuljIBstbCI1A identity=10e58093] started```
如果我們在消費端應(yīng)用程序中查看消息,我們可以看到Spring自動傳播有關(guān)**回復(fù)隊列**的信息以及**相關(guān)ID,**用于將其傳遞回Web應(yīng)用程序以便能夠?qū)⒄埱蠛晚憫?yīng)配對在一起。
這就是發(fā)生魔術(shù)的地方。 當(dāng)然,如果您想使其更復(fù)雜,您可以在協(xié)作中包含更多服務(wù),然后將Web應(yīng)用程序的最終響應(yīng)放入與自動生成的隊列不同的隊列中, 該隊列只具有正確的*關(guān)聯(lián)ID*。 另外,不要忘記設(shè)置合理的超時。
這個解決方案還有一個很大的缺點 - 應(yīng)用程序吞吐量。 我故意這樣做,以便我可以跟進這篇文章,進一步深入調(diào)查`AsyncProfiler`! 但是目前,我們使用Tomcat作為主HTTP服務(wù)器,默認(rèn)為200個線程,這意味著我們的應(yīng)用程序無法同時處理200多條消息,因為我們的服務(wù)器線程正在等待RabbitMQ **回復(fù)隊列**的響應(yīng),直到有消息進入或發(fā)生超時。
感謝您閱讀本文,敬請關(guān)注后續(xù)內(nèi)容! 如果您想自己嘗試一下,請查看我的[GitHub存儲庫](https://github.com/petrbouda/rabbitmq-async-microservices)。
> 原文鏈接:https://dzone.com/articles/how-to-split-up-a-synchronous-and-asynchronous-of
> [作者:Petr Bouda](https://github.com/petrbouda)
> [譯者:KeepGoingPawn](https://github.com/KeepGoingPawn)
總結(jié)
以上是生活随笔為你收集整理的java web系统拆分_Java系统中如何拆分同步和异步的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java 线程不运行时间_java如何设
- 下一篇: java 单机版_JAVA单机版管理系统