JAVA多线程知识点

1. 线程基础

1.1. 线程概念

线程是操作系统能够进行运算调度的最小单位(程序执行流的最小单元)。

它被包含在进程之中,是进程中的实际运作单位。

一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

1.2. 线程安全

  1. 原子性:线程共享内存同一时间只有一个线程可以操作(synchronized关键字,互斥锁)。
  2. 可见性:某个线程的操作可以立即被其他线程观察到(volatile关键字,强制刷新线程内该变量内存)。
  3. 有序性:线程内指令顺序执行(现代CPU指令重排,乱序执行);在java中指令重排不影响单线程内的最终结果,但不保证多线程。

1.3. 创建并启动线程的方法

  1. 实现 Runnable 接口
class ByRunnable implements Runnable{
    @Override
    public void run() {
        System.out.println("实现Runnable并实现run方法");
    }
}
Thread thread = new Thread(new ByRunnable());
thread.start();//调用start以开启新线程
thread.join();//阻止主线程在新线程之前结束
  1. 继承 Thread 类
class ByThread extends Thread{
    @Override
    public void run() {
        System.out.println("继承Thread并重写run方法");
    }
}
Thread thread = new ByThread();
thread.start();
thread.join();

1.4. 线程状态

使用Thread.currentThread().getState();可以获得当前线程状态,thread.getState();获得某一线程的状态。

  1. 初始(NEW):新创建了一个线程对象,但还没有调用start()方法。
  2. 运行(RUNNABLE):Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。 线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running)。
  3. 阻塞(BLOCKED):表示线程阻塞于锁。
  4. 等待(WAITING):进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。
  5. 超时等待(TIMED_WAITING):该状态不同于WAITING,它可以在指定的时间后自行返回。
  6. 终止(TERMINATED):表示该线程已经执行完毕。

1.5. 线程超时等待(TIMED_WAITING)相关方法

  1. 线程睡眠(sleep),当持有锁时不释放锁!且sleep的线程无法从外部唤醒,但可被中断!

    try {
        //睡眠1000毫秒,在此期间不参与CPU资源竞争(线程调度)
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        //可能在睡眠结束前线程就被中断了
        e.printStackTrace();
    }
    
  2. 线程让出(yield)

    //给CPU一个提示,让出本次线程调度,但参与下一次调度(进入线程等待队列),CPU可能会忽略本次让出
    Thread.yield();
    
  3. 线程加入(join)

    Thread t = new Thread(()->{
        System.out.println("new Thread.....");
    });
    t.start();
    //将线程t加入当前线程,即告诉当前线程等待线程t结束或在1000毫秒后再继续
    t.join(1000);
    
  4. 线程超时等待(wait),只有持有锁的对象才能调用这个方法且会释放锁!

    //使获得锁的obj对象所在线程进入等待,等待1000毫秒后自动被唤醒,或者被notify(notifyAll)唤醒
    obj.wait(1000);
    
  5. 线程超时禁用(park)

    //等待到当前时间的1000毫秒结束禁用
    LockSupport.parkUntil(System.currentTimeMillis()+1000);
    //禁用当前线程10000纳秒
    LockSupport.parkNanos(10000);
    

1.6. 线程等待(WAITING)相关方法

  1. wait、notify、notifyAll:wait使当前获得锁的线程等待并释放锁和notify唤醒随机一个等待的线程来竞争锁及notifyAll唤醒所有等待的线程来竞争锁(只有持有锁的对象才能调用这些方法,且notify和notifyAll只能唤醒在同一个锁对象下wait的线程)

    //模拟仓库
    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    //模拟生产者
    Thread producer = new Thread(()->{
        for(int i=0;;++i){
            try {
                while (queue.size() < 5) {
                    Thread.sleep(100);
                    queue.add("prod"+(++i));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (queue) {
                //随机唤醒一个等待中的线程
                queue.notify();
                if(queue.size() >= 5) {
                    try {
                        //仓库满了使当前线程等待
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    });
    //模拟消费者
    Thread consumer = new Thread(()->{
        for(;;){
            try {
                while (queue.size() > 0 ) {
                    Thread.sleep(10);
                    System.out.println(queue.poll());
                }
            } catch (InterruptedException e){
                e.printStackTrace();
            }
            synchronized (queue) {
                //唤醒所有其他线程,此时最多只有一个线程(生产者)在等待
                queue.notifyAll();
                if(queue.size()<=0) {
                    try {
                        //商品消费完了,使当前线程等待
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    });
    producer.start();
    consumer.start();
    producer.join();
    consumer.join();
    
  2. park和unpark:park使线程等待及unpark使线程结束等待,park时当持有锁时不释放锁!可以在外部被unpark唤醒!

    Thread t = new Thread(()->{
        System.out.println("new.....");
        LockSupport.park();//让线程进入等待
        System.out.println("over wait");
    });
    t.start();
    Thread.sleep(1000);
    System.out.println("main sleep 1000");
    LockSupport.unpark(t);//唤醒指定线程t
    /*控制台输出:
    new.....
    main sleep 1000
    over wait
    */
    

2. synchronized关键字

  1. synchronized是同步、互斥、可重入的锁
  2. synchronized锁住的是对象,当没有传入对象时:
    1. 当对静态方法加锁时锁住的是当前类的class对象。
    2. 当对实例方法枷锁时锁住的时当前实例对象(this)。
    3. 在方法内使用synchronized时必须传入对象synchronized(obj){/*sync code*/}并锁住其代码块。
  3. synchronized锁住的对象才可以使用wait、notify、notifyAll。
  4. synchronized存在锁升级的概念
    1. 当始终同时只有同一个线程竞争时,锁时处于偏向锁状态(markword只记录线程标识且不主动释放,默认不会有其他线程竞争锁)。
    2. 在获得锁的线程未结束时有线程来竞争锁,锁升级为轻量级锁(自旋锁,线程不进入等待)。
    3. 当竞争变大(10次)或者锁持续时间变长时,锁升级为重量级锁(调用系统函数使线程进入等待)。

3. volatile关键字

4. Atomic原子操作类及VarHandle

5. 各种JUC包下的同步锁

5.1. ReentrantLock

package test;


import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadTest {
    private static final String thStr(){
        return Thread.currentThread().getId()+" "+Thread.currentThread().getName();
    }

    public static class ReentrantLockTest {
        private final static Lock lock1 = new ReentrantLock();//入参为true为公平锁,等待锁最久将获得锁
        private final static Lock lock2 = new ReentrantLock();
        private static final void doSomething() {
            String str = thStr();
            try {
                System.out.println(str);
                lock1.lock();
                System.out.println(str+" alloc lock");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println(str+" will free lock");
                lock1.unlock();
            }
        }
        //常规获得锁
        public final static void testLock() {
            for (int i=0;i<10;i++){
                new Thread(()->{
                    doSomething();
                }).start();
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //死锁
        public static final void deadLock() {
            //两个线程相互获取对方锁
            var t1 = new Thread(new DeadLockThread(lock1,lock2));
            var t2 = new Thread(new DeadLockThread(lock2,lock1));
            t1.start();
            t2.start();
            try {
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            t1.interrupt();//中断线程1,使线程2获得锁
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        private static final class DeadLockThread implements Runnable {
            private Lock lock1,lock2;
            public DeadLockThread(Lock lock1, Lock lock2) {
                this.lock1 = lock1;
                this.lock2 = lock2;
            }
            @Override
            public void run() {
                String str = thStr();
                try {
                    lock1.lockInterruptibly();
                    System.out.println(str+" acquire lock1");
                    TimeUnit.MILLISECONDS.sleep(100);
                    lock2.lockInterruptibly();
                    System.out.println(str+" acquire lock2");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(str+" will free lock1");
                    lock1.unlock();
                    System.out.println(str+" will free lock2");
                    lock2.unlock();
                }
            }
        }
        //超时休眠重新尝试获取锁
        public static final void tryLockTest(){
            ArrayList<Thread> threads = new ArrayList<>(8);
            for(int i=0;i<4;i++){
                threads.add(new Thread(()->{
                    tryLock();
                }));
            }
            threads.forEach(thread -> thread.start());
            threads.forEach(t-> {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        private static final void tryLock(){
            String str = thStr();
            try {
                while (!lock1.tryLock(1,TimeUnit.SECONDS)){
                    System.out.println(str + " try acquire lock1 filed");
                    Thread.sleep(300);
                }
                System.out.println(str + " acquire lock1");
                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println(str+" attempt free lock1");
                lock1.unlock();
            }

        }
        //Condition等待和唤醒
        public static final void testCondition(){
            ArrayList<Thread> threads = new ArrayList<>();
            for (int i=0;i<20;i++) {
                if(i%2==0){
                    threads.add(new Thread(()->ConditionTest.get()));
                }else{
                    threads.add(new Thread(()->ConditionTest.put()));
                }
            }
            threads.forEach(t->t.start());
        }
        private static final class ConditionTest{
            private static final Condition condP = lock1.newCondition();
            private static final Condition condG = lock1.newCondition();
            private static final ArrayList<String> list = new ArrayList<>();
            private static final AtomicInteger ai = new AtomicInteger(0);
            public static final void put(){
                String str = thStr();
                try {
                    lock1.lock();
                    System.out.println(str+" try put");
                    if(list.size()>=3){
                        System.out.println(str+" list size >=3 will await");
                        condP.await(300,TimeUnit.MILLISECONDS);//线程进入等待,同时释放锁
                    }
                    list.add("item "+ai.incrementAndGet());
                    if(list.size()>=1){
                        System.out.println(str+" now list size "+list.size());
                        condG.signalAll();//唤醒等待的锁
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(str+" unlock put");
                    lock1.unlock();
                }
            }
            public static final void get(){
                String str = thStr();
                try {
                    lock1.lock();
                    System.out.println(str+" try get");
                    if(list.size()<=0){
                        System.out.println(str+" list size <=0 will await");
                        condG.await();
                    }
                    try {
                        System.out.println(str+" "+list.remove(0));
                    } catch (RuntimeException e){
                        System.err.println(str+" list size is zero, will retry");
                        get();
                    }
                    if(list.size()<3)
                        condP.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(str+" unlock get");
                    lock1.unlock();
                }

            }
        }
    }



}

5.2. ReadWriteLock - StampedLock

5.3. CountDownLatch

5.4. CyclicBarrier

5.5. Phaser

5.6. Semaphore

5.7. Exchanger

5.8. LockSupport

5.9. AbstractQueuedSynchronizer

VarHandle

java.lang.invoke.VarHandle:可以对对象普通属性进行原子操作,比反射快

public class XXX{
    long x = 100L;
    private static VarHandle varHandle;
    static {
        try {
            varHandle = MethodHandles.lookup().findVarHandle(XXX.class,"x",long.class);
        } catch (Exception e) { e.printStackTrace(); }
    }
    public static void main(String[] args) throws Exception {
        var me = new XXX();
        System.out.println("x="+varHandle.get(me));//100
        varHandle.set(me,1000l);
        System.out.println("x="+me.x);//1000
        varHandle.compareAndSet(me,100l,10000l);//CAS
        System.out.println("x="+me.x);//1000
    }
}

5.10. 阻塞队列

SynchronousQueue

//容量为0的阻塞队列,用于线程间交换数据,直接将数据送往另一个线程
public static SynchronousQueue synchronousQueue = new SynchronousQueue();
public static void main(String[] args) throws Exception{
    new Thread(()->{
        try {
            //取出
            System.out.println(synchronousQueue.take());
            System.out.println(synchronousQueue.take());
            System.out.println("OVER2");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();
    //阻塞等待索取
    synchronousQueue.put("synchronousQueue2");
    synchronousQueue.put("synchronousQueue1");
    System.out.println("OVER1");
}

TransferQueue

//提供独特的transfer方法,阻塞当前线程,直到被transfer放入的数据被取出
public static TransferQueue transferQueue = new LinkedTransferQueue();
public static void main(String[] args) throws Exception{
    for (int i = 0; i < 2; i++) {
        new Thread(()->{
            try {
                //取出数据
                System.out.println(transferQueue.take());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
    //阻塞到被取出
    transferQueue.transfer("transferQueue1");
    System.out.println("transferQueue1over");
    transferQueue.transfer("transferQueue2");
    System.out.println("transferQueue2over");
}

DelayQueue

static class MyDelayed implements Delayed{
    String msg;
    long time;
    public MyDelayed(String msg, long time) {
        this.msg = msg;
        this.time = time+System.currentTimeMillis();
    }
    @Override //到达执行时间的剩余时间
    public long getDelay(TimeUnit unit) {
        return unit.convert(time-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }
    @Override //确定优先级
    public int compareTo(Delayed o) {
        return (int)(getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS));
    }
}
public static DelayQueue<MyDelayed> delayQueue = new DelayQueue();
public static void main(String[] args) throws Exception{
    delayQueue.add(new MyDelayed("asd3",2000l));
    delayQueue.add(new MyDelayed("asd1",1000l));
    delayQueue.add(new MyDelayed("asd2",1500l));
    MyDelayed myDelayed;
    while ((myDelayed=delayQueue.take())!=null)
        System.out.println(myDelayed.msg+" ,current: "+System.currentTimeMillis());
}

6. 线程池

ThreadPoolExecutor

//完整构造线程池
public ThreadPoolExecutor(int corePoolSize, //核心线程数
                          int maximumPoolSize, //最大线程数
                          long keepAliveTime, //超过核心线程数的线程的最大空闲生存时间
                          TimeUnit unit, //keepAliveTime的单位
                          BlockingQueue<Runnable> workQueue, //线程队列,当线程数超过最大线程时入队
                          ThreadFactory threadFactory, //线程工厂
                          RejectedExecutionHandler handler) //当线程数满,队列满时的拒绝策略
{
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
03-08 19:16