java memcache 队列_基于memcache的java分布式队列实现。
主要有兩個類,一個隊列類和一個job的抽象類。
保證隊列類中的key的唯一性,就可以用spring配置多個實例。水平有限,歡迎吐槽。
上代碼:
1、隊列類
import?net.spy.memcached.MemcachedClient;
import?net.spy.memcached.internal.OperationFuture;
import?org.apache.commons.logging.Log;
import?org.apache.commons.logging.LogFactory;
import?org.springframework.beans.BeansException;
import?org.springframework.beans.factory.DisposableBean;
import?org.springframework.beans.factory.InitializingBean;
import?org.springframework.context.ApplicationContext;
import?org.springframework.context.ApplicationContextAware;
import?com.izx.services.common.Constant;
/**
*
*?@ClassName:?MemCacheQueue
*?@Description:?基于memcache的消息隊列的實現
*?@author?hai.zhu
*?@date?2016-3-31?下午3:29:00
*
*/
public?class?MemCacheQueue?implements?InitializingBean,?DisposableBean,ApplicationContextAware?{
private?static?final?Log?log?=?LogFactory.getLog(MemCacheQueue.class);
/**
*?隊列名
*/
private?String?key;
/**
*?隊列鎖失效分鐘
*/
private?Integer?lockExpireMinite?=?3;
private?MemcachedClient?memcachedClient;
private?ApplicationContext?applicationContext;
ListenerThread?listenerThread?=?new?ListenerThread();
public?void?setKey(String?key)?{
this.key?=?key;
}
public?void?setMemcachedClient(MemcachedClient?memcachedClient)?{
this.memcachedClient?=?memcachedClient;
}
@Override
public?void?setApplicationContext(ApplicationContext?applicationContext)?throws?BeansException?{
this.applicationContext?=?applicationContext;
}
@Override
public?void?destroy()?throws?Exception?{
try?{
this.sign?=?false;
listenerThread.interrupt();
}?catch?(Exception?e)?{
log.error(e);
}
}
@Override
public?void?afterPropertiesSet()?throws?Exception?{
//初始化隊列,用add防止重啟覆蓋
memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY?+?key,?0,?"0");
memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY?+?key,?0,?"0");
//設置任務線程
listenerThread.setDaemon(true);
listenerThread.start();
}
/**
*
*?@Title:?push
*?@Description:?唯一對外方法,放入要執行的任務
*?@param?@param?value
*?@param?@throws?Exception????設定文件
*?@return?void????返回類型
*?@throws
*/
public?synchronized?void?push(MemCacheQueueJobAdaptor?value)?throws?Exception?{
//分布加鎖
queuelock();
//放入隊列
memcachedClient.incr(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY?+?key,?1);
Object?keyorder?=?memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY?+?key);
memcachedClient.set(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE?+?key?+?"_"?+?keyorder,?0,?value);
//分布解鎖
queueUnLock();
}
/**
*
*?@Title:?pop
*?@Description:?取出要執行的任務
*?@param?@return
*?@param?@throws?Exception????設定文件
*?@return?MemCacheQueueJobAdaptor????返回類型
*?@throws
*/
private?synchronized?MemCacheQueueJobAdaptor?pop()?throws?Exception?{
Object?keyorderstart?=?memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY?+?key);
Object?keyorderend?=?memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY?+?key);
if(keyorderstart.equals(keyorderend)){
return?null;
}
MemCacheQueueJobAdaptor?adaptor?=?(MemCacheQueueJobAdaptor)memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE?+?key?+?"_"?+?keyorderstart);
memcachedClient.incr(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY?+?key,?1);
memcachedClient.delete(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE?+?key?+?"_"?+?keyorderstart);
return?adaptor;
}
/**
*
*?@Title:?queuelock
*?@Description:?加鎖
*?@param?@throws?InterruptedException????設定文件
*?@return?void????返回類型
*?@throws
*/
private?void?queuelock()?throws?Exception?{
do?{
OperationFuture?sign?=?memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_LOCK?+?key,?lockExpireMinite?*?60,?key);
if(sign.get()){
return;
}?else?{
log.debug("key:?"?+?key?+?"?locked?by?another?business");
}
Thread.sleep(300);
}?while?(true);
}
/**
*
*?@Title:?queueUnLock
*?@Description:?解鎖
*?@param?????設定文件
*?@return?void????返回類型
*?@throws
*/
private?void?queueUnLock()?{
memcachedClient.delete(Constant.MEMCACHE_GLOBAL_QUEUE_LOCK?+?key);
}
private?boolean?sign?=?true;
private?long?THREAD_SLEEP?=?10;
class?ListenerThread?extends?Thread?{
@Override
public?void?run(){
log.error("隊列["+key+"]開始執行");
while(sign){
try?{
Thread.sleep(THREAD_SLEEP);
dojob();
}?catch?(Exception?e)?{
log.error(e);
}
}
}
private?void?dojob(){
try{
queuelock();
MemCacheQueueJobAdaptor?adaptor?=?pop();
//逐個執行
if(adaptor?!=?null){
THREAD_SLEEP?=?10;
try?{
adaptor.setApplicationContext(applicationContext);
adaptor.onMessage();
}?catch?(Exception?e)?{
log.error(e);
}
}else{
THREAD_SLEEP?=?5000;
}
}catch(Exception?e){
log.error(e);
}finally{
queueUnLock();
}
}
}
}[/code]
2、job抽象類
import?org.springframework.context.ApplicationContext;
import?java.io.Serializable;
/**
*
*?@ClassName:?MemCacheQueueJobAdaptor
*?@Description:?基于memcache隊列的任務適配器
*?@author?hai.zhu
*?@date?2015-12-11?上午11:48:26
*?@param?
*/
public?abstract?class?MemCacheQueueJobAdaptor?implements?Serializable{
private?static?final?long?serialVersionUID?=?-5071415952097756327L;
private?ApplicationContext?applicationContext;
public?ApplicationContext?getApplicationContext()?{
return?applicationContext;
}
public?void?setApplicationContext(ApplicationContext?applicationContext)?{
this.applicationContext?=?applicationContext;
}
/**
*
*?@Title:?onMessage
*?@Description:?異步執行任務接口
*?@author?hai.zhu
*?@param?@param?value?設定文件
*?@return?void?返回類型
*?@throws
*/
public?abstract?void?onMessage();
}[/code]
3、部分放在constant的常量
/**
*?基于memcache的隊列存放前綴
*/
public?static?String?MEMCACHE_GLOBAL_QUEUE_VARIABLE?=?"MEMCACHE_GLOBAL_QUEUE_VARIABLE_";
/**
*?基于memcache的隊列鎖的前綴
*/
public?static?String?MEMCACHE_GLOBAL_QUEUE_LOCK?=?"MEMCACHE_GLOBAL_QUEUE_LOCK_";
/**
*?基于memcache的隊列鎖的開始元素
*/
public?static?String?MEMCACHE_GLOBAL_QUEUE_STARTKEY?=?"MEMCACHE_GLOBAL_QUEUE_STARTKEY_";
/**
*?基于memcache的隊列鎖的結束元素
*/
public?static?String?MEMCACHE_GLOBAL_QUEUE_ENDKEY?=?"MEMCACHE_GLOBAL_QUEUE_ENDKEY_";[/code]
4、spring配置,保證隊列名的唯一性就可以配置多個隊列
轉載于:https://my.oschina.net/zhuxuan/blog/650935
總結
以上是生活随笔為你收集整理的java memcache 队列_基于memcache的java分布式队列实现。的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 楚乔传燕洵母亲白笙和皇上是什么关系 白笙
- 下一篇: java mapfile_基于文件的数据