日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Java如何解决mysql读写延迟_java中延迟任务的处理方式

發布時間:2025/3/19 数据库 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java如何解决mysql读写延迟_java中延迟任务的处理方式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1、利用延遲隊列

延時隊列,第一他是個隊列,所以具有對列功能第二就是延時,這就是延時對列,功能也就是將任務放在該延時對列中,只有到了延時時刻才能從該延時對列中獲取任務否則獲取不到……

應用場景比較多,比如延時1分鐘發短信,延時1分鐘再次執行等,下面先看看延時隊列demo之后再看延時隊列在項目中的使用:

簡單的延時隊列要有三部分:第一實現了Delayed接口的消息體、第二消費消息的消費者、第三存放消息的延時隊列,那下面就來看看延時隊列demo。

一、消息體

packagecom.delqueue;importjava.util.concurrent.Delayed;importjava.util.concurrent.TimeUnit;/*** 消息體定義 實現Delayed接口就是實現兩個方法即compareTo 和 getDelay最重要的就是getDelay方法,這個方法用來判斷是否到期……*/

public class Message implementsDelayed {private intid;private String body; //消息內容

private long excuteTime;//延遲時長,這個是必須的屬性因為要按照這個判斷延時時長。

public intgetId() {returnid;

}publicString getBody() {returnbody;

}public longgetExcuteTime() {returnexcuteTime;

}public Message(int id, String body, longdelayTime) {this.id =id;this.body =body;this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) +System.nanoTime();

}//自定義實現比較方法返回 1 0 -1三個參數

@Overridepublic intcompareTo(Delayed delayed) {

Message msg=(Message) delayed;return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1: (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);

}//延遲任務是否到時就是按照這個方法判斷如果返回的是負數則說明到期否則還沒到期

@Overridepublic longgetDelay(TimeUnit unit) {return unit.convert(this.excuteTime -System.nanoTime(), TimeUnit.NANOSECONDS);

}

}

二、消息消費者

packagecom.delqueue;importjava.util.concurrent.DelayQueue;public class Consumer implementsRunnable {//延時隊列 ,消費者從其中獲取消息進行消費

private DelayQueuequeue;public Consumer(DelayQueuequeue) {this.queue =queue;

}

@Overridepublic voidrun() {while (true) {try{

Message take=queue.take();

System.out.println("消費消息id:" + take.getId() + " 消息體:" +take.getBody());

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

}

三、延時隊列

packagecom.delqueue;importjava.util.concurrent.DelayQueue;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;public classDelayQueueTest {public static voidmain(String[] args) {//創建延時隊列

DelayQueue queue = new DelayQueue();//添加延時消息,m1 延時3s

Message m1 = new Message(1, "world", 3000);//添加延時消息,m2 延時10s

Message m2 = new Message(2, "hello", 10000);//將延時消息放到延時隊列中

queue.offer(m2);

queue.offer(m1);//啟動消費線程 消費添加到延時隊列中的消息,前提是任務到了延期時間

ExecutorService exec = Executors.newFixedThreadPool(1);

exec.execute(newConsumer(queue));

exec.shutdown();

}

}

將消息體放入延遲隊列中,在啟動消費者線程去消費延遲隊列中的消息,如果延遲隊列中的消息到了延遲時間則可以從中取出消息否則無法取出消息也就無法消費。

這就是延遲隊列demo,下面我們來說說在真實環境下的使用。

使用場景描述:

在打車軟件中對訂單進行派單的流程,當有訂單的時候給該訂單篩選司機,然后給當訂單綁定司機,但是有時運氣沒那么好,訂單進來后第一次沒有篩選到合適的司機,但我們也不能就此結束派單,而是將該訂單的信息放到延時隊列中過個2秒鐘在進行一次,其實這個2秒鐘就是一個延遲,所以這里我們就可以使用延時隊列來實現……

下面看看簡單的流程圖:

下面來看看具體代碼實現:

在項目中有如下幾個類:第一 、任務類 ? 第二、按照任務類組裝的消息體類 ?第三、延遲隊列管理類

任務類即執行篩選司機、綁單、push消息的任務類

packagecom.test.delayqueue;/*** 具體執行相關業務的業務類

*@authorwhd

* @date 2017年9月25日 上午12:49:32*/

public class DelayOrderWorker implementsRunnable {

@Overridepublic voidrun() {//TODO Auto-generated method stub//相關業務邏輯處理

System.out.println(Thread.currentThread().getName()+" do something ……");

}

}

消息體類,在延時隊列中這個實現了Delayed接口的消息類是比不可少的,實現接口時有一個getDelay(TimeUnit unit)方法,這個方法就是判斷是否到期的

這里定義的是一個泛型類,所以可以將我們上面的任務類作為其中的task,這樣就將任務類分裝成了一個消息體

packagecom.test.delayqueue;importjava.util.concurrent.Delayed;importjava.util.concurrent.TimeUnit;/*** 延時隊列中的消息體將任務封裝為消息體

*

*@authorwhd

* @date 2017年9月25日 上午12:48:30

*@param*/

public class DelayOrderTask implementsDelayed {private final longtime;private final T task; //任務類,也就是之前定義的任務類

/***@paramtimeout

* 超時時間(秒)

*@paramtask

* 任務*/

public DelayOrderTask(longtimeout, T task) {this.time = System.nanoTime() +timeout;this.task =task;

}

@Overridepublic intcompareTo(Delayed o) {//TODO Auto-generated method stub

DelayOrderTask other =(DelayOrderTask) o;long diff = time -other.time;if (diff > 0) {return 1;

}else if (diff < 0) {return -1;

}else{return 0;

}

}

@Overridepublic longgetDelay(TimeUnit unit) {//TODO Auto-generated method stub

return unit.convert(this.time -System.nanoTime(), TimeUnit.NANOSECONDS);

}

@Overridepublic inthashCode() {returntask.hashCode();

}publicT getTask() {returntask;

}

}

延時隊列管理類,這個類主要就是將任務類封裝成消息并并添加到延時隊列中,以及輪詢延時隊列從中取出到時的消息體,在獲取任務類放到線程池中執行任務

packagecom.test.delayqueue;importjava.util.Map;importjava.util.concurrent.DelayQueue;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.atomic.AtomicLong;/*** 延時隊列管理類,用來添加任務、執行任務

*

*@authorwhd

* @date 2017年9月25日 上午12:44:59*/

public classDelayOrderQueueManager {private final static int DEFAULT_THREAD_NUM = 5;private static int thread_num =DEFAULT_THREAD_NUM;//固定大小線程池

privateExecutorService executor;//守護線程

privateThread daemonThread;//延時隊列

private DelayQueue>delayQueue;private static final AtomicLong atomic = new AtomicLong(0);private final long n = 1;private static DelayOrderQueueManager instance = newDelayOrderQueueManager();privateDelayOrderQueueManager() {

executor=Executors.newFixedThreadPool(thread_num);

delayQueue= new DelayQueue<>();

init();

}public staticDelayOrderQueueManager getInstance() {returninstance;

}/*** 初始化*/

public voidinit() {

daemonThread= new Thread(() ->{

execute();

});

daemonThread.setName("DelayQueueMonitor");

daemonThread.start();

}private voidexecute() {while (true) {

Map map =Thread.getAllStackTraces();

System.out.println("當前存活線程數量:" +map.size());int taskNum =delayQueue.size();

System.out.println("當前延時任務數量:" +taskNum);try{//從延時隊列中獲取任務

DelayOrderTask> delayOrderTask =delayQueue.take();if (delayOrderTask != null) {

Runnable task=delayOrderTask.getTask();if (null ==task) {continue;

}//提交到線程池執行task

executor.execute(task);

}

}catch(Exception e) {

e.printStackTrace();

}

}

}/*** 添加任務

*

*@paramtask

*@paramtime

* 延時時間

*@paramunit

* 時間單位*/

public void put(Runnable task, longtime, TimeUnit unit) {//獲取延時時間

long timeout =TimeUnit.NANOSECONDS.convert(time, unit);//將任務封裝成實現Delayed接口的消息體

DelayOrderTask> delayOrder = new DelayOrderTask<>(timeout, task);//將消息體放到延時隊列中

delayQueue.put(delayOrder);

}/*** 刪除任務

*

*@paramtask

*@return

*/

public booleanremoveTask(DelayOrderTask task) {returndelayQueue.remove(task);

}

}

測試類

packagecom.delqueue;importjava.util.concurrent.TimeUnit;importcom.test.delayqueue.DelayOrderQueueManager;importcom.test.delayqueue.DelayOrderWorker;public classTest {public static voidmain(String[] args) {

DelayOrderWorker work1= new DelayOrderWorker();//任務1

DelayOrderWorker work2 = new DelayOrderWorker();//任務2

DelayOrderWorker work3 = new DelayOrderWorker();//任務3//延遲隊列管理類,將任務轉化消息體并將消息體放入延遲對列中等待執行

DelayOrderQueueManager manager =DelayOrderQueueManager.getInstance();

manager.put(work1,3000, TimeUnit.MILLISECONDS);

manager.put(work2,6000, TimeUnit.MILLISECONDS);

manager.put(work3,9000, TimeUnit.MILLISECONDS);

}

}

OK 這就是項目中的具體使用情況,當然具體內容被忽略,整體框架就是這樣,還有這里使用java的延時隊列但是這種方式是有問題的如果如果down機則會出現任務丟失,所以也可以考慮使用mq、redis來實現

2、mq實現延遲消息

在rabbitmq 3.5.7及以上的版本提供了一個插件(rabbitmq-delayed-message-exchange)來實現延遲隊列功能。同時插件依賴Erlang/OPT 18.0及以上。

插件源碼地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

安裝:

進入插件安裝目錄

{rabbitmq-server}/plugins/(可以查看一下當前已存在的插件)

下載插件

rabbitmq_delayed_message_exchange

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

(如果下載的文件名稱不規則就手動重命名一下如:

rabbitmq_delayed_message_exchange-0.0.1.ez)

啟用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

關閉插件

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

插件使用

通過聲明一個x-delayed-message類型的exchange來使用delayed-messaging特性

x-delayed-message是插件提供的類型,并不是rabbitmq本身的,發送消息的時候通過在header添加”x-delay”參數來控制消息的延時時間

直接在maven工程的pom.xml文件中加入

org.springframework.boot

spring-boot-starter-amqp

Spring Boot的版本我使用的是 2.0.1.RELEASE .

接下來在 application.properties 文件中加入redis配置:

spring.rabbitmq.host=127.0.0.1

spring.rabbitmq.port=5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

定義ConnectionFactory和RabbitTemplate

也很簡單,代碼如下:

packagecom.mq.rabbitmq;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;

@Configuration

@ConfigurationProperties(prefix= "spring.rabbitmq")public classRabbitMqConfig {privateString host;private intport;privateString userName;privateString password;

@BeanpublicConnectionFactory connectionFactory() {

CachingConnectionFactory cachingConnectionFactory= newCachingConnectionFactory(host,port);

cachingConnectionFactory.setUsername(userName);

cachingConnectionFactory.setPassword(password);

cachingConnectionFactory.setVirtualHost("/");

cachingConnectionFactory.setPublisherConfirms(true);returncachingConnectionFactory;

}

@BeanpublicRabbitTemplate rabbitTemplate() {

RabbitTemplate rabbitTemplate= newRabbitTemplate(connectionFactory());returnrabbitTemplate;

}publicString getHost() {returnhost;

}public voidsetHost(String host) {this.host =host;

}public intgetPort() {returnport;

}public void setPort(intport) {this.port =port;

}publicString getUserName() {returnuserName;

}public voidsetUserName(String userName) {this.userName =userName;

}publicString getPassword() {returnpassword;

}public voidsetPassword(String password) {this.password =password;

}

}

Exchange和Queue配置

packagecom.mq.rabbitmq;import org.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;

@Configurationpublic classQueueConfig {

@BeanpublicCustomExchange delayExchange() {

Map args = new HashMap<>();

args.put("x-delayed-type", "direct");return new CustomExchange("test_exchange", "x-delayed-message",true, false,args);

}

@BeanpublicQueue queue() {

Queue queue= new Queue("test_queue_1", true);returnqueue;

}

@BeanpublicBinding binding() {return BindingBuilder.bind(queue()).to(delayExchange()).with("test_queue_1").noargs();

}

}

這里要特別注意的是,使用的是 CustomExchange ,不是 DirectExchange ,另外 CustomExchange 的類型必須是 x-delayed-message 。

實現消息發送

packagecom.mq.rabbitmq;importorg.springframework.amqp.AmqpException;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjava.text.SimpleDateFormat;importjava.util.Date;

@Servicepublic classMessageServiceImpl {

@AutowiredprivateRabbitTemplate rabbitTemplate;public voidsendMsg(String queueName,String msg) {

SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

System.out.println("消息發送時間:"+sdf.format(newDate()));

rabbitTemplate.convertAndSend("test_exchange", queueName, msg, newMessagePostProcessor() {

@Overridepublic Message postProcessMessage(Message message) throwsAmqpException {

message.getMessageProperties().setHeader("x-delay",3000);returnmessage;

}

});

}

}

注意在發送的時候,必須加上一個header

x-delay

在這里我設置的延遲時間是3秒。

消息消費者

packagecom.mq.rabbitmq;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.text.SimpleDateFormat;importjava.util.Date;

@Componentpublic classMessageReceiver {

@RabbitListener(queues= "test_queue_1")public voidreceive(String msg) {

SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

System.out.println("消息接收時間:"+sdf.format(newDate()));

System.out.println("接收到的消息:"+msg);

}

}

運行Spring Boot程序和發送消息

直接在main方法里運行Spring Boot程序,Spring Boot會自動解析 MessageReceiver 類的。

接下來只需要用Junit運行一下發送消息的接口即可。

packagecom.mq.rabbitmq;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)

@SpringBootTestpublic classRabbitmqApplicationTests {

@AutowiredprivateMessageServiceImpl messageService;

@Testpublic voidsend() {

messageService.sendMsg("test_queue_1","hello i am delay msg");

}

}

運行完后,可以看到如下信息:

消息發送時間:2018-05-03 12:44:533秒鐘后,Spring Boot控制臺會輸出:

消息接收時間:2018-05-03 12:44:56接收到的消息:hello i am delay msg

總結

以上是生活随笔為你收集整理的Java如何解决mysql读写延迟_java中延迟任务的处理方式的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。