定时器

定时器也是软件开发中的一个重要组件,我们可以将一个任务交给定时器,约定好时间到了定时器就执行该任务。

比如,当客户端发出请求后,会等待服务器的响应,但是由于网络环境的复杂性,如果吃吃没有得到响应,也不会一直等待下去(不现实),此时就会设定一个最大等待时间,这里的这个最大等待时间,就可以使用定时器的方式来实现,当时间到达最大等待时间后,就放弃等待。

定时器的工作原理

在标准库中提供了定时器Timer类,Timer 类的核心方法为 schedule(),
schedule() 包含两个参数,第一个参数指定即将要执行的任务代码, 第二个参数指定多长时间之后执行 (单位为毫秒)。

定时器会将传入的任务和与其相对应的等待时间存放到带有阻塞的优先级队列(线程安全)中,每次只需要维护时间最短的任务,时间没到时就可以进入阻塞,当时间间隔最短的任务到了执行时间,就对其进行出队操作,并执行任务里面的内容。

定时器的使用

  public static void main(String[] args) {
        System.out.println("程序启动");
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("执行定时器任务1");
            }
        },3000);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("执行定时器任务2");
            }
        },2000);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("执行定时器任务3");
            }
        },1000);
    }

执行结果~~

程序启动
执行定时器任务3
执行定时器任务2
执行定时器任务1

我们可以看到,定时器按照我们指定的任务和时间进行工作了,但是任务执行完毕,线程并不会结束,而是会进入阻塞,等待下一个任务的到来。

实现定时器

定时器是通过优先级队列来维护待执行的任务及其对应的时间,并且我们要实现compareTo接口。


class Task implements Comparable<MyTask>{
    private Runnable runnable;// 任务 - 交给线程执行
    private long time;// 等待时间
 
    public Task(Runnable runnable, long time) {
        this.runnable = runnable;
        this.time = time;
    }
    //获取任务的时间
    public long getTime() {
        return time;
    }
    //执行任务
    public void run() {
        runnable.run();
    }
 
    @Override
    public int compareTo(Task o) { // 重写compare接口
        return (int)(this.time - o.time);
    }
}

定时器中有一个扫描线程扫描优先级队列中(优先级最高的)的任务是否到了执行时间。

不断将队(优先级最高的)首元素取出,判断是否到达时间,然候根据情况选择是将任务放回还是执行,这个需要一个循环去不停的执行。

但是如果任务的执行时间和当前时间相差很多,不断的取出、判断、放回 会额外占用很多cpu资源(线程忙等),所以我们对其考虑进行一个睡眠操作,睡眠的时间就是 – 任务要执行的时间减去当前的时间,但是,如果有新的任务传了进来,并且时间比当前的最短时间要短就可能出现新任务没有被执行的情况,所以,上面的睡眠操作是不可取的,而是应该使用wait,在wait中设置最大的等待时间,当有新的任务传进来时将wait唤醒,然后重新判断,具体实现如下:

class MyTimer {
    //扫描线程
 private Thread t = null;

 private PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
 
 public MyTimer() {
     t = new Thread(() -> {
         while(true) {
             try {
                 synchronized(this) {
                     Task myTask = queue.take();
                     long curTime = System.currentTimeMillis();
                     if(curTime < myTask.getTime()) {
                         queue.put(myTask);
                         this.wait(myTask.getTime() - curTime);
                     } else {
                         myTask.run();
                     }
                 }
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
     });
     t.start();
 }
 
    public void schedule(Runnable runnable,long after) {
        //时间戳需要进行换算
        Task myTask = new Task(runnable,System.currentTimeMillis() + after);
        queue.put(myTask);
        synchronized(this) {
            this.notify();
        }
    }
}

一定要注意加锁的范围,锁一定要包括扫描线程while里面的全部内容,因为如果扫描线程计算完需要等待的时间之后wait之前,扫描线程被切走,此时有一个新任务传了进来,执行了notify之后扫描线程才开始进行工作,那么扫描线程就没有扫描到新的任务,如果新的任务的时间更短,那么新的任务就没有被执行。

完整代码


class Task implements Comparable<Task>{
    private Runnable runnable;
 
    private long time;
 
    public Task(Runnable runnable, long time) {
        this.runnable = runnable;
        this.time = time;
    }
    //获取任务的时间
    public long getTime() {
        return time;
    }
    //执行任务
    public void run() {
        runnable.run();
    }
 
    @Override
    public int compareTo(Task o) {
        return (int)(this.time - o.time);
    }
}
 
class MyTimer {
    //扫描线程
    private Thread t = null;
 
    private PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
 
    public MyTimer() {
        t = new Thread(() -> {
            while(true) {
                try {
                    synchronized(this) {
                        Task myTask = queue.take();
                        long curTime = System.currentTimeMillis();
                        if(curTime < myTask.getTime()) {
                            queue.put(myTask);
                            this.wait(myTask.getTime() - curTime);
                        } else {
                            myTask.run();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t.start();
    }
 
    public void schedule(Runnable runnable,long after) {
        //时间戳需要进行换算
        Task myTask = new Task(runnable,System.currentTimeMillis() + after);
        queue.put(myTask);
        synchronized(this) {
            this.notify();
        }
    }
}

线程池

线程池存的优点

线程是系统调度的最小单位,由于进程的开销太大,就引入了线程,在java中更喜欢使用多线程。除了线程还有一种“协程”也可以进一步提高性能,但是有于标准库并没有很好的支持,在java中使用的并不算多。

但由于对性能的进一步追求,此时就引入了线程池的技术,池化技术是一种非常重要的思想,使用非常广泛,比如,线程池、内存池、常量池、连接池等。

我们可以提前(一次性)创建一些线程,加入线程池中,在需要使用线程时,直接去线程池中取,不用时也不着急释放。这样由于是一次性创建(内核态+ 用户态),之后从线程池中拿取(户态操作),就会提降低程序的开销。此时申请和释放的”度“可以由程序员去自己设计,而不用全部听从系统内核的调度,使得程序更加的可控。

线程池的使用

常见的线程池创建有四种:

//可以设置线程数量
ExecutorService pool = Executors.newFixedThreadPool(10);
//根据任务的数量 去动态变化线程的数量
ExecutorService pool = Executors.newCachedThreadPool();
//只有一个线程
ExecutorService pool = Executors.newSingleThreadExecutor();
//类似定时器,让任务延时进行
ExecutorService pool = Executors.newScheduledThreadPool(10);

使用的方法很简单,就是调用里面的submit方法,在里面传一个任务就可以了,如下:创建1000个任务让线程池执行

public static void main(String[] args) {
    //	设置线程数量
    ExecutorService pool = Executors.newFixedThreadPool(10);
    for(int i = 0;i < 1000;i++) {
        int n = i;
        pool.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("hello" + n);
            }
        });
    }
}

线程池的原理

工厂模式

    线程池的创建时没有使用new的,而是调用了一个方法,而真正的new操作在方法里面进行,这样的设计模式叫做工厂模式。

    工厂模式作用是什么呢?在java中,重载的规则是在 方法名(可以省略,因为重载主要就是需要方法名相同)、参数个数、参数类型 其中至少有一项不同,否则就无法达成重载。
    那么如果有两种构造方法,他们想要构成重载,但是又达不到重载的条件,此时,就可以使用工厂模式,去根据不同的需求去构造。

    举个例子,假如有一个类,它的用途是构造出一个坐标系上的点,构造这样的点可以传入x、y坐标,也可以传入距原点的半径长和角度大小r、a(极坐标),这四个参数都需要double类型,数量相同,方法名也相同,无法达成重载,此时就可以使用工厂模式去创建

Point point1 = newXYPoint(1,1);
Point point2 = newRAPoint(1,1);

ThreadPoolExecutor类

线程池的四种构造方法都是对ThreadPoolExecutor的封装,我们只需要了解ThreaPoolExecutoe类即可。

ThreadPoolExecutor类有7个参数。

  1. int corePoolSize

    • 核心线程数,也就是线程池中固定的线程数量
  2. int maximumPoolSize

    • 最大线程数,是线程池中可以包含的最大线程数量

最大线程数和核心线程数的差值属于临时线程,也就是可以允许被回收掉的线程,比如当前任务量很多,那么就可以多创建几个临时线程去执行任务,当任务量比较少的时候,这些临时线程没有事情干就可以被回收掉。

  1. long keepAliveTime 和 TimeUnit unit(时间单位:s、ms、分钟…)

    • 这两个参数描述了临时线程可以最长的“闲置”时间,如果临时线程在一定的时间内没有工作,那么此时就会被回收掉。
  2. BlockingQueue workQueue

    • 线程池的任务队列,用一个阻塞队列来 接收、取出 任务。
  3. ThreadFactory threadFactory

    • 线程池的工厂方法,用于创建线程。
  4. RejectedExecutionHandler handler

    • 线程池的拒绝策略。

    标准库中提供了四种线程池的拒绝策略,如下:
定时器与线程池-LMLPHP

  1. AbortPolicy - 队列满了,触发异常(摆烂了 - 罢工!!!)
  2. CallerRunsPolicy - 队列满了,就将该任务交还给加入线程执行(谁给我的谁执行)
  3. DiscardOldestPolicy - 队列满了,丢弃最早的任务,将该任务加入队列
  4. DiscardPolicy - 队列满了,丢弃最新任务,将该任务加入队列。

实现线程池

    简易线程池的实现:

class MyPool {
    private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    public MyPool(int n) {
        for(int i = 0;i < n;i++) {
            Thread t = new Thread(() -> {
                while(true) {
                    try {
                        Runnable runnable = queue.take();
                        runnable.run();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
        }
    }
    public void submit(Runnable runnable) {
        try {
            queue.put(runnable);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class PoolDemo2 {
    public static void main(String[] args) {
        MyPool myPool = new MyPool(10);
        for(int i = 0;i< 1000;i++) {
            int n = i;
            myPool.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("hello " + n);
                }
            });
        }
    }
}
11-28 09:36