使用DelayQueue、ConcurrentHashMap、FutureTask實(shí)現(xiàn)的緩存工具類。
DelayQueue 簡介
DelayQueue是一個(gè)支持延時(shí)獲取元素的無界阻塞隊(duì)列。DelayQueue內(nèi)部隊(duì)列使用PriorityQueue來實(shí)現(xiàn)。隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。只有在延遲期滿時(shí)才能從隊(duì)列中提取元素。
DelayQueue非常有用,可以將DelayQueue運(yùn)用在以下應(yīng)用場景。
緩存系統(tǒng)的設(shè)計(jì):可以用DelayQueue保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢
DelayQueue,一旦能從DelayQueue中獲取元素時(shí),表示緩存有效期到了。定時(shí)任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天將會執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從
DelayQueue中獲取到任務(wù)就開始執(zhí)行,比如TimerQueue就是使用DelayQueue實(shí)現(xiàn)的。
ConcurrentHashMap和FutureTask,詳見以下:
http://www.jianshu.com/p/d10256f0ebeaFutureTask 源碼分析
緩存工具類實(shí)現(xiàn)
支持緩存多長時(shí)間,單位毫秒。支持多線程并發(fā)。
比如:有一個(gè)比較耗時(shí)的操作,此時(shí)緩沖中沒有此緩存值,一個(gè)線程開始計(jì)算這個(gè)耗時(shí)操作,而再次進(jìn)來線程就不需要再次進(jìn)行計(jì)算,只需要等上一個(gè)線程計(jì)算完成后(使用FutureTask)返回該值即可。
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
public class CacheBean<V> {private final static ConcurrentMap<String, Future<Object>> cache =
new ConcurrentHashMap<>();
private final static DelayQueue<DelayedItem<String>> delayQueue =
new DelayQueue<>();
private final int ms;
static {Thread t =
new Thread() {
@Overridepublic void run() {dameonCheckOverdueKey();}};t.setDaemon(
true);t.start();}
private final Computable<V> c;
/*** @param c Computable*/public CacheBean(Computable<V> c) {
this(c,
60 *
1000);}
/*** @param c Computable* @param ms 緩存多少毫秒*/public CacheBean(Computable<V> c,
int ms) {
this.c = c;
this.ms = ms;}
public V
compute(
final String key)
throws InterruptedException {
while (
true) {Future<V> f = (Future<V>) cache.get(key);
if (f ==
null) {Callable<V> eval =
new Callable<V>() {
public V
call() {
return (V) c.compute(key);}};FutureTask<V> ft =
new FutureTask<>(eval);f = (Future<V>) cache.putIfAbsent(key, (Future<Object>) ft);
if (f ==
null) {delayQueue.put(
new DelayedItem<>(key, ms));f = ft;ft.run();}}
try {
return f.get();}
catch (CancellationException e) {cache.remove(key, f);}
catch (ExecutionException e) {e.printStackTrace();}}}
/*** 檢查過期的key,從cache中刪除*/private static void dameonCheckOverdueKey() {DelayedItem<String> delayedItem;
while (
true) {
try {delayedItem = delayQueue.take();
if (delayedItem !=
null) {cache.remove(delayedItem.getT());System.out.println(System.nanoTime() +
" remove " + delayedItem.getT() +
" from cache");}}
catch (InterruptedException e) {e.printStackTrace();}}}}class DelayedItem<T> implements Delayed {
private T t;
private long liveTime;
private long removeTime;
public DelayedItem(T t,
long liveTime) {
this.setT(t);
this.liveTime = liveTime;
this.removeTime = TimeUnit.MILLISECONDS.convert(liveTime, TimeUnit.MILLISECONDS) + System.currentTimeMillis();}
@Overridepublic int compareTo(Delayed o) {
if (o ==
null)
return 1;
if (o ==
this)
return 0;
if (o
instanceof DelayedItem) {DelayedItem<T> tmpDelayedItem = (DelayedItem<T>) o;
if (liveTime > tmpDelayedItem.liveTime) {
return 1;}
else if (liveTime == tmpDelayedItem.liveTime) {
return 0;}
else {
return -
1;}}
long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return diff >
0 ?
1 : diff ==
0 ?
0 : -
1;}
@Overridepublic long getDelay(TimeUnit unit) {
return unit.convert(removeTime - System.currentTimeMillis(), unit);}
public T
getT() {
return t;}
public void setT(T t) {
this.t = t;}
@Overridepublic int hashCode() {
return t.hashCode();}
@Overridepublic boolean equals(Object object) {
if (object
instanceof DelayedItem) {
return object.hashCode() == hashCode() ?
true :
false;}
return false;}}
Computable 接口
public interface Computable<V> {V compute(String k);}
測試類
public class FutureTaskDemo {public static void main(String[] args)
throws InterruptedException {Thread t =
new Thread(() -> {CacheBean<String> cb =
new CacheBean<>(k -> {
try {System.out.println(
"模擬計(jì)算數(shù)據(jù),計(jì)算時(shí)長2秒。key=" + k);TimeUnit.SECONDS.sleep(
2);}
catch (InterruptedException e) {e.printStackTrace();}
return "你好:" + k;},
5000);
try {
while (
true) {System.out.println(
"thead2:" + cb.compute(
"b"));TimeUnit.SECONDS.sleep(
1);}}
catch (InterruptedException e) {e.printStackTrace();}});t.start();
while (
true) {CacheBean<String> cb =
new CacheBean<>(k -> {
try {System.out.println(
"模擬計(jì)算數(shù)據(jù),計(jì)算時(shí)長2秒。key=" + k);TimeUnit.SECONDS.sleep(
2);}
catch (InterruptedException e) {e.printStackTrace();}
return "你好:" + k;},
5000);System.out.println(
"thead1:" + cb.compute(
"b"));TimeUnit.SECONDS.sleep(
1);}}
}
執(zhí)行結(jié)果:
兩個(gè)線程同時(shí)訪問同一個(gè)key的緩存。從執(zhí)行結(jié)果發(fā)現(xiàn),每次緩存失效后,同一個(gè)key只執(zhí)行一次計(jì)算,而不是多個(gè)線程并發(fā)執(zhí)行同一個(gè)計(jì)算然后緩存。
本人簡書blog地址:http://www.jianshu.com/u/1f0067e24ff8????
點(diǎn)擊這里快速進(jìn)入簡書
GIT地址:http://git.oschina.net/brucekankan/
點(diǎn)擊這里快速進(jìn)入GIT
總結(jié)
以上是生活随笔為你收集整理的使用DelayQueue 和 FutureTask 实现java中的缓存的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。