package com.example.test.cas;

import java.io.IOException;

/**
 * Two threads increment a shared {@code volatile int} through {@link #add()} and the
 * final value is printed.
 *
 * <p>NOTE(review): {@code i++} on a volatile field is a separate read, add and write, so
 * concurrent callers can overwrite each other's updates and the printed total may be
 * lower than 20000. This looks like the intended point of the demo (volatile gives
 * visibility, not atomicity) - confirm before "fixing" {@code add()}.
 *
 * @author hehang on 2019-10-09
 */
public class LockDemo {

    /** Number of worker threads started by {@link #main(String[])}. */
    private static final int WORKER_COUNT = 2;

    /** Number of {@link #add()} calls issued by each worker. */
    private static final int INCREMENTS_PER_WORKER = 10000;

    private volatile int i;

    /**
     * Adds one to the counter.
     *
     * <p>Not atomic: the increment is a read-modify-write on a volatile field.
     */
    public void add() {
        i++;
    }

    /**
     * Starts the workers, waits for all of them to finish, then prints the counter.
     *
     * @param args unused
     * @throws IOException never thrown by this implementation; kept so the signature
     *                     stays compatible with existing callers
     */
    public static void main(String[] args) throws IOException {
        LockDemo lockDemo = new LockDemo();
        Thread[] workers = new Thread[WORKER_COUNT];
        for (int t = 0; t < workers.length; t++) {
            workers[t] = new Thread(() -> {
                for (int j = 0; j < INCREMENTS_PER_WORKER; j++) {
                    lockDemo.add();
                }
            });
            workers[t].start();
        }

        // Wait for the workers themselves. Blocking on System.in is not a barrier:
        // with stdin closed or at EOF it returns at once and a partial count is printed.
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return; // a partial count would be misleading, so print nothing
        }
        System.out.println(lockDemo.i);
    }
}
package com.example.test.cas;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Two threads increment a shared {@link AtomicInteger} 10000 times each; the total is
 * printed once both have finished.
 *
 * @author hehang on 2019-10-14
 */
public class AtomicTest {

    /** Number of worker threads. */
    private static final int WORKER_COUNT = 2;

    /** Increments performed by each worker. */
    private static final int INCREMENTS_PER_WORKER = 10000;

    /**
     * Runs the demo and prints the final counter value.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for
     *                              the workers
     */
    public static void main(String[] args) throws InterruptedException {
        // Atomic increment
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Thread[] workers = new Thread[WORKER_COUNT];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < INCREMENTS_PER_WORKER; j++) {
                    atomicInteger.incrementAndGet();
                }
            });
            workers[i].start();
        }

        // join() is an exact barrier; a fixed sleep only guesses at completion.
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println(atomicInteger.get());
    }
}
下面就来探究一下 JDK 为我们提供的原子操作类的原理,并基于 Java 的 native 方法(Unsafe)实现一个自己的原子操作类。
package com.example.test.cas;

import sun.misc.Unsafe;

import java.io.IOException;
import java.lang.reflect.Field;

/**
 * Hand-rolled atomic counter: {@link #add()} retries a compare-and-swap on the field
 * {@code i} through {@link Unsafe} until it succeeds.
 *
 * @author hehang on 2019-10-09
 */
public class LockCASDemo {

    /** Number of worker threads started by {@link #main(String[])}. */
    private static final int WORKER_COUNT = 2;

    /** Number of {@link #add()} calls issued by each worker. */
    private static final int INCREMENTS_PER_WORKER = 10000;

    /** Unsafe instance obtained reflectively from the private static {@code theUnsafe} field. */
    private static final Unsafe UNSAFE;

    /** Offset of the field {@code i} as reported by {@link Unsafe#objectFieldOffset}. */
    private static final long I_OFFSET;

    static {
        try {
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            UNSAFE = (Unsafe) theUnsafe.get(null);
            I_OFFSET = UNSAFE.objectFieldOffset(LockCASDemo.class.getDeclaredField("i"));
        } catch (ReflectiveOperationException e) {
            // Fail fast: continuing with a null Unsafe would only surface later as a
            // NullPointerException inside add().
            throw new ExceptionInInitializerError(e);
        }
    }

    /** The counter; read and written through {@link #UNSAFE} in {@link #add()}. */
    private volatile int i;

    /**
     * Adds one to the counter.
     *
     * <p>Reads the current value, then attempts to swap in {@code current + 1}; if the
     * field changed in between, the swap fails and the loop retries with a fresh read.
     */
    public void add() {
        int current;
        do {
            current = UNSAFE.getIntVolatile(this, I_OFFSET);
        } while (!UNSAFE.compareAndSwapInt(this, I_OFFSET, current, current + 1));
    }

    /**
     * Starts the workers, waits for all of them to finish, then prints the counter.
     *
     * @param args unused
     * @throws IOException never thrown by this implementation; kept so the signature
     *                     stays compatible with existing callers
     */
    public static void main(String[] args) throws IOException {
        LockCASDemo lockDemo = new LockCASDemo();
        Thread[] workers = new Thread[WORKER_COUNT];
        for (int t = 0; t < workers.length; t++) {
            workers[t] = new Thread(() -> {
                for (int j = 0; j < INCREMENTS_PER_WORKER; j++) {
                    lockDemo.add();
                }
            });
            workers[t].start();
        }

        // Wait for the workers themselves. Blocking on System.in is not a barrier:
        // with stdin closed or at EOF it returns at once and a partial count is printed.
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return; // a partial count would be misleading, so print nothing
        }
        System.out.println(lockDemo.i);
    }
}
package com.example.test.cas;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/**
 * Compares three ways of incrementing a shared counter from several threads for a fixed
 * wall-clock interval: a {@code synchronized} block, {@link AtomicLong}, and
 * {@link LongAdder}. Each worker prints its elapsed time and the counter value it
 * observes when it stops.
 *
 * <p>Every {@code test*} method only starts its workers and returns immediately, so
 * {@link #main(String[])} runs all three variants concurrently.
 *
 * @author hehang on 2019-10-14
 */
public class LongAdderDemo {

    /** Worker threads started per variant. */
    private static final int THREAD_COUNT = 3;

    /** How long each worker keeps incrementing, in milliseconds. */
    private static final long RUN_MILLIS = 2000L;

    /** Counter for the synchronized variant; guarded by {@code this}. */
    private long count = 0;

    /** Counter for the AtomicLong variant. */
    private final AtomicLong acount = new AtomicLong(0L);

    /** Counter for the LongAdder variant. */
    private final LongAdder lacount = new LongAdder();

    /**
     * Synchronized-block variant.
     *
     * @throws InterruptedException never thrown by this implementation; kept for
     *                              signature compatibility
     */
    public void testSync() throws InterruptedException {
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                long starttime = System.currentTimeMillis();
                while (System.currentTimeMillis() - starttime < RUN_MILLIS) { // run for two seconds
                    synchronized (this) {
                        ++count;
                    }
                }
                long endtime = System.currentTimeMillis();
                // Read under the same monitor that guards the writes; an unguarded
                // read of a non-volatile long is a data race.
                long observed;
                synchronized (this) {
                    observed = count;
                }
                System.out.println("SyncThread spend:" + (endtime - starttime) + "ms" + " v" + observed);
            }).start();
        }
    }

    /**
     * AtomicLong variant.
     *
     * @throws InterruptedException never thrown by this implementation; kept for
     *                              signature compatibility
     */
    public void testAtomic() throws InterruptedException {
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                long starttime = System.currentTimeMillis();
                while (System.currentTimeMillis() - starttime < RUN_MILLIS) { // run for two seconds
                    acount.incrementAndGet();
                }
                long endtime = System.currentTimeMillis();
                // Report with get(): incrementAndGet() here would bump the counter once
                // more per worker just to print it.
                System.out.println("AtomicThread spend:" + (endtime - starttime) + "ms" + " v-" + acount.get());
            }).start();
        }
    }

    /**
     * LongAdder variant.
     *
     * @throws InterruptedException never thrown by this implementation; kept for
     *                              signature compatibility
     */
    public void testLongAdder() throws InterruptedException {
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                long starttime = System.currentTimeMillis();
                while (System.currentTimeMillis() - starttime < RUN_MILLIS) { // run for two seconds
                    lacount.increment();
                }
                long endtime = System.currentTimeMillis();
                System.out.println("LongAdderThread spend:" + (endtime - starttime) + "ms" + " v-" + lacount.sum());
            }).start();
        }
    }

    /**
     * Launches all three variants on one shared demo instance.
     *
     * @param args unused
     * @throws InterruptedException propagated from the {@code test*} methods
     */
    public static void main(String[] args) throws InterruptedException {
        LongAdderDemo demo = new LongAdderDemo();
        demo.testSync();
        demo.testAtomic();
        demo.testLongAdder();
    }
}
SyncThread spend:2000ms v23074332 SyncThread spend:2000ms v23094924 AtomicThread spend:2000ms v-38137398 AtomicThread spend:2000ms v-38152694 SyncThread spend:2011ms v23094924 AtomicThread spend:2000ms v-38416095 LongAdderThread spend:2000ms v-40097562 LongAdderThread spend:2000ms v-40606405 LongAdderThread spend:2001ms v-40917467 Process finished with exit code 0
package com.example.test.cas.aba; /** * @author hehang on 2019-10-14 * @description */ public class Node { public final String item; public Node next; public Node(String item) { this.item = item; } @Override public String toString() { return "item内容:" + this.item; } }
package com.example.test.cas.aba;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * A LIFO stack whose top pointer is updated without locks, via compare-and-set on an
 * {@link AtomicReference}.
 *
 * <p>The CAS compares references only, so it cannot tell whether the top node was
 * removed and put back between the read and the swap (the ABA problem).
 * {@link #pop(int)} takes an artificial delay to make that window easy to hit.
 *
 * @author hehang on 2019-10-14
 */
public class Stack {

    /** Current top of the stack; modified only through CAS. */
    AtomicReference<Node> top = new AtomicReference<>();

    /**
     * Pushes {@code node} on top of the stack.
     *
     * @param node node to push; its {@code next} link is overwritten
     */
    public void push(Node node) {
        for (;;) {
            Node current = top.get();
            node.next = current;
            // Swap in the new top; retry from a fresh read if another thread got there first.
            if (top.compareAndSet(current, node)) {
                return;
            }
        }
    }

    /**
     * Pops and returns the top node.
     *
     * <p>To demonstrate ABA, a delay can be inserted between reading the top and
     * attempting the CAS.
     *
     * @param time seconds to sleep before each CAS attempt; {@code 0} means no delay
     * @return the removed node, or {@code null} if the stack was empty
     * @throws InterruptedException if interrupted during the delay
     */
    public Node pop(int time) throws InterruptedException {
        while (true) {
            Node head = top.get();
            if (head == null) {
                return null;
            }
            Node successor = head.next;
            if (time != 0) {
                System.out.println(Thread.currentThread() + " 休眠前拿到的数据" + head.item);
                TimeUnit.SECONDS.sleep(time); // sleep for the requested time
            }
            if (top.compareAndSet(head, successor)) {
                return head;
            }
        }
    }
}
package com.example.test.cas.aba; /** * @author hehang on 2019-10-14 * @description */ public class Test { public static void main(String[] args) throws InterruptedException { Stack stack = new Stack(); stack.push(new Node("B")); stack.push(new Node("A")); Thread thread1 = new Thread(() -> { try { System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(3)); // 再继续拿,就会有问题了,理想情况stack出数据应该是 A->C->D->B,实际上ABA问题导致A-B->null System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(0)); System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(0)); System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(0)); } catch (Exception e) { e.printStackTrace(); } }); thread1.start(); Thread.sleep(300); // 让线程1先启动 Thread thread2 = new Thread(() -> { Node A = null; try { A = stack.pop(0); System.out.println(Thread.currentThread() + " 拿到数据:" + A); stack.push(new Node("D")); stack.push(new Node("C")); stack.push(A); } catch (Exception e) { e.printStackTrace(); } }); thread2.start(); } }
Thread[Thread-0,5,main] 睡一下,预期拿到的数据A Thread[Thread-1,5,main] 拿到数据:item内容:A Thread[Thread-0,5,main] 拿到数据:item内容:A Thread[Thread-0,5,main] 拿到数据:item内容:B Thread[Thread-0,5,main] 拿到数据:null Thread[Thread-0,5,main] 拿到数据:null Process finished with exit code 0
package com.example.test.cas.aba;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;

/**
 * A LIFO stack whose top pointer is an {@link AtomicStampedReference}: every successful
 * update also increments an integer stamp, and a CAS succeeds only when both the
 * reference and the stamp still match what was read. A top node that was removed and
 * pushed back therefore carries a different stamp and is not mistaken for "unchanged".
 *
 * @author hehang on 2019-10-14
 */
public class ConcurrentStack {

    /** Current top of the stack together with its modification stamp. */
    AtomicStampedReference<Node> top = new AtomicStampedReference<Node>(null, 0);

    /**
     * Pushes {@code node} on top of the stack.
     *
     * @param node node to push; its {@code next} link is overwritten
     */
    public void push(Node node) {
        int[] stampHolder = new int[1];
        Node oldTop;
        do {
            // One call yields a consistent (reference, stamp) pair.
            oldTop = top.get(stampHolder);
            node.next = oldTop;
        } while (!top.compareAndSet(oldTop, node, stampHolder[0], stampHolder[0] + 1));
    }

    /**
     * Pops and returns the top node.
     *
     * @param time seconds to sleep between reading the top and attempting the CAS;
     *             {@code 0} means no delay
     * @return the removed node, or {@code null} if the stack was empty
     */
    public Node pop(int time) {
        int[] stampHolder = new int[1];
        Node oldTop;
        Node newTop;
        do {
            oldTop = top.get(stampHolder);
            if (oldTop == null) {
                return null;
            }
            newTop = oldTop.next;
            if (time != 0) {
                System.out.println(Thread.currentThread() + " 睡一下,预期拿到的数据" + oldTop.item);
                try {
                    TimeUnit.SECONDS.sleep(time); // sleep for the requested time
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Keep the interrupt visible to the caller instead of dropping it.
                    Thread.currentThread().interrupt();
                }
            }
        } while (!top.compareAndSet(oldTop, newTop, stampHolder[0], stampHolder[0] + 1));
        return oldTop;
    }

    /** Prints the {@code item} of every node from the top of the stack downwards. */
    public void get() {
        Node node = top.getReference();
        while (node != null) {
            System.out.println(node.item);
            node = node.next;
        }
    }
}
package com.example.test.cas.aba; /** * @author hehang on 2019-10-14 * @description */ public class Test2 { public static void main(String[] args) throws InterruptedException { ConcurrentStack stack = new ConcurrentStack(); stack.push(new Node("B")); stack.push(new Node("A")); Thread thread1 = new Thread(() -> { try { System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(3)); // #再继续拿,就会有问题了,理想情况stack出数据应该是 A->C->D->B,实际上ABA问题导致A-B->null System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(0)); System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(0)); System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(0)); } catch (Exception e) { e.printStackTrace(); } }); thread1.start(); Thread.sleep(300); // 让线程1先启动 Thread thread2 = new Thread(() -> { Node A = null; try { A = stack.pop(0); System.out.println(Thread.currentThread() + " 拿到数据:" + A); stack.push(new Node("D")); stack.push(new Node("C")); stack.push(A); } catch (Exception e) { e.printStackTrace(); } }); thread2.start(); } }
Thread[Thread-0,5,main] 睡一下,预期拿到的数据A Thread[Thread-1,5,main] 拿到数据:item内容:A Thread[Thread-0,5,main] 睡一下,预期拿到的数据A Thread[Thread-0,5,main] 拿到数据:item内容:A Thread[Thread-0,5,main] 拿到数据:item内容:C Thread[Thread-0,5,main] 拿到数据:item内容:D Thread[Thread-0,5,main] 拿到数据:item内容:B Process finished with exit code 0