使用DelayQueue、ConcurrentHashMap、FutureTask實(shí)現(xiàn)的緩存工具類(lèi)。
DelayQueue 簡(jiǎn)介
DelayQueue是一個(gè)支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列。DelayQueue內(nèi)部隊(duì)列使用PriorityQueue來(lái)實(shí)現(xiàn)。隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。只有在延遲期滿(mǎn)時(shí)才能從隊(duì)列中提取元素。
DelayQueue非常有用,可以將DelayQueue運(yùn)用在以下應(yīng)用場(chǎng)景。
緩存系統(tǒng)的設(shè)計(jì):可以用DelayQueue保存緩存元素的有效期,使用一個(gè)線(xiàn)程循環(huán)查詢(xún)
DelayQueue,一旦能從DelayQueue中獲取元素時(shí),表示緩存有效期到了。定時(shí)任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天將會(huì)執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從
DelayQueue中獲取到任務(wù)就開(kāi)始執(zhí)行,比如TimerQueue就是使用DelayQueue實(shí)現(xiàn)的。
ConcurrentHashMap和FutureTask,詳見(jiàn)以下:
http://www.jianshu.com/p/d10256f0ebeaFutureTask 源碼分析
緩存工具類(lèi)實(shí)現(xiàn)
支持緩存多長(zhǎng)時(shí)間,單位毫秒。支持多線(xiàn)程并發(fā)。
比如:有一個(gè)比較耗時(shí)的操作,此時(shí)緩沖中沒(méi)有此緩存值,一個(gè)線(xiàn)程開(kāi)始計(jì)算這個(gè)耗時(shí)操作,而再次進(jìn)來(lái)線(xiàn)程就不需要再次進(jìn)行計(jì)算,只需要等上一個(gè)線(xiàn)程計(jì)算完成后(使用FutureTask)返回該值即可。
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
public class CacheBean<V> {private final static ConcurrentMap<String, Future<Object>> cache =
new ConcurrentHashMap<>();
private final static DelayQueue<DelayedItem<String>> delayQueue =
new DelayQueue<>();
private final int ms;
static {Thread t =
new Thread() {
@Overridepublic void run() {dameonCheckOverdueKey();}};t.setDaemon(
true);t.start();}
private final Computable<V> c;
/*** @param c Computable*/public CacheBean(Computable<V> c) {
this(c,
60 *
1000);}
/*** @param c Computable* @param ms 緩存多少毫秒*/public CacheBean(Computable<V> c,
int ms) {
this.c = c;
this.ms = ms;}
public V
compute(
final String key)
throws InterruptedException {
while (
true) {Future<V> f = (Future<V>) cache.get(key);
if (f ==
null) {Callable<V> eval =
new Callable<V>() {
public V
call() {
return (V) c.compute(key);}};FutureTask<V> ft =
new FutureTask<>(eval);f = (Future<V>) cache.putIfAbsent(key, (Future<Object>) ft);
if (f ==
null) {delayQueue.put(
new DelayedItem<>(key, ms));f = ft;ft.run();}}
try {
return f.get();}
catch (CancellationException e) {cache.remove(key, f);}
catch (ExecutionException e) {e.printStackTrace();}}}
/*** 檢查過(guò)期的key,從cache中刪除*/private static void dameonCheckOverdueKey() {DelayedItem<String> delayedItem;
while (
true) {
try {delayedItem = delayQueue.take();
if (delayedItem !=
null) {cache.remove(delayedItem.getT());System.out.println(System.nanoTime() +
" remove " + delayedItem.getT() +
" from cache");}}
catch (InterruptedException e) {e.printStackTrace();}}}}class DelayedItem<T> implements Delayed {
private T t;
private long liveTime;
private long removeTime;
public DelayedItem(T t,
long liveTime) {
this.setT(t);
this.liveTime = liveTime;
this.removeTime = TimeUnit.MILLISECONDS.convert(liveTime, TimeUnit.MILLISECONDS) + System.currentTimeMillis();}
@Overridepublic int compareTo(Delayed o) {
if (o ==
null)
return 1;
if (o ==
this)
return 0;
if (o
instanceof DelayedItem) {DelayedItem<T> tmpDelayedItem = (DelayedItem<T>) o;
if (liveTime > tmpDelayedItem.liveTime) {
return 1;}
else if (liveTime == tmpDelayedItem.liveTime) {
return 0;}
else {
return -
1;}}
long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return diff >
0 ?
1 : diff ==
0 ?
0 : -
1;}
@Overridepublic long getDelay(TimeUnit unit) {
return unit.convert(removeTime - System.currentTimeMillis(), unit);}
public T
getT() {
return t;}
public void setT(T t) {
this.t = t;}
@Overridepublic int hashCode() {
return t.hashCode();}
@Overridepublic boolean equals(Object object) {
if (object
instanceof DelayedItem) {
return object.hashCode() == hashCode() ?
true :
false;}
return false;}}
Computable 接口
public interface Computable<V> {V compute(String k);}
測(cè)試類(lèi)
public class FutureTaskDemo {public static void main(String[] args)
throws InterruptedException {Thread t =
new Thread(() -> {CacheBean<String> cb =
new CacheBean<>(k -> {
try {System.out.println(
"模擬計(jì)算數(shù)據(jù),計(jì)算時(shí)長(zhǎng)2秒。key=" + k);TimeUnit.SECONDS.sleep(
2);}
catch (InterruptedException e) {e.printStackTrace();}
return "你好:" + k;},
5000);
try {
while (
true) {System.out.println(
"thead2:" + cb.compute(
"b"));TimeUnit.SECONDS.sleep(
1);}}
catch (InterruptedException e) {e.printStackTrace();}});t.start();
while (
true) {CacheBean<String> cb =
new CacheBean<>(k -> {
try {System.out.println(
"模擬計(jì)算數(shù)據(jù),計(jì)算時(shí)長(zhǎng)2秒。key=" + k);TimeUnit.SECONDS.sleep(
2);}
catch (InterruptedException e) {e.printStackTrace();}
return "你好:" + k;},
5000);System.out.println(
"thead1:" + cb.compute(
"b"));TimeUnit.SECONDS.sleep(
1);}}
}
執(zhí)行結(jié)果:
兩個(gè)線(xiàn)程同時(shí)訪(fǎng)問(wèn)同一個(gè)key的緩存。從執(zhí)行結(jié)果發(fā)現(xiàn),每次緩存失效后,同一個(gè)key只執(zhí)行一次計(jì)算,而不是多個(gè)線(xiàn)程并發(fā)執(zhí)行同一個(gè)計(jì)算然后緩存。
本人簡(jiǎn)書(shū)blog地址:http://www.jianshu.com/u/1f0067e24ff8????
點(diǎn)擊這里快速進(jìn)入簡(jiǎn)書(shū)
GIT地址:http://git.oschina.net/brucekankan/
點(diǎn)擊這里快速進(jìn)入GIT
總結(jié)
以上是生活随笔為你收集整理的使用DelayQueue 和 FutureTask 实现java中的缓存的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。