1. 多线程案例
1.1 单例模式
单例模式(Singleton Pattern)是一种常用的软件设计模式,该模式的,并提供一个全局访问点。在很多情况下,单例模式是非常有用的,例如日志记录、驱动对象、缓存等。
单例模式具体的实现方式, 分成 “饿汉” 和 “懒汉” 两种
饿汉模式
在这种方式下,实例在类加载时就已经创建,因此,没有任何多线程的问题。这是最简单的一种实现方式,但在类加载的时候就创建实例可能会增加系统的开销
// 恶汉模式的 单例模式实现
// 此处保证 Singleton 这个类只能创建出一个单例
class Singleton {
// 先把这个实例创建出来
private static Singleton instance = new Singleton();
// 如果需要创建这个实例, 统一使用 Singleton.getInstance() 来获取
public static Singleton getInstance() {
return instance;
}
// 为了避免 Singleton 这个类被创建出多份
// 此处把构造方法设置为 private , 此时在类外面就无法通过 new 的方式来创建 Singleton 这个实例了
private Singleton() {};
}
public class ThreadDemo1 {
public static void main(String[] args) {
Singleton s1 = Singleton.getInstance();
Singleton s2 = Singleton.getInstance();
System.out.println(s1 == s2); // 同一个对象
}
}
懒汉模式 - 单线程版
类加载的时候不创建实例, 类加载的时候才创建实例.
class Singleton2 {
private static Singleton2 instance = null;
private Singleton2() {}
public static Singleton2 getInstance() {
if (instance == null) {
instance = new Singleton2();
}
return instance;
}
}
加上synchronized可以解决这里的线程安全问题
class Singleton3 {
private static Singleton3 instance = null;
private Singleton3 () {}
public synchronized static Singleton3 getInstance() {
if (instance == null) {
instance = new Singleton3();
}
return instance;
}
}
在这里我们还可以在改进
- 使用双重 if 判定, 降低锁竞争的频率
- 给 instance 加上 volatile
class Singleton4 {
private volatile static Singleton4 instance = null;
private Singleton4 () {}
public static Singleton4 getInstance() {
if (instance == null) {
synchronized (Singleton4.class) {
if (instance == null) {
instance = new Singleton4();
}
}
}
return instance;
}
}
1.2 阻塞式队列
阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:
- 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
- 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.
阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.
生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等
待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
标准库中的阻塞队列
在Java标准库中内置了阻塞队列, 如果我们需要在程序中使用阻塞队列, 直接使用标准库中即可
- BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
- put 方法用于阻塞式的入队列, take 方法用于阻塞式的出队列
- BlockingQueue 也有offer , poll, peek 等方法, 但是这些方法不带有阻塞特性
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
生产者消费者模型
生产者消费者模型是一个非常经典的多线程并发协作的模型,在分布式系统里非常常见,它由两类线程和一个缓冲区组成,这两类线程分别是生产者线程和消费者线程,缓冲区存放生产者的数据,生产者往缓冲区中添加数据,消费者从缓冲区中取走数据,当缓冲区满时,生产者阻塞,当缓冲区空时,消费者阻塞。
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ThreadDemo5 {
public static void main(String[] args) throws InterruptedException {
//创建一个阻塞队列
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
// 模拟消费者
Thread customer = new Thread(() -> {
while (true) {
try {
int value = blockingQueue.take();
System.out.println("消费元素" + value);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"消费者");
// 启动线程
customer.start();
// 模拟生产者
Thread product = new Thread(() -> {
Random random = new Random();
while (true) {
try {
int num = random.nextInt(1000);
System.out.println("生产元素" + num);
blockingQueue.put(num);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"消费者");
//启动线程
product.start();
customer.join();
product.join();
}
}
阻塞队列实现
public class BlockingQueue {
private int[] items = new int[1000];
private volatile int size = 0;
private int head = 0;
private int tail = 0;
public void put(int value) throws InterruptedException {
synchronized (this) {
while (size == items.length) {
wait();
}
items[tail] = value;
tail = (tail + 1) / items.length;
size++;
notifyAll();
}
}
public int take() throws InterruptedException {
int ret = 0;
synchronized (this) {
while (size == 0) {
wait();
}
ret = items[head];
head = (head + 1) / items.length;
size--;
notifyAll();
}
return ret;
}
public synchronized int size() {
return size;
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue blockingQueue = new BlockingQueue();
Thread customer = new Thread(() -> {
while (true) {
try {
int value = blockingQueue.take();
System.out.println("消费元素" + value);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"消费者");
customer.start();
Thread producer = new Thread(() -> {
while (true) {
try {
Random random = new Random();
int ret = random.nextInt(10000);
blockingQueue.put(ret);
System.out.println("生产元素" + ret);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"生产者");
producer.start();
customer.join();
producer.join();
}
}
2. 定时器
定时器是编程语言中用于在指定的时间或时间间隔内执行特定任务的工具
标准库中的定时器
- 标准库中提供了一个 Timer 类. Timer 类的核心方法为 schedule
- schedule 包含两个参数. 第一个参数指定即将要执行的任务代码, 第二个参数指定多长时间之后
执行 (单位为毫秒)
这里我们简单举个例子
import java.util.Timer;
import java.util.TimerTask;
public class ThreadDemo6 {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello 1");
}
},3000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello 2");
}
},2000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello 3");
}
},1000);
}
}
自己实现定时器
import java.util.TimerTask;
import java.util.concurrent.PriorityBlockingQueue;
// 核心
// 1. 有一个扫描线程, 负责判定时间到, 执行任务
// 2. 还需要一个数据结构, 来保存所有的被注册的任务(优先级队列)
// 使用这个类来表示一个定时器中的任务
class MyTask implements Comparable<MyTask> {
// 要执行的任务
private Runnable runnable;
// 任务在啥时候执行(使用一个毫秒级时间戳来表示)
private long time;
public MyTask(Runnable runnable, long time) {
this.runnable = runnable;
this.time = time;
}
// 获取时间
public long getTime() {
return time;
}
// 执行任务
public void run() {
runnable.run();
}
@Override
public int compareTo(MyTask o) {
// 返回小于 0 , 大于0 , 0
// this 比 o 小, 返回 < 0
// this 比 o 打, 返回 > 0
// this 和 o 相同, 返回 0
return (int)(this.time - o.time);
}
}
// 自己实现定时器
class MyTimer {
// 扫描线程
private Thread t = null;
//用一个优先级阻塞队列来保存任务
private PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();
public MyTimer() {
t = new Thread(() -> {
while (true) {
// 取出队首元素, 检查队首元素是否任务时间到了
// 如果时间没到, 就把任务塞回去
// 如果时间到了, 就开始执行
try {
synchronized (this) {
MyTask myTask = queue.take();
long curTime = System.currentTimeMillis();
if (curTime < myTask.getTime()) {
// 还没到点, 不执行
queue.put(myTask);
// 在 put 之后, 进行wait
this.wait(myTask.getTime() - curTime);
} else {
// 时间到了, 开始执行
myTask.run();
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t.start();
}
// 指定两个参数
// 第一个参数是 任务内容
// 第二个参数是 任务在多少毫秒后开始执行
public void schedule(Runnable runnable, long after) {
MyTask myTask = new MyTask(runnable,System.currentTimeMillis() + after);
queue.put(myTask);
synchronized (this) {
this.notify();
}
}
}
public class ThreadDemo7 {
public static void main(String[] args) {
MyTimer myTimer = new MyTimer();
Runnable runnable1 =new Runnable() {
@Override
public void run() {
System.out.println("hello 1");
}
};
Runnable runnable2 =new Runnable() {
@Override
public void run() {
System.out.println("hello 2");
}
};
Runnable runnable3 =new Runnable() {
@Override
public void run() {
System.out.println("hello 3");
}
};
Runnable runnable4 =new Runnable() {
@Override
public void run() {
System.out.println("hello 4");
}
};
myTimer.schedule(runnable1,4000);
myTimer.schedule(runnable2,3000);
myTimer.schedule(runnable3,2000);
myTimer.schedule(runnable4,1000);
}
}
3. 线程池
线程池是什么
- 线程池是将多个线程预先存储在一个"池子"内,当有任务出现时可以避免重新创建和销毁线程所带来性能开销,只需要从"池子"内取出相应的线程执行对应的任务即可。
- 线程池中的线程都为后台线程,每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲,则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程,但线程的数目永远不会超过最大值,超过最大值的线程可以排队,但他们要等到其他线程完成后才启动
标准库中的线程池
- 使用 Executors.newFixedThreadPool(10) 能创建出固定包含 10 个线程的线程池.
Executors.newFixedThreadPool(10)
- 返回值类型为 ExecutorService
ExecutorService pool = Executors.newFixedThreadPool(10);
- 通过 ExecutorService.submit 可以注册一个任务到线程池中.
pool.submit(new Runnable() {
public void run() {
System.out.println("hello");
}
});
下面举一个简单的例子
创建一个 10 个线程的线程池, 打印 1000 次 “hello”
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// 使用标准库的线程池
public class ThreadDemo8 {
public static void main(String[] args) {
// 创建了一个线程池, 池子里有10个线程
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
int n = i;
pool.submit(new Runnable() {
public void run() {
System.out.println("hello" + n);
}
});
}
}
}
Executors 创建线程池的几种方式
- newFixedThreadPool: 创建固定线程数的线程池
- newCachedThreadPool: 创建线程数目动态增长的线程池.
- newSingleThreadExecutor: 创建只包含单个线程的线程池.
- newScheduledThreadPool: 设定 延迟时间后执行命令,或者定期执行命令. 是进阶版的 Timer
那么我们在开发中, 线程池的线程数设置为多少合适呢?
不同的程序特点不同, 此时要设置的线程数也是不同的,
我们考虑两个极端情况,:
- CPU密集型 , 每个线程要执行的任务都是"狂转"CPU(进行一系列的算术运算)
- IO密集型, 每个线程干的工作就是等待IO(读写硬盘, 读写网卡, 等待用户输入…),不吃CPU
然而, 在我们实际开发中, 并没有程序符合这两种理想情况, 真实的程序, 往往是一部分要吃CPU, 一部分等待IO.
具体这个程序, 几成工作量吃CPU, 几成工作量等待IO, 不确定
自己实现一个线程池(固定数量线程的线程池)
// 一个线程池, 里面至少包含两个大的部分
// 1. 阻塞队列, 用来保存任务
// 2. 若干个工作线程
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
class MyThreadPool {
// 此处不涉及"时间", 只有任务, 就直接使用Runnable
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
// n 表示线程的数量
public MyThreadPool(int n) {
// 在这里创建出线程
for (int i = 0; i < n; i++) {
Thread t = new Thread(() -> {
while (true) {
try {
Runnable runnable = queue.take();
runnable.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t.start();
}
}
// 注册任务给线程池
public void submit(Runnable runnable) {
try {
queue.put(runnable);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public class ThreadDemo9 {
public static void main(String[] args) {
// 创建我自己的线程池
MyThreadPool pool = new MyThreadPool(10);
for (int i = 0; i < 1000; i++) {
int n = i;
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello" + n);
}
});
}
}
}