闭关修炼(四)并发包/类
傳統藝能,點到為止
文章目錄
- 并發包
- 你接觸過哪些線程安全類?
- 你使用過哪些jdk1.5并發包下的類?
- Vector和ArrayList
- Vector和ArrayList的區別
- Vector源碼
- ArrayList源碼
- Hashtable和HashMap
- Hashtable和HashMap的區別?
- Hashtable和HashMap的底層實現?
- Hashtable put方法源碼
- HashMap put方法源碼
- SynchronizedMap
- 什么是SynchronizedMap?
- SynchronizedMap原理?
- ConcurrentHashMap
- ConcurrentHashMap設計思路/底層?
- CountDownLatch
- 什么是CountDownLatch?
- CountDownLatch例子
- CyclicBarrier
- 什么是CyclicBarrier?
- CyclicBarrier例子
- Semaphore
- 什么是Semaphore
- Semaphore例子
- 并發隊列
- 并發隊列有界和無界的區別?
- 阻塞與非阻塞隊列的區別?
- 非阻塞式隊列ConcurrentLinkedDeque
- 阻塞式隊列BlockingQueue
- BlockingQueue和ConcurrentLinkedDeque的區別?
- 生產者消費者例子
并發包
你接觸過哪些線程安全類?
你使用過哪些jdk1.5并發包下的類?
Vector和ArrayList
Vector和ArrayList的區別
Vector和ArrayList原理都是由數組實現的,查詢速度塊,增加、修改和刪除速度慢。
最大的區別在于線程安全問題,Vector是線程安全的,ArrayList是線程不安全的,但ArrayList效率更高。
Vector是線程安全的那么必然是上了鎖的類集合。
Vector源碼
點進Vector類,看get set add方法的實現,我們發現他們的方法都被synchronized所修飾。多個線程使用Vector類,只要有一個線程在操作,其他線程都讀不了數據,還引發鎖資源競爭,因此效率很低。
/*** Returns the element at the specified position in this Vector.** @param index index of the element to return* @return object at the specified index* @throws ArrayIndexOutOfBoundsException if the index is out of range* ({@code index < 0 || index >= size()})* @since 1.2*/public synchronized E get(int index) {if (index >= elementCount)throw new ArrayIndexOutOfBoundsException(index);return elementData(index);}/*** Replaces the element at the specified position in this Vector with the* specified element.** @param index index of the element to replace* @param element element to be stored at the specified position* @return the element previously at the specified position* @throws ArrayIndexOutOfBoundsException if the index is out of range* ({@code index < 0 || index >= size()})* @since 1.2*/public synchronized E set(int index, E element) {if (index >= elementCount)throw new ArrayIndexOutOfBoundsException(index);E oldValue = elementData(index);elementData[index] = element;return oldValue;}/*** Appends the specified element to the end of this Vector.** @param e element to be appended to this Vector* @return {@code true} (as specified by {@link Collection#add})* @since 1.2*/public synchronized boolean add(E e) {modCount++;ensureCapacityHelper(elementCount + 1);elementData[elementCount++] = e;return true;}ArrayList源碼
點進ArrayList一看就知道,是沒有做任何的同步
/*** Returns the element at the specified position in this list.** @param index index of the element to return* @return the element at the specified position in this list* @throws IndexOutOfBoundsException {@inheritDoc}*/public E get(int index) {rangeCheck(index);return elementData(index);}/*** Replaces the element at the specified position in this list with* the specified element.** @param index index of the element to replace* @param element element to be stored at the specified position* @return the element previously at the specified position* @throws IndexOutOfBoundsException {@inheritDoc}*/public E set(int index, E element) {rangeCheck(index);E oldValue = elementData(index);elementData[index] = element;return oldValue;}/*** Appends the specified element to the end of this list.** @param e element to be appended to this list* @return <tt>true</tt> (as specified by {@link Collection#add})*/public boolean add(E e) {ensureCapacityInternal(size + 1); // Increments modCount!!elementData[size++] = e;return true;}Hashtable和HashMap
Hashtable和HashMap的區別?
HashTable線程安全,HashMap線程不安全
Hashtable和HashMap的底層實現?
//todo 以后在細寫吧。
鏈表+數組,鏈表做增加刪除, HashCode取模得到下標位置,一致性取模算法。
Hashtable put方法源碼
很明顯可以看到put方法加了synchronized關鍵字。
public synchronized V put(K key, V value) {// Make sure the value is not nullif (value == null) {throw new NullPointerException();}// Makes sure the key is not already in the hashtable.Entry<?,?> tab[] = table;int hash = key.hashCode();int index = (hash & 0x7FFFFFFF) % tab.length;@SuppressWarnings("unchecked")Entry<K,V> entry = (Entry<K,V>)tab[index];for(; entry != null ; entry = entry.next) {if ((entry.hash == hash) && entry.key.equals(key)) {V old = entry.value;entry.value = value;return old;}}addEntry(hash, key, value, index);return null;}HashMap put方法源碼
沒有synchronized關鍵字
/** * Associates the specified value with the specified key in this map.* If the map previously contained a mapping for the key, the old* value is replaced.** @param key key with which the specified value is to be associated* @param value value to be associated with the specified key* @return the previous value associated with <tt>key</tt>, or* <tt>null</tt> if there was no mapping for <tt>key</tt>.* (A <tt>null</tt> return can also indicate that the map* previously associated <tt>null</tt> with <tt>key</tt>.)*/public V put(K key, V value) {return putVal(hash(key), key, value, false, true);}/*** Implements Map.put and related methods** @param hash hash for key* @param key the key* @param value the value to put* @param onlyIfAbsent if true, don't change existing value* @param evict if false, the table is in creation mode.* @return previous value, or null if none*/final V putVal(int hash, K key, V value, boolean onlyIfAbsent,boolean evict) {Node<K,V>[] tab; Node<K,V> p; int n, i;if ((tab = table) == null || (n = tab.length) == 0)n = (tab = resize()).length;if ((p = tab[i = (n - 1) & hash]) == null)tab[i] = newNode(hash, key, value, null);else {Node<K,V> e; K k;if (p.hash == hash &&((k = p.key) == key || (key != null && key.equals(k))))e = p;else if (p instanceof TreeNode)e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);else {for (int binCount = 0; ; ++binCount) {if ((e = p.next) == null) {p.next = newNode(hash, key, value, null);if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1sttreeifyBin(tab, hash);break;}if (e.hash == hash &&((k = e.key) == key || (key != null && key.equals(k))))break;p = e;}}if (e != null) { // existing mapping for keyV oldValue = e.value;if (!onlyIfAbsent || oldValue == null)e.value = value;afterNodeAccess(e);return oldValue;}}++modCount;if (++size > threshold)resize();afterNodeInsertion(evict);return null;}SynchronizedMap
什么是SynchronizedMap?
是Collections中的靜態類,可以將不安全Map集合轉變為安全的集合
//todo Collections再單獨拿出來寫吧…
SynchronizedMap原理?
原理無非就是用了Object鎖的synchronized代碼塊
private static class SynchronizedMap<K,V>implements Map<K,V>, Serializable {private static final long serialVersionUID = 1978198479659022715L;private final Map<K,V> m; // Backing Mapfinal Object mutex; // Object on which to synchronizeSynchronizedMap(Map<K,V> m) {this.m = Objects.requireNonNull(m);mutex = this;}SynchronizedMap(Map<K,V> m, Object mutex) {this.m = m;this.mutex = mutex;}public int size() {synchronized (mutex) {return m.size();}}public boolean isEmpty() {synchronized (mutex) {return m.isEmpty();}}public boolean containsKey(Object key) {synchronized (mutex) {return m.containsKey(key);}}public boolean containsValue(Object value) {synchronized (mutex) {return m.containsValue(value);}}public V get(Object key) {synchronized (mutex) {return m.get(key);}}public V put(K key, V value) {synchronized (mutex) {return m.put(key, value);}}public V remove(Object key) {synchronized (mutex) {return m.remove(key);}}public void putAll(Map<? extends K, ? extends V> map) {synchronized (mutex) {m.putAll(map);}}public void clear() {synchronized (mutex) {m.clear();}}//... }ConcurrentHashMap
jdk1.5之后產生了許多的java并發包,ConcurrentHashMap就是其中之一,為解決1.2的Hashtable雖然是線程安全的但是效率非常低,造成鎖的資源競爭問題。
ConcurrentHashMap設計思路/底層?
分段鎖,將一個整體Map拆分成多個小的Hashtable,默認分成16段(上限為16),我們具體的假設,如Map的下標0到4分為一個Hashtable,5-9分為一個Hashtable,10-14分為一個Hashtable…多線程的情況下,線程①查詢下標2,線程②查詢下標5,線程③查詢下標10,那么這三個線程并不使用同一把鎖,相比之前的Hashtable,Hashtable三個線程共用同一把鎖,ConcurrentHashMap減少了鎖的資源競爭,因此效率得到了提高。并且代碼中大多共享變量使用volatile關鍵字聲明,目的是第一時間獲取修改的內容。volatile不只是起到可見性的作用,還起禁止重排序功能。
CountDownLatch
什么是CountDownLatch?
CountDownLatch類位于concurrent包下,利用它可以實現類似計數器的功能,比如有一個任務A,它要等其他4個任務執行完畢后才能執行,此時就可以使用CountDownLatch來實現這種功能了。相比join可能會更方便一些。
CountDownLatch例子
使用起來很簡單,實例化CountDownLatch,給初始值,使用countDown函數計數減一,使用countDownLatch.await()進行阻塞判斷,大于0一直阻塞,小于等于0停止阻塞。
代碼見下:
import lombok.SneakyThrows;import java.util.concurrent.CountDownLatch;public class CountDown {@SneakyThrowspublic static void main(String[] args) {// 定義計數器CountDownLatch countDownLatch = new CountDownLatch(2);Thread thread1 = new Thread(new Runnable() {@SneakyThrows@Overridepublic void run() {System.out.println("我是子線程1執行任務");Thread.sleep(10);System.out.println("我是子線程1執行任務");// 計數器減一countDownLatch.countDown();}});Thread thread2 = new Thread(new Runnable() {@SneakyThrows@Overridepublic void run() {System.out.println("我是子線程2執行任務");Thread.sleep(10);System.out.println("我是子線程2執行任務");countDownLatch.countDown();}});thread1.start();thread2.start();// 如果不為0,阻塞countDownLatch.await();System.out.println("主線程開始執行任務");for (int i = 0; i < 3; i++) {Thread.sleep(1000);System.out.println(i);}System.out.println("主線程執行任務結束");} }CyclicBarrier
什么是CyclicBarrier?
jdk1.5并發包中的類,用的不多,了解即可,也是做計數用的,當我們線程到達一定次數,開始并行執行。
CyclicBarrier例子
import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.SneakyThrows;import java.util.concurrent.CyclicBarrier;@EqualsAndHashCode(callSuper = true) @Data @AllArgsConstructor class MyThread extends Thread {private CyclicBarrier cyclicBarrier;@SneakyThrows@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ",開始寫入任務");// 模擬任務執行時間Thread.sleep(1);// await大于0時線程阻塞和計數減一,當為0的時候,所有線程共同并行cyclicBarrier.await();System.out.println(Thread.currentThread().getName() + ",寫入任務結束...");} }public class CyclicTest {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(50);for (int i = 0; i < 50; i++) {new MyThread(cyclicBarrier).start();}} }Semaphore
什么是Semaphore
Semaphore屬于并發包中的一類,Semaphore可以看作是一種基于計數的信號量,可以設定一個閾值,這個閾值表示最多支持幾個線程訪問,基于此,多個線程競爭獲取許可信號,做自己的申請后歸還,超過閾值后,線程申請許可信號將會被阻塞。
Semaphore可以用來構建一些對象池,資源池,比如數據庫連接池,Semaphore計數為1,將變成類似互斥鎖的機制。
Semaphore這個概念應該并不陌生。
Semaphore例子
掌握主要方法acquire、availablePermits、release,獲取和釋放資源
acquire 獲取資源,計數-1,阻塞等待
release釋放資源計數+1
availablePermits返回此Semaphore對象中當前可用的許可數,許可的數量有可能實時在改變,并不是固定的數量。
并發隊列
并發隊列也是并發包中的,他們都是線程安全的。
生產消費者模型中的緩沖buff就可以看作是一個并發隊列
隊列遵循規則:先進先出
并發隊列有界和無界的區別?
Array數組規定長度,不能超過長度,就是有界的
無界支持無限制存放。
阻塞與非阻塞隊列的區別?
生產者寫入滿的時候,即隊列滿了,線程進行等待;消費者當隊列為空的時候,也進行等待,就是阻塞隊列。
非阻塞的,滿了或者空了線程不等待直接掛掉。
非阻塞式隊列ConcurrentLinkedDeque
非阻塞式無界限安全隊列 ConcurrentLinkedDeque和ConcurrentLinkedQueue,不同的是ConcurrentLinkedDueue是雙向鏈表,因此ConcurrentLinkedDueue既可以當做隊列也可當做棧來使用。
public class Qu {public static void main(String[] args) {ConcurrentLinkedDeque<String> concurrentLinkedDeque = new ConcurrentLinkedDeque<>();concurrentLinkedDeque.offer("張三");concurrentLinkedDeque.offer("李四");System.out.println(concurrentLinkedDeque.size());System.out.println(concurrentLinkedDeque.poll());System.out.println(concurrentLinkedDeque.size());System.out.println(concurrentLinkedDeque.poll());System.out.println(concurrentLinkedDeque.size());} }阻塞式隊列BlockingQueue
BlockingQueue
常用的四個類是ArrayBlockingQueue、LinkedBlockingQueue,PriorityBlockingQueue和SynchronizedQueue
BlockingQueue和ConcurrentLinkedDeque的區別?
BlockingQueue可阻塞,并且時間有界限,ConcurrentLinkedDeque不阻塞
生產者消費者例子
import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.SneakyThrows;import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;import static java.lang.Thread.sleep;@EqualsAndHashCode(callSuper = true) @Data @AllArgsConstructor class ProducerThread extends Thread {private BlockingQueue<String> blockingQueue;private static AtomicInteger count = new AtomicInteger();private volatile Boolean allowProducing = true;@SneakyThrows@Overridepublic void run() {System.out.println("生產者線程啟動");while (allowProducing) {System.out.println("正在生產隊列");String data = count.incrementAndGet() + "";boolean offer = blockingQueue.offer(data);if (offer) {System.out.println("生產者添加隊列成功");} else {System.out.println("生產者添加隊列失敗");}sleep(1000);}System.out.println("生產者線程停止");}public void stopThread() {this.allowProducing = false;} }@EqualsAndHashCode(callSuper = true) @Data @AllArgsConstructor class Consumer extends Thread {private BlockingQueue<String> blockingQueue;private volatile Boolean allowConsume = true;@SneakyThrows@Overridepublic void run() {System.out.println("消費者線程啟動");while (allowConsume) {String data = blockingQueue.poll(2, TimeUnit.SECONDS);if (data != null) {System.out.println("消費者獲取數據: " + data);} else {System.out.println("消費者獲取數據失敗");this.allowConsume = false;}sleep(1000);}}public void stopThread() {this.allowConsume = false;} }public class TestScz {public static void main(String[] args) throws InterruptedException {LinkedBlockingDeque<String> blockingDeque = new LinkedBlockingDeque<>(10);ProducerThread producerThread1 = new ProducerThread(blockingDeque, true);ProducerThread producerThread2 = new ProducerThread(blockingDeque, true);Consumer consumer = new Consumer(blockingDeque, true);producerThread1.start();producerThread2.start();consumer.start();Thread.sleep(10 * 1000);producerThread1.stopThread();producerThread2.stopThread();} }總結
以上是生活随笔為你收集整理的闭关修炼(四)并发包/类的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 无序列表举例html,HTML中的无序列
- 下一篇: 让旧衣服换新颜 听听章泽天怎么说!