日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java memcache 队列_基于memcache的java分布式队列实现。

發布時間:2023/12/1 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java memcache 队列_基于memcache的java分布式队列实现。 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

主要有兩個類,一個隊列類和一個job的抽象類。

保證隊列類中的key的唯一性,就可以用spring配置多個實例。水平有限,歡迎吐槽。

上代碼:

1、隊列類

import?net.spy.memcached.MemcachedClient;

import?net.spy.memcached.internal.OperationFuture;

import?org.apache.commons.logging.Log;

import?org.apache.commons.logging.LogFactory;

import?org.springframework.beans.BeansException;

import?org.springframework.beans.factory.DisposableBean;

import?org.springframework.beans.factory.InitializingBean;

import?org.springframework.context.ApplicationContext;

import?org.springframework.context.ApplicationContextAware;

import?com.izx.services.common.Constant;

/**

*

*?@ClassName:?MemCacheQueue

*?@Description:?基于memcache的消息隊列的實現

*?@author?hai.zhu

*?@date?2016-3-31?下午3:29:00

*

*/

public?class?MemCacheQueue?implements?InitializingBean,?DisposableBean,ApplicationContextAware?{

private?static?final?Log?log?=?LogFactory.getLog(MemCacheQueue.class);

/**

*?隊列名

*/

private?String?key;

/**

*?隊列鎖失效分鐘

*/

private?Integer?lockExpireMinite?=?3;

private?MemcachedClient?memcachedClient;

private?ApplicationContext?applicationContext;

ListenerThread?listenerThread?=?new?ListenerThread();

public?void?setKey(String?key)?{

this.key?=?key;

}

public?void?setMemcachedClient(MemcachedClient?memcachedClient)?{

this.memcachedClient?=?memcachedClient;

}

@Override

public?void?setApplicationContext(ApplicationContext?applicationContext)?throws?BeansException?{

this.applicationContext?=?applicationContext;

}

@Override

public?void?destroy()?throws?Exception?{

try?{

this.sign?=?false;

listenerThread.interrupt();

}?catch?(Exception?e)?{

log.error(e);

}

}

@Override

public?void?afterPropertiesSet()?throws?Exception?{

//初始化隊列,用add防止重啟覆蓋

memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY?+?key,?0,?"0");

memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY?+?key,?0,?"0");

//設置任務線程

listenerThread.setDaemon(true);

listenerThread.start();

}

/**

*

*?@Title:?push

*?@Description:?唯一對外方法,放入要執行的任務

*?@param?@param?value

*?@param?@throws?Exception????設定文件

*?@return?void????返回類型

*?@throws

*/

public?synchronized?void?push(MemCacheQueueJobAdaptor?value)?throws?Exception?{

//分布加鎖

queuelock();

//放入隊列

memcachedClient.incr(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY?+?key,?1);

Object?keyorder?=?memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY?+?key);

memcachedClient.set(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE?+?key?+?"_"?+?keyorder,?0,?value);

//分布解鎖

queueUnLock();

}

/**

*

*?@Title:?pop

*?@Description:?取出要執行的任務

*?@param?@return

*?@param?@throws?Exception????設定文件

*?@return?MemCacheQueueJobAdaptor????返回類型

*?@throws

*/

private?synchronized?MemCacheQueueJobAdaptor?pop()?throws?Exception?{

Object?keyorderstart?=?memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY?+?key);

Object?keyorderend?=?memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY?+?key);

if(keyorderstart.equals(keyorderend)){

return?null;

}

MemCacheQueueJobAdaptor?adaptor?=?(MemCacheQueueJobAdaptor)memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE?+?key?+?"_"?+?keyorderstart);

memcachedClient.incr(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY?+?key,?1);

memcachedClient.delete(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE?+?key?+?"_"?+?keyorderstart);

return?adaptor;

}

/**

*

*?@Title:?queuelock

*?@Description:?加鎖

*?@param?@throws?InterruptedException????設定文件

*?@return?void????返回類型

*?@throws

*/

private?void?queuelock()?throws?Exception?{

do?{

OperationFuture?sign?=?memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_LOCK?+?key,?lockExpireMinite?*?60,?key);

if(sign.get()){

return;

}?else?{

log.debug("key:?"?+?key?+?"?locked?by?another?business");

}

Thread.sleep(300);

}?while?(true);

}

/**

*

*?@Title:?queueUnLock

*?@Description:?解鎖

*?@param?????設定文件

*?@return?void????返回類型

*?@throws

*/

private?void?queueUnLock()?{

memcachedClient.delete(Constant.MEMCACHE_GLOBAL_QUEUE_LOCK?+?key);

}

private?boolean?sign?=?true;

private?long?THREAD_SLEEP?=?10;

class?ListenerThread?extends?Thread?{

@Override

public?void?run(){

log.error("隊列["+key+"]開始執行");

while(sign){

try?{

Thread.sleep(THREAD_SLEEP);

dojob();

}?catch?(Exception?e)?{

log.error(e);

}

}

}

private?void?dojob(){

try{

queuelock();

MemCacheQueueJobAdaptor?adaptor?=?pop();

//逐個執行

if(adaptor?!=?null){

THREAD_SLEEP?=?10;

try?{

adaptor.setApplicationContext(applicationContext);

adaptor.onMessage();

}?catch?(Exception?e)?{

log.error(e);

}

}else{

THREAD_SLEEP?=?5000;

}

}catch(Exception?e){

log.error(e);

}finally{

queueUnLock();

}

}

}

}[/code]

2、job抽象類

import?org.springframework.context.ApplicationContext;

import?java.io.Serializable;

/**

*

*?@ClassName:?MemCacheQueueJobAdaptor

*?@Description:?基于memcache隊列的任務適配器

*?@author?hai.zhu

*?@date?2015-12-11?上午11:48:26

*?@param?

*/

public?abstract?class?MemCacheQueueJobAdaptor?implements?Serializable{

private?static?final?long?serialVersionUID?=?-5071415952097756327L;

private?ApplicationContext?applicationContext;

public?ApplicationContext?getApplicationContext()?{

return?applicationContext;

}

public?void?setApplicationContext(ApplicationContext?applicationContext)?{

this.applicationContext?=?applicationContext;

}

/**

*

*?@Title:?onMessage

*?@Description:?異步執行任務接口

*?@author?hai.zhu

*?@param?@param?value?設定文件

*?@return?void?返回類型

*?@throws

*/

public?abstract?void?onMessage();

}[/code]

3、部分放在constant的常量

/**

*?基于memcache的隊列存放前綴

*/

public?static?String?MEMCACHE_GLOBAL_QUEUE_VARIABLE?=?"MEMCACHE_GLOBAL_QUEUE_VARIABLE_";

/**

*?基于memcache的隊列鎖的前綴

*/

public?static?String?MEMCACHE_GLOBAL_QUEUE_LOCK?=?"MEMCACHE_GLOBAL_QUEUE_LOCK_";

/**

*?基于memcache的隊列鎖的開始元素

*/

public?static?String?MEMCACHE_GLOBAL_QUEUE_STARTKEY?=?"MEMCACHE_GLOBAL_QUEUE_STARTKEY_";

/**

*?基于memcache的隊列鎖的結束元素

*/

public?static?String?MEMCACHE_GLOBAL_QUEUE_ENDKEY?=?"MEMCACHE_GLOBAL_QUEUE_ENDKEY_";[/code]

4、spring配置,保證隊列名的唯一性就可以配置多個隊列

轉載于:https://my.oschina.net/zhuxuan/blog/650935

總結

以上是生活随笔為你收集整理的java memcache 队列_基于memcache的java分布式队列实现。的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。