當程序更新一個變量時,如果多線程同時更新這個變量,可能得到期望之外的值,比如變量i=1,A線程更新i+1,B線程也更新i+1,經過兩個線程操作之后可能i不等于3,而是等于2。因為A和B線程在更新變量i的時候拿到的i都是1,這就是線程不安全的更新操作,通常我們會使用synchronized來解決這個問題,synchronized會保證多線程不會同時更新變量i。
?
-
import java.util.concurrent.CountDownLatch;public class UnSafeAdd {private static int threadCount=10;private static CountDownLatch countDown=new CountDownLatch(threadCount);private static int count=0;private static class Counter implements Runnable{@Overridepublic void run() {for(int i=0;i<1000;i++){count++;//非原子操作}countDown.countDown();}}public static void main(String[] args) throws InterruptedException {Thread[] threads=new Thread[threadCount];for(int i=0;i<threadCount;i++){threads[i]=new Thread(new Counter());}for(int i=0;i<threadCount;i++){threads[i].start();;}countDown.await();System.out.println(count);}
?
輸出:
?
8968
?
-
?import java.util.concurrent.CountDownLatch;public class SafeAddWithSyn {private static int threadCount=10;private static CountDownLatch countDown=new CountDownLatch(threadCount);private static int count=0;synchronized private static void addCount(){//同步方法count++;}private static class Counter implements Runnable{@Overridepublic void run() {for(int i=0;i<1000;i++){addCount();}countDown.countDown();}}public static void main(String[] args) throws InterruptedException {Thread[] threads=new Thread[threadCount];for(int i=0;i<threadCount;i++){threads[i]=new Thread(new Counter());}for(int i=0;i<threadCount;i++){threads[i].start();;}countDown.await();System.out.println(count);}}
?
輸出:
?
10000
而Java從JDK 1.5開始提供了java.util.concurrent.atomic包(以下簡稱Atomic包),這個包中的原子操作類提供了一種用法簡單、性能高效、線程安全地更新一個變量的方式。因為變量的類型有很多種,所以在Atomic包里一共提供了13個類,屬于4種類型的原子更新方式,分別是原子更新基本類型、原子更新數組、原子更新引用和原子更新屬性(字段)。Atomic包里的類基本都是使用Unsafe實現的包裝類。
?
-
import java.util.concurrent.CountDownLatch;import java.util.concurrent.atomic.AtomicInteger;public class SafeAddWithAtomicInteger {private static int threadCount=10;private static CountDownLatch countDown=new CountDownLatch(threadCount);private static AtomicInteger count=new AtomicInteger(0);//原子操作類private static class Counter implements Runnable{@Overridepublic void run() {for(int i=0;i<1000;i++){count.addAndGet(1);}countDown.countDown();}}public static void main(String[] args) throws InterruptedException {Thread[] threads=new Thread[threadCount];for(int i=0;i<threadCount;i++){threads[i]=new Thread(new Counter());}for(int i=0;i<threadCount;i++){threads[i].start();;}countDown.await();System.out.println(count.get());}}
?
- ?
輸出:
?
10000
原子更新基本類型
使用原子的方式更新基本類型,Atomic包提供了以下3個類。
AtomicBoolean:原子更新布爾類型。
AtomicInteger:原子更新整型。
AtomicLong:原子更新長整型。
以上3個類提供的方法幾乎一模一樣,所以本節僅以AtomicInteger為例進行講解,AtomicInteger的源碼如下:
?
-
public class AtomicInteger extends Number implements java.io.Serializable {private static final long serialVersionUID = 6214790243416807050L;// setup to use Unsafe.compareAndSwapInt for updates 使用Unsafe類的CAS操作實現更新private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long valueOffset;......//volatile保證可見性private volatile int value;//構造器public AtomicInteger(int initialValue) {value = initialValue;}public AtomicInteger() {}......//CAS更新public final boolean compareAndSet(int expect, int update) {return unsafe.compareAndSwapInt(this, valueOffset, expect, update);}....../*以下方法都使用循環CAS更新數據*/<pre name="code" class="java"> public final int getAndSet(int newValue) {//以原子方式設置新值for (;;) {int current = get();if (compareAndSet(current, newValue))return current;}}
?
- ?
?public final int getAndIncrement() {//以原子方式自增 for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return current; } } ......
//以原子方式將輸入值與當前值相加并返回結果 public final int addAndGet(int delta) { for (;;) {//循環 int current = get(); int next = current + delta; if (compareAndSet(current, next))//CAS return next; } } ......}
?
?
原子更新數組
通過原子的方式更新數組里的某個元素,Atomic包提供了以3類
AtomicIntegerArray:原子更新整型數組里的元素。
AtomicLongArray:原子更新長整型數組里的元素。
AtomicReferenceArray:原子更新引用類型數組里的元素。
?
?
import java.util.concurrent.CountDownLatch;public class AtomicIntegerArrayTest {private static int threadCount=1000;private static CountDownLatch countDown=new CountDownLatch(threadCount);static int[] values=new int[10];private static class Counter implements Runnable{@Overridepublic void run() {for(int i=0;i<100;i++){for(int j=0;j<10;j++){//所有元素+1values[j]++;}}countDown.countDown();}}public static void main(String[] args) throws InterruptedException{Thread[] threads=new Thread[threadCount];for(int i=0;i<threadCount;i++){threads[i]=new Thread(new Counter());}for(int i=0;i<threadCount;i++){threads[i].start();;}countDown.await();for(int i=0;i<10;i++){System.out.print(values[i]+" ");}}}
- ?
輸出:
?
99997 99996 99997 99997 99996 99996 99996 99996 99996 99996
-
import java.util.concurrent.CountDownLatch;import java.util.concurrent.atomic.AtomicIntegerArray;public class AtomicIntegerArrayTest {private static int threadCount=1000;private static CountDownLatch countDown=new CountDownLatch(threadCount);static int[] values=new int[10];static AtomicIntegerArray ai=new AtomicIntegerArray(values);private static class Counter implements Runnable{@Overridepublic void run() {for(int i=0;i<100;i++){for(int j=0;j<10;j++){//所有元素+1ai.getAndIncrement(j);}}countDown.countDown();}}public static void main(String[] args) throws InterruptedException{Thread[] threads=new Thread[threadCount];for(int i=0;i<threadCount;i++){threads[i]=new Thread(new Counter());}for(int i=0;i<threadCount;i++){threads[i].start();;}countDown.await();for(int i=0;i<10;i++){System.out.print(ai.get(i)+" ");}System.out.println();for(int i=0;i<10;i++){System.out.print(values[i]+" ");}}}
?
- ?
輸出:
?
100000 100000 100000 100000 100000 100000 100000 100000 100000 100000
0 0 0 0 0 0 0 0 0 0
需要注意的是,數組value通過構造方法傳遞進去,然后AtomicIntegerArray會將當前數組復制一份,所以當AtomicIntegerArray對內部的數組元素進行修改時,不會影響傳入的數組。
?
-
//An {@code int} array in which elements may be updated atomically.public class AtomicIntegerArray implements java.io.Serializable {......private final int[] array;//存儲數據的int數組......//構造器public AtomicIntegerArray(int length) {array = new int[length];}public AtomicIntegerArray(int[] array) {// Visibility guaranteed by final field guaranteesthis.array = array.clone();//這里會復制傳入的int數組,因此不會改變原來的int數組}public final int length() {return array.length;}......}
?
- ?
?
原子更新引用
原子更新基本類型的AtomicInteger,只能更新一個變量,如果要原子更新多個變量,就需要使用這個原子更新引用類型提供的類。Atomic包提供了以下3個類。
AtomicReference:原子更新引用類型。
AtomicReferenceFieldUpdater:原子更新引用類型里的字段。
AtomicMarkableReference:原子更新帶有標記位的引用類型。
?
-
import java.util.concurrent.CountDownLatch;import java.util.concurrent.atomic.AtomicReference;public class Test {private static int threadCount=10;private static CountDownLatch countDown=new CountDownLatch(threadCount);public static AtomicReference<User> atomicUserRef = new AtomicReference<User>();private static class ReferenceUpdater implements Runnable{User user;public ReferenceUpdater(User user){this.user=user;}@Overridepublic void run() {for(int i=0;i<1000;i++){User oldUser=atomicUserRef.get();atomicUserRef.compareAndSet(oldUser, user);Thread.yield();}countDown.countDown();}}public static void main(String[] args) throws InterruptedException {Thread[] threads=new Thread[threadCount];for(int i=0;i<threadCount;i++){threads[i]=new Thread(new ReferenceUpdater(new User("name"+i,i)));}for(int i=0;i<threadCount;i++){threads[i].start();;}countDown.await();System.out.println(atomicUserRef.get().getName());System.out.println(atomicUserRef.get().getOld());}static class User {private String name;private int old;public User(String name, int old) {this.name = name;this.old = old;}public String getName() {return name;}public int getOld() {return old;}}}
?
- ?
?
輸出:
name1
1
每次輸出結果都不確定,10種情況都有可能,但是name屬性和age屬性是匹配的。
?
-
/**An object reference that may be updated atomically.*/public class AtomicReference<V> implements java.io.Serializable {<pre name="code" class="java"> ......
?
private static final Unsafe unsafe = Unsafe.getUnsafe();//用于CAS更新 ...... private volatile V value;//引用,volatile保證可見性 public AtomicReference(V initialValue) { value = initialValue; } public AtomicReference() { } //利用Unsafe類執行CAS操作 public final boolean compareAndSet(V expect, V update) { return unsafe.compareAndSwapObject(this, valueOffset, expect, update); } ......
//循環CAS public final V getAndSet(V newValue) { while (true) { V x = get(); if (compareAndSet(x, newValue)) return x; } }
......}
?
?
原子更新字段
如果需原子地更新某個類里的某個字段時,就需要使用原子更新字段類,Atomic包提供了以下3個類進行原子字段更新。
AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
AtomicLongFieldUpdater:原子更新長整型字段的更新器。
AtomicStampedReference:原子更新帶有版本號的引用類型。該類將整數值與引用關聯起來,可用于原子的更新數據和數據的版本號,可以解決使用CAS進行原子更新時可能出現的ABA問題。
要想原子地更新字段類需要兩步。第一步,因為原子更新字段類都是抽象類,每次使用的時候必須使用靜態方法newUpdater()創建一個更新器,并且需要設置想要更新的類和屬性。第二步,更新類的字段(屬性)必須使用public volatile修飾符。
?
-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;public class AtomicIntegerFieldUpdaterTest {// 創建原子更新器,并設置需要更新的對象類和對象的屬性private static AtomicIntegerFieldUpdater<User> a =AtomicIntegerFieldUpdater.newUpdater(User.class, "old");public static void main(String[] args) throws InterruptedException {// 設置柯南的年齡是10歲User conan = new User("conan", 10);// 柯南長了一歲,但是仍然會輸出舊的年齡System.out.println(a.getAndIncrement(conan));// 輸出柯南現在的年齡System.out.println(a.get(conan));}public static class User {private String name;public volatile int old;public User(String name, int old) {this.name = name;this.old = old;}public String getName() {return name;}public int getOld() {return old;}}}
?
- ?
輸出:
?
10
11
?