日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java 高并发_Java 高并发之无锁(CAS)

發布時間:2023/12/19 java 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java 高并发_Java 高并发之无锁(CAS) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Java 高并發之無鎖(CAS)

本篇主要講 Java中的無鎖 CAS ,無鎖 顧名思義就是 以不上鎖的方式解決并發問題,而不使用synchronized 和 lock 等。。

1. Atomic 包

java.util.concurrent.atomic 包下類都是原子類,原子類都是基于 sun.misc.Unsafe 實現的

基本可以分為一下幾類:

  • 原子性基本數據類型:AtomicBoolean、AtomicInteger、AtomicLong
  • 原子性對象引用類型:AtomicReference、AtomicStampedReference、AtomicMarkableReference
  • 原子性數組:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
  • 原子性對象屬性更新:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
  • 2.CAS 的概念

    無鎖 是指 所有的線程都能 無障礙的到達臨界點(但是能不能成功操作臨界點的值 是不一定的) ,無鎖的實現采用的是 CAS(compare and swap) 算法實現的
  • CAS 需要依賴 CPU提供的 CAS指令,CPU 為了解決并發問題,提供了 CAS 指令
  • CAS 指令需要3個參數 ,(變量值 、預期值、新值),只有當 變量值和預期值 相同的時候 才會更新變量值為新值
  • CAS 是一條 CPU 指令,由 CPU 硬件級別上保證原子性
  • 3.AtomicInteger 案例 以源碼

    AtomicInteger 我相信應該大部分都用過吧, 用于解決 count++

    因為count ++ 操作 不是原子的,可能 第一個線程拿到的 count = 0 將要進行++操作的時候,被其他線程搶占了CPU資源,第二個線程此時讀取的還是count = 0 ,那么這樣就會造成問題, 而AtomicInteger 它內部使用CAS 算法 保證了并發安全問題。

    public class AtomicIntegerDemo { ?private static int count = 0;private static AtomicInteger atomicInteger = new AtomicInteger(0); ?public static void main(String[] args) throws InterruptedException { ?Thread[] threads = new Thread[10]; ?for (int i = 0; i < 10; i++) {Thread thread = new Thread(() -> {try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}for(int j = 0; j < 1000; j++){count++;atomicInteger.incrementAndGet(); //內部CAS 進行自增}});thread.start();threads[i] = thread;}for (int i = 0; i < threads.length; i++) {threads[i].join();}System.out.println("結果: " + count);System.out.println("結果: " + atomicInteger.get());} }

    輸出結果發現,atomicInteger 保證了數據正確

    源碼分析:內部通過 unafe 類提供的方法處理的,Java中的Unsafe類為我們提供了類似C++手動管理內存的能力,可以直接獲取某個屬性的內容地址,并且提供了一些對內存地址上的值得操作

    private static final Unsafe unsafe = Unsafe.getUnsafe();//獲取value 屬性的 對象偏移量static {try {valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }}/*** Atomically increments by one the current value.** @return the updated value*/public final int incrementAndGet() {return unsafe.getAndAddInt(this, valueOffset, 1) + 1;} ?

    unsafe的 getAndAddInt 方法 ,var5 相當于期望值 只有 內部調用 compareAndSwapInt 判斷var1 對象(AtomicInteger) 的內存偏移值var2 (valueOffset) 是否和期望值相同,如果相同表示 這期間沒有其他線程修改, 否則 自旋再 獲取 當前var5 再比較

    //var5是期望值 var5+var4 是新值 public final int getAndAddInt(Object var1, long var2, int var4) {int var5;do {var5 = this.getIntVolatile(var1, var2);//獲取this上的 valueOffset的偏移量的值 value} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); ?return var5;} ?

    這個方法是 native (C/C++)編寫的代碼,內部實現 使用了CPU的 CAS指令 具有原子性

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    AtomicLong 和 AtomicInteger類似 這里不展開說了,

    4.AtomicBoolean 原理和使用場景

    而 AtomicBoolean 也類似,不過內部value 是一個int 通過 int值是 1 還是 0 當做 true / false

    public AtomicBoolean(boolean initialValue) {value = initialValue ? 1 : 0; } ? public final boolean compareAndSet(boolean expectedValue, boolean newValue) {return VALUE.compareAndSet(this,(expectedValue ? 1 : 0),(newValue ? 1 : 0)); } ?

    當在多個線程 只處理 一次初始化功能的時候 可以使用

    private static AtomicBoolean initialized = new AtomicBoolean(false); public void init() {if( initialized.compareAndSet(false, true) ){// 這里放置初始化代碼....} }

    5.AtomicReference

    一直很難理解 AtmoicReference 的 使用場景,畢竟 java賦值操作本來就是 原子性的
    作用在 對 對象進行原子操作 提供了一種讀和寫都是原子性的 對象引用變量

    AtomicReference和AtomicInteger非常類似 AtomicReference則對應普通的對象引用,底層使用的是compareAndSwapObject實現CAS,比較的是兩個對象的地址是否相等。也就是它可以保證你在修改對象引用時的線程安全性。

    疑問:引用類型的賦值是原子的 為什么要有AtomicReference 來保證修改對象引用時的線程安全性 呢?

    真正需要使用AtomicReference的場景是你需要CAS類操作時,由于涉及到比較、設置等多于一個的操作

    下面的案例是 :模擬 攔截器攔截到請求 記錄ip 然后 去更新最新的 LoginDetailInfo 登錄信息 使用AtomicReference 去包裝老的 登錄信息 , 使用5個線程去同時更新最新的 登錄信息 package com.johnny.atomic; ? ? import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; ? import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; ? /*** @author johnny* @create 2020-09-16 下午3:58**/ public class AtomicReferenceDemo { ? ?private static final int CORE_SIZE = 5;private static final int MAX_SIZE = 10; ?private static final int QUEUE_SIZE = 100; ?private static final CountDownLatch countDownLatch = new CountDownLatch(5); ? ?public static void main(String[] args) throws InterruptedException { ? ?ExecutorService executorService = new ThreadPoolExecutor(CORE_SIZE, MAX_SIZE, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_SIZE)); ?//interceptor ip//根據攔截器攔截到 最新的登錄ipString newIp = "47.98.250.170"; ?//from db search old LoginDetailInfo//從數據庫中查到 最新的一次 登錄信息LoginDetailInfo oldLoginDetailInfo = LoginDetailInfo.builder().loginCount(10).ip("47.98.250.138").timeStamp(System.currentTimeMillis()).build(); ?//封裝老的 登錄信息AtomicReference<LoginDetailInfo> atomicReference = new AtomicReference<>(oldLoginDetailInfo); ?//封裝任務TaskRun taskRun = new TaskRun(atomicReference, newIp);//使用5個線程去 同時更新 最新的 登錄信息,只要有一個成功即可for (int i = 0; i < 5; i++) {executorService.execute(taskRun);} ?//主線程 等待countDownLatch.await(); ?System.out.println("【所有線程執行完畢 main 線程結束 】");} ? ?static class TaskRun implements Runnable { ?private String ip; ?private AtomicReference<LoginDetailInfo> atomicReference; ?/*** @param atomicReference : 老的 atomicReference*/public TaskRun(AtomicReference<LoginDetailInfo> atomicReference, String ip) {this.ip = ip;this.atomicReference = atomicReference;} ?@Overridepublic void run() { ?if (atomicReference != null) { ?LoginDetailInfo oldLoginDetailInfo = atomicReference.get(); ?long count = oldLoginDetailInfo.getLoginCount() + 1; ?LoginDetailInfo newLoginDetailInfo = new LoginDetailInfo();newLoginDetailInfo.setLoginCount(count);newLoginDetailInfo.setIp(ip);newLoginDetailInfo.setTimeStamp(System.currentTimeMillis()); ?try {//模擬 執行一下其他操作 可以讓多個線程 都同時走到了 這里 保證多個線程 拿到的 oldLoginDetailInfo 是一個 ,這樣才會只有一個 能更新成功Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();} ?if (atomicReference.compareAndSet(oldLoginDetailInfo, newLoginDetailInfo)) {//save To Db or some ...System.out.println(Thread.currentThread().getName() + " 線程更新成功 --- {}");System.out.println(newLoginDetailInfo);}else{System.out.println(Thread.currentThread().getName() + " 線程更新失敗 其他線程已經更新 ---");}//計數器 -1countDownLatch.countDown();} ?}} ? ?@Data@AllArgsConstructor@NoArgsConstructor@Builderstatic class LoginDetailInfo { ?private long loginCount; ?private String ip; ?private long timeStamp; ?} ? }

    運行結果如下:

    這里的使用場景很牽強,不知道你會 怎么使用AtomicReference 歡迎在下方留言 給我提點思路

    6. AtomicStampRefence

    線程判斷被修改對象是否可以正確寫入的條件是對象的當前值和期望是否一致。這個邏輯從一般意義上來說是正確的。但有可能出現一個小小的例外,就是當你獲得對象當前數據后,在準備修改為新值前,對象的值被其他線程連續修改了2次,而經過這2次修改后,對象的值又恢復為舊值。這樣,當前線程就無法正確判斷這個對象究竟是否被修改過,這就是典型的 CAS ABA問題

    如何解決 ABA問題呢,其實Java 提供了 AtomicStampRefence 就是用來解決ABA問題的

    模擬如下案例:

    現有一個用單向鏈表實現的堆棧,棧頂為A,這時線程T1已經知道A.next為B,然后希望用CAS將棧頂替換為B:

    head.compareAndSet(A,B);

    在T1執行上面這條指令之前,線程T2介入,將A、B出棧,再pushD、C、A,此時堆棧結構如下圖,而對象B此時處于游離狀態:

    此時輪到線程T1執行CAS操作,檢測發現棧頂仍為A,所以CAS成功,棧頂變為B,但實際上B.next為null,所以此時的情況變為:

    其中堆棧中只有B一個元素,C和D組成的鏈表不再存在于堆棧中,平白無故就把C、D丟掉了。

    下面我們就來用代碼 模擬上面這個案例:

    6.1 首先我們定義 棧 :

    /****/@Datastatic class Node<T> { ?private T value; ?private Node<T> next; ?public Node(T value) {this.value = value;}} ? ?static class Stack<T> { ?private Node<T> top; ?private int length = 0; ? ?public boolean isEmpty() {if (length == 0 || top == null) {return true;} else {return false;}} ?public void push(Node<T> node) {if (isEmpty()) {top = node;} else {//交換//node.setNext(top);top = node;}length++;} ?public void push(T value) {Node<T> node = new Node<T>(value);if (isEmpty()) {top = node;} else {//交換node.setNext(top);top = node;}length++;} ?public T pop() { ?if (isEmpty()) {System.out.println("棧 為空");return null;} else {T result = top.getValue();top = top.next;length--;return result;}} ?public T top() {if (isEmpty()) {System.out.println("棧 為空");return null;} else {return top.getValue();}} ?public Node<T> topNode() {if (isEmpty()) {System.out.println("棧 為空");return null;} else {return top;}} ?public Node<T> topNextNode() {if (isEmpty()) {System.out.println("棧 為空");return null;} else {return top.next; ?}} ?public int size() {return length;} ?public void deleteStack() {top = null;length = 0;} ?}

    6.2 使用 AtmoicReference 來看看帶來的問題

    package com.johnny.atomic; ? import lombok.Data; ? import java.util.concurrent.atomic.AtomicReference; ? /*** ABA 問題Demo** @author johnny* @create 2020-09-20 下午5:52**/ public class AtomicReferenceDemo { ? ?/*** 使用 Stack 來 模擬 CAS ABA 問題** @param args*/public static void main(String[] args) throws InterruptedException { ? ?//1.初始化 棧Stack<String> stack = new Stack<String>(); ?stack.push("B");stack.push("A"); ?//headReference 保存 當前的 棧的 topAtomicReference<Node<String>> headReference = new AtomicReference<>(stack.topNode()); ? ?//2.需要2個線程TaskA taskA = new TaskA(stack, headReference);TaskB taskB = new TaskB(stack, headReference); ? ?Thread threadA = new Thread(taskA);Thread threadB = new Thread(taskB); ?threadA.start();threadB.start(); ? ?threadA.join();threadB.join(); ?System.out.println(stack.pop());System.out.println(stack.pop());System.out.println(stack.pop()); ?} ?static class TaskA implements Runnable { ?private AtomicReference<Node<String>> headReference;private Stack<String> stack; ?public TaskA(Stack<String> stack, AtomicReference<Node<String>> headReference) {this.stack = stack;this.headReference = headReference;} ?@Overridepublic void run() { ?if (stack.topNode() == headReference.get()) {Node<String> aNode = headReference.get();Node<String> bNode = stack.topNextNode(); ?//讓TaskB 去把 AB 出棧并且 入 D C Atry {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}//此時 TaskB 把 棧 變成 D C A top還是A 更新 因為此時TaskA還是認為 B.next = null 就會弄丟 D Cif (headReference.compareAndSet(aNode, bNode)) {//把A出棧stack.pop();//添加B 當做topstack.push(bNode);} else {System.out.println("有其他線程 已更新 top");}}}} ?static class TaskB implements Runnable { ?private AtomicReference<Node<String>> headReference;private Stack<String> stack; ?public TaskB(Stack<String> stack, AtomicReference<Node<String>> headReference) {this.stack = stack;this.headReference = headReference;} ?@Overridepublic void run() {try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}if (stack.topNode() == headReference.get()) {Node<String> aNode = headReference.get();//A 出棧stack.pop();//B 出棧stack.pop(); ?Node<String> dNode = new Node<>("D");Node<String> cNode = new Node<>("C");dNode.next = cNode;cNode.next = aNode;//依次入棧 D C Aif (headReference.compareAndSet(aNode, dNode)) {stack.push(dNode);}if (headReference.compareAndSet(dNode, cNode)) {stack.push(cNode);}if (headReference.compareAndSet(cNode, aNode)) {stack.push(aNode);}} ? ?}} ? }

    可以看到 最后棧里 只有 B了,其他的 D C 都被弄丟了

    6.3 通過使用 AtomicStampedReference 來改進

    AtomicStampedReference 內部還維護了一個Stamp 來記錄 更改次數 ,這樣當 從 A 變到 B 再變回A的時候,雖然值 沒變,但是Stamp 變化了,依然不能更新成功/*** 使用 Stack 來 模擬 CAS ABA 問題** @param args*/public static void main(String[] args) throws InterruptedException { ? ?//1.初始化 棧Stack<String> stack = new Stack<String>(); ?stack.push("B");stack.push("A"); ?AtomicStampedReference<Node<String>> head = new AtomicStampedReference<>(stack.topNode(), 0); ? ?//2.需要2個線程TaskA taskA = new TaskA(stack, head);TaskB taskB = new TaskB(stack, head); ? ?Thread threadA = new Thread(taskA);Thread threadB = new Thread(taskB); ?threadA.start();threadB.start(); ? ?threadA.join();threadB.join(); ?System.out.println(stack.pop());System.out.println(stack.pop());System.out.println(stack.pop()); ?} ?static class TaskA implements Runnable { ?private Stack<String> stack = null; ?private AtomicStampedReference<Node<String>> headStampReference; ?public TaskA(Stack<String> stack, AtomicStampedReference<Node<String>> headStampReference) {this.stack = stack;this.headStampReference = headStampReference;} ?@Overridepublic void run() { ?if (stack.topNode() == headStampReference.getReference()) { ?AtomicStampedReference<Node<String>> headNode = this.headStampReference;int stamp = headNode.getStamp(); ?Node<String> bNode = stack.topNextNode(); ?//讓TaskB 去把 AB 出棧并且 入 D C Atry {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}//此時 TaskB 把 棧 變成 D C A top還是A 更新 因為此時TaskA還是認為 B.next = null 就會弄丟 D Cif (headNode.compareAndSet(stack.topNode(), bNode, stamp, stamp + 1)) {stack.pop(); //A出棧//入棧B 但是 B的 next = nullstack.push(bNode);} else {System.out.println("有其他線程 已更新 top");}}}} ?static class TaskB implements Runnable { ?private Stack<String> stack; ?private AtomicStampedReference<Node<String>> headStampReference; ?public TaskB(Stack<String> stack, AtomicStampedReference<Node<String>> headStampReference) {this.stack = stack;this.headStampReference = headStampReference;} ? ?@Overridepublic void run() {try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}if (stack.topNode() == headStampReference.getReference()) {//A 出棧stack.pop();//B 出棧stack.pop(); ?Node<String> aNode = headStampReference.getReference(); ?Node<String> dNode = new Node<>("D");Node<String> cNode = new Node<>("C");cNode.next = dNode;aNode.next = cNode; ?//依次入棧 D C Aif (headStampReference.compareAndSet(aNode, dNode, headStampReference.getStamp(), headStampReference.getStamp() + 1)) {stack.push(dNode);}if (headStampReference.compareAndSet(dNode, cNode, headStampReference.getStamp(), headStampReference.getStamp() + 1)) {stack.push(cNode);}if (headStampReference.compareAndSet(cNode, aNode, headStampReference.getStamp(), headStampReference.getStamp() + 1)) {stack.push(aNode);}} ?}} ? ?

    可以看到 B 并沒有被更新到 棧中 因為 通過AtomicStampedReference 的Stampd 已經能夠知道 這個棧已經被更改過

    7. AtomicMarkableReference

    AtomicMarkableReference可以理解為上面AtomicStampedReference的簡化版,就是不關心修改過幾次,僅僅關心是否修改過。因此變量mark是boolean類型,僅記錄值是否有過修改。private static class Pair<T> {final T reference;final boolean mark;private Pair(T reference, boolean mark) {this.reference = reference;this.mark = mark;}static <T> Pair<T> of(T reference, boolean mark) {return new Pair<T>(reference, mark);} }

    和上面的 AtomicStampedReference 類似 也來解決 ABA 問題 就不展開說了

    8. AtomicIntegerFieldUpdater

    AtomicIntegerFieldUpdater,它實現了可以線程安全地更新對象中的整型變量。比如如下的 int
    類型的 count 變量 ,當你的系統有些變量當初并沒有設計成 Atomic ,那么現在可以通過FieldUpdater來安全的 更新,并且不需要帶來 對系統代碼的修改,最多只需要添加 一個 volatile , 不會造成什么影響public class AtomicIntegerFieldUpdaterDemo { ?public int count = 100; ?public static AtomicIntegerFieldUpdater<AtomicIntegerFieldUpdaterDemo> atomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdaterDemo.class, "count"); ? ?public static void main(String[] args) { ?AtomicIntegerFieldUpdaterDemo atomicIntegerFieldUpdaterDemo = new AtomicIntegerFieldUpdaterDemo(); ?if (atomicIntegerFieldUpdater.compareAndSet(atomicIntegerFieldUpdaterDemo, 100, 200)) {System.out.println("更新成功 count : " + atomicIntegerFieldUpdaterDemo.count);} ?if(atomicIntegerFieldUpdater.compareAndSet(atomicIntegerFieldUpdaterDemo,100,300)){System.out.println("更新成功 count : " + atomicIntegerFieldUpdaterDemo.count);}}

    報錯了:因為必須是 volatile 變量,并且不能是private

    public volatile int count = 100;

    打印結果:

    更新成功 count : 200

    總結

    本篇主要講解了 Java中 無鎖CAS相關知識,包括 常用的AtomicInteger ,AtomicBoolean , AtomicReference AtomicStampedReference 和AtomicMarkableReference, AtomicIntegerFieldUpdater,以及介紹了ABA問題,和解決方法。 一起來了解吧!

    總結

    以上是生活随笔為你收集整理的java 高并发_Java 高并发之无锁(CAS)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。