一、并发设计模式
什么是设计模式
单例模式
不变模式
Future模式
生产者消费者
什么是设计模式
在软件工程中,设计模式(design pattern)是对软件设计中普遍存在(反复出现)的各种问题 ,所提出的解决方案。这个术语是由埃里希·伽玛(Erich Gamma)等人在1990年代从建筑设计领 域引入到计算机科学的。 Richard Helm, Ralph Johnson ,John Vlissides (Gof) 《设计模式:可复用面向对象软件的基础》 收录 23种模式 – 观察者模式 – 策略模式 – 装饰者模式 – 享元模式 – 模板方法 架构模式 – MVC – 分层 设计模式 – 提炼系统中的组件 代码模式(成例 Idiom) – 低层次,与编码直接相关 – 如DCL
单例模式
单例对象的类必须保证只有一个实例存在。许多时候整个系统只需要拥有一个的全局对象,这样 有利于我们协调系统整体的行为 比如:全局信息配置
/** * description: 饿汉式 线程安全 * * @author: dawn.he QQ: 905845006 * @email: [email protected] * @email: [email protected] * @date: 2019/10/4 7:32 PM */ public class Singleton { public static int STATUS = 1; private Singleton() { System.out.println("Singleton is create"); } private static Singleton instance = new Singleton(); public static Singleton getInstance() { return instance; } public static void main(String[] args) { //何时产生实例 不好控制 System.out.println(Singleton.STATUS); //输出 // Singleton is create //1 } }
/** * description: 懒汉式 线程安全 * * @author: dawn.he QQ: 905845006 * @email: [email protected] * @email: [email protected] * @date: 2019/10/4 7:37 PM */ public class LazySingleton { //1. private LazySingleton() { System.out.println("LazySingleton is create"); } private static LazySingleton instance = null; public static synchronized LazySingleton getInstance() { if (instance == null) instance = new LazySingleton(); return instance; } // 2. 多线程 性能好 // 双检锁/双重校验锁(DCL,即 double-checked locking)线程安全 // private volatile static LazySingleton instance; // private LazySingleton (){ // System.out.println("LazySingleton is create"); // } // public static LazySingleton getInstance() { // if (instance == null) { // synchronized (LazySingleton.class) { // if (instance == null) { // instance = new LazySingleton(); // } // } // } // return instance; // } public static void main(String[] args) { LazySingleton.getInstance(); } }
/** * description: 静态内部类 线程安全 * * @author: dawn.he QQ: 905845006 * @email: [email protected] * @email: [email protected] * @date: 2019/10/4 7:39 PM */ public class StaticSingleton { private StaticSingleton() { System.out.println("StaticSingleton is create"); } private static class SingletonHolder { private static final StaticSingleton instance = new StaticSingleton(); } public static final StaticSingleton getInstance() { return SingletonHolder.instance; } public static void main(String[] args) { StaticSingleton.getInstance(); } }
不变模式
一个类的内部状态创建后,在整个生命期间都不会发生变化时,就是不变类 不变模式不需要同步
/** * description: * * @author: dawn.he QQ: 905845006 * @email: [email protected] * @email: [email protected] * @date: 2019/10/4 7:55 PM */ //确保无子类 public final class Product { private final String no; //私有属性,不会被其他对象获取 private final String name; //final保证属性不会被2次赋值 private final double price; public Product(String no, String name, double price) { super(); //因为创建之后,无法进行修改 this.no = no; this.name = name; this.price = price; } public String getNo() { return no; } public String getName() { return name; } public double getPrice() { //在创建对象时,必须指定数据 return price; } }
java.lang.String java.lang.Boolean java.lang.Byte java.lang.Character java.lang.Double java.lang.Float java.lang.Integer java.lang.Long java.lang.Short
Future模式
/** * description: * * @author: dawn.he QQ: 905845006 * @email: [email protected] * @email: [email protected] * @date: 2019/10/4 8:07 PM */ public class RealData implements Data { protected final String result; public RealData(String para) { //RealData的构造可能很慢,需要用户等待很久,这里使用sleep模拟 StringBuffer sb = new StringBuffer(); for (int i = 0; i < 10; i++) { sb.append(para); try { //这里使用sleep,代替一个很慢的操作过程 Thread.sleep(100); } catch (InterruptedException e) { } } result = sb.toString(); } public String getResult() { return result; } }
/** * description: * * @author: dawn.he QQ: 905845006 * @email: [email protected] * @email: [email protected] * @date: 2019/10/4 8:05 PM */ public class FutureData implements Data { //FutureData是RealData的包装 protected RealData realdata = null; protected boolean isReady = false; public synchronized void setRealData(RealData realdata) { if (isReady) { return; } this.realdata = realdata; isReady = true; //RealData已经被注入,通知getResult() notifyAll(); } //会等待RealData构造完成 public synchronized String getResult() { while (!isReady) { try { //一直等待,知道RealData被注入 wait(); } catch (InterruptedException e) { } } //由RealData实现 return realdata.result; } }
/** * description: * * @author: dawn.he QQ: 905845006 * @email: [email protected] * @email: [email protected] * @date: 2019/10/4 8:07 PM */ public class Client { public Data request(final String queryStr) { final FutureData future = new FutureData(); new Thread() { // RealData的构建很慢,所以在单独的线程中进行 public void run() { RealData realdata = new RealData(queryStr); future.setRealData(realdata); } }.start(); // FutureData会被立即返回 return future; } public static void main(String[] args) { Client client = new Client(); //这里会立即返回,因为得到的是FutureData而不是RealData Data data = client.request("name"); System.out.println("请求完毕"); try { //这里可以用一个sleep代替了对其他业务逻辑的处理 //在处理这些业务逻辑的过程中,RealData被创建,从而充分利用了等待时间 Thread.sleep(2000); } catch (InterruptedException e) { } //使用真实的数据 System.out.println("数据 = " + data.getResult()); } }
Callable 接口
import java.util.concurrent.Callable; /** * description: * * @author: dawn.he QQ: 905845006 * @email: [email protected] * @email: [email protected] * @date: 2019/10/4 8:07 PM */ public class RealData implements Callable<String> { protected final String para; public RealData(String para) { this.para = para; } @Override public String call() throws Exception { //RealData的构造可能很慢,需要用户等待很久,这里使用sleep模拟 StringBuffer sb = new StringBuffer(); for (int i = 0; i < 10; i++) { sb.append(para); try { //这里使用sleep,代替一个很慢的操作过程 Thread.sleep(100); } catch (InterruptedException e) { } } return sb.toString(); } }
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; /** * description: * * @author: dawn.he QQ: 905845006 * @email: [email protected] * @email: [email protected] * @date: 2019/10/4 8:27 PM */ public class FutureMain { public static void main(String[] args) throws InterruptedException, ExecutionException { //1. // //构造FutureTask // FutureTask<String> future = new FutureTask<String>(new RealData("a")); // ExecutorService executor = Executors.newFixedThreadPool(1); // //执行FutureTask,相当于上例中的 client.request("a") 发送请求 // // 在这里开启线程进行RealData的call()执行 // executor.submit(future); // executor.shutdown(); // System.out.println("请求完毕"); // try { // //这里依然可以做额外的数据操作,这里使用sleep代替其他业务逻辑的处理 // Thread.sleep(2000); // } catch (InterruptedException e) { // } // //相当于data.getResult (),取得call()方法的返回值 // // 如果此时call()方法没有执行完成,则依然会等待 // System.out.println("数据 = " + future.get()); //2. ExecutorService executor = Executors.newFixedThreadPool(1); //执行FutureTask,相当于上例中的 client.request("a") 发送请求 // 在这里开启线程进行RealData的call()执行 Future<String> future = executor.submit(new RealData("a")); System.out.println("请求完毕"); try { //这里依然可以做额外的数据操作,这里使用sleep代替其他业务逻辑的处理 Thread.sleep(2000); } catch (InterruptedException e) { } //相当于data.getResult (),取得call()方法的返回值 // 如果此时call()方法没有执行完成,则依然会等待 System.out.println("数据 = " + future.get()); } }
生产者消费者模式
生产者-消费者模式是一个经典的多线程设计模式。它为多线程间的协作提供了良好的解决方案。 在生产者-消费者模式中,通常由两类线程,即若干个生产者线程和若干个消费者线程。生产者线 程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。生产者和消费者之间则通 过共享内存缓冲区进行通信。
/** * description: * * @author: dawn.he QQ: 905845006 * @email: [email protected] * @email: [email protected] * @date: 2019/10/4 10:32 PM */ public final class PCData { private final int intData; public PCData(int intData) { this.intData = intData; } public PCData(String d) { this.intData = Integer.valueOf(d); } public int getIntData() { return intData; } @Override public String toString() { return "data:" + intData; } }
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * description: * * @author: dawn.he QQ: 905845006 * @email: [email protected] * @email: [email protected] * @date: 2019/10/3 10:05 PM */ public class Producer implements Runnable{ private volatile boolean isRunning = true; private BlockingQueue<PCData> queue; private static AtomicInteger count = new AtomicInteger(); private static final int SLEEPTIME = 1000; public Producer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { PCData data = null; Random r = new Random(); System.out.println("start producer id=" + Thread.currentThread().getId()); try { while (isRunning) { Thread.sleep(r.nextInt(SLEEPTIME)); data = new PCData(count.incrementAndGet()); System.out.println(data + " is put into queue"); if (!queue.offer(data,2, TimeUnit.SECONDS)) { System.out.println("failed to put data: " + data); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } public void stop() { isRunning = false; } }
import java.text.MessageFormat; import java.util.Random; import java.util.concurrent.BlockingQueue; /** * description: * * @author: dawn.he QQ: 905845006 * @email: [email protected] * @email: [email protected] * @date: 2019/10/3 10:05 PM */ public class Consumer implements Runnable{ private BlockingQueue<PCData> queue; private static final int SLEEPTIME = 1000; public Consumer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { System.out.println("start Consumer id=" + Thread.currentThread().getId()); Random r = new Random(); try { while (true) { PCData data = queue.take(); if (null != data) { int re = data.getIntData() * data.getIntData(); System.out.println(MessageFormat.format("{0}*{1}={2}",data.getIntData(),data.getIntData(),re)); Thread.sleep(r.nextInt(SLEEPTIME)); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } }
/** * description: * * @author: dawn.he QQ: 905845006 * @email: [email protected] * @email: [email protected] * @date: 2019/10/4 10:19 PM */ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; public class Storage { // 仓库存储的载体 private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<>(10); public void produce() { try{ list.put(new Object()); System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size()); } catch (InterruptedException e){ e.printStackTrace(); } } public void consume() { try{ list.take(); System.out.println("【消费者" + Thread.currentThread().getName() + "】消费了一个产品,现库存" + list.size()); } catch (InterruptedException e){ e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { // Storage storage = new Storage(); // storage.produce(); // storage.consume(); BlockingQueue<PCData> queue = new LinkedBlockingQueue<>(10); Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer1 = new Consumer(queue); Consumer consumer2 = new Consumer(queue); Consumer consumer3 = new Consumer(queue); ExecutorService service = Executors.newCachedThreadPool(); service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer1); service.execute(consumer2); service.execute(consumer3); Thread.sleep(10 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(3000); service.shutdown(); } }
从BlockingQueue到无锁Disruptor的性能提升:https://blog.csdn.net/weixin_33704591/article/details/92407717