Semaphore(计数信号量):允许 n 个任务同时访问同一个资源;而普通的锁(Lock 或 synchronized)在任何时刻都只允许一个任务访问一项资源。
信号量的使用场景有:对象池、任务限流等。
下面有两个简单模拟:
1.任务限流:
/**
 * Demonstrates task throttling with a counting semaphore: 20 tasks are
 * submitted to a cached thread pool, but only 5 permits exist, so at most
 * 5 tasks can be inside the guarded section at the same time.
 */
public class TestSemaphore {

    public static void main(String[] args) {
        // Thread pool that runs the tasks.
        ExecutorService exec = Executors.newCachedThreadPool();
        // 5 permits: the upper bound on concurrently working tasks.
        final Semaphore semp = new Semaphore(5);

        for (int i = 0; i < 20; i++) {
            final int taskNo = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        // Acquire a permit; blocks while all 5 are taken.
                        semp.acquire();
                    } catch (InterruptedException e) {
                        // No permit was obtained, so there is nothing to release.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                        return;
                    }
                    try {
                        System.out.println("获取任务许可 :" + taskNo);
                        // Simulate work lasting up to 10 seconds.
                        Thread.sleep((long) (Math.random() * 10000));
                    } catch (InterruptedException e) {
                        // Restore the interrupt status for whoever owns this thread.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    } finally {
                        // Always hand the permit back, even when the work was
                        // interrupted; otherwise the permit would be lost for good.
                        semp.release();
                        System.out.println("当前可用的信号量:" + semp.availablePermits());
                    }
                }
            };
            exec.execute(runnable);
        }

        // Stop accepting new tasks; already submitted tasks still run to completion.
        exec.shutdown();
    }
}
2.对象池
对象池创建
/**
 * A fixed-size object pool guarded by a counting semaphore.
 *
 * <p>The pool creates {@code size} instances up front. {@link #checkOut()}
 * blocks until an instance is free; {@link #checkIn(Object)} returns an
 * instance to the pool. The semaphore is created in fair mode.
 *
 * @param <T> type of the pooled objects; must have a no-argument constructor
 */
public class Pool<T> {
    private final int size;
    private final List<T> items = new ArrayList<>();
    // Only read and written inside synchronized methods, so no volatile is needed.
    private final boolean[] checkedOut;
    private final Semaphore available;

    /**
     * Creates the pool and eagerly instantiates all pooled objects.
     *
     * @param classObject class of the pooled objects
     * @param size        number of objects to create
     * @throws IllegalArgumentException if {@code size} is negative or an
     *         instance of {@code classObject} cannot be created
     */
    public Pool(Class<T> classObject, int size) {
        if (size < 0) {
            throw new IllegalArgumentException("size must not be negative: " + size);
        }
        this.size = size;
        checkedOut = new boolean[size];
        available = new Semaphore(size, true);
        for (int i = 0; i < size; i++) {
            try {
                items.add(classObject.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                // Fail fast: a partially filled pool would hold more permits
                // than objects and break later inside getItem().
                throw new IllegalArgumentException(
                        "Cannot instantiate pooled type " + classObject.getName(), e);
            }
        }
    }

    /**
     * Takes an object out of the pool, waiting until one is available.
     *
     * @return a pooled object that is not checked out by anyone else
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public T checkOut() throws InterruptedException {
        available.acquire();
        return getItem();
    }

    /**
     * Returns an object to the pool. Objects that do not belong to the pool,
     * or that are not currently checked out, are ignored and release no permit.
     *
     * @param x the object to return
     */
    public void checkIn(T x) {
        if (releaseItem(x)) {
            available.release();
        }
    }

    /** Marks the first free slot as checked out and returns its object. */
    private synchronized T getItem() {
        for (int i = 0; i < size; i++) {
            if (!checkedOut[i]) {
                checkedOut[i] = true;
                return items.get(i);
            }
        }
        // Reached only if a caller gets here without a free slot being available.
        return null;
    }

    /**
     * Frees the slot that holds exactly {@code item}.
     *
     * @return {@code true} if the item was checked out and is now free
     */
    private synchronized boolean releaseItem(T item) {
        for (int i = 0; i < size; i++) {
            // Compare by identity: pooled objects may be equals() to one
            // another, and an equals()-based lookup would free the wrong slot.
            if (items.get(i) == item) {
                if (!checkedOut[i]) {
                    return false;
                }
                checkedOut[i] = false;
                return true;
            }
        }
        return false;
    }
}
对象创建
/**
 * An object that is deliberately costly to construct, used as the pooled
 * type in the object-pool example. Each instance gets a sequential id.
 */
public class Fat {
    // volatile so that writes to this field are visible across threads.
    private volatile double d;
    // Next id to hand out. counter++ is not atomic, so ids are only
    // guaranteed unique when instances are created by one thread at a time.
    private static int counter = 0;
    private final int id = counter++;

    /** Performs a long-running computation to make construction costly. */
    public Fat() {
        // Start at 1: with i == 0 the first term divides by zero and the
        // accumulated value would be Infinity.
        for (int i = 1; i < 10000; i++) {
            d += (Math.PI + Math.E) / (double) i;
        }
    }

    /** Prints this object's string form to standard output. */
    public void operation() {
        System.out.println(this);
    }

    /** @return {@code "Fat id:"} followed by this instance's id */
    @Override
    public String toString() {
        return "Fat id:" + id;
    }
}
创建一个任务:迁出对象,持有一秒后再迁入
/**
 * A task that checks one object out of a {@link Pool}, holds it for one
 * second, and then checks it back in.
 *
 * @param <T> type of the pooled objects
 */
public class CheckOutTask<T> implements Runnable {
    // Next id to hand out. counter++ is not atomic, so ids are only
    // guaranteed unique when tasks are created by one thread at a time.
    private static int counter = 0;
    private final int id = counter++;
    private Pool<T> pool;

    /**
     * @param pool the pool this task borrows from
     */
    public CheckOutTask(Pool<T> pool) {
        this.pool = pool;
    }

    @Override
    public void run() {
        try {
            T item = pool.checkOut();
            try {
                System.out.println(this + "checked out " + item);
                TimeUnit.SECONDS.sleep(1);
            } finally {
                // Return the object even if the sleep was interrupted;
                // otherwise its slot in the pool would be lost for good.
                System.out.println(this + "checked in " + item);
                pool.checkIn(item);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return "CheckOutTask{" + "id=" + id + '}';
    }
}
迁入迁出测试
public class SemaphoreDemo { final static int SIZE = 25; public static void main(String[] args) throws Exception{ //1.创建对象池 final Pool<Fat> pool = new Pool<>(Fat.class, SIZE); //2.先对象池所有对象迁出,过一分中后迁入 ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < SIZE; i++) { exec.execute(new CheckOutTask<Fat>(pool)); } //3.对象池中对象全部迁出,已迁出的不再迁出 System.out.println("All Checkout Tasks Created"); List<Fat> list = new ArrayList<>(); for (int i = 0; i < SIZE; i++) { Fat f = pool.checkOut(); System.out.println(i + ": main() thread checked out "); f.operation(); list.add(f); } //4.Future阻塞迁出 Future<?> blocked = exec.submit(new Runnable() { @Override public void run() { try { pool.checkOut(); } catch (InterruptedException e) { System.out.println("checkOut Interrupted"); e.printStackTrace(); } } }); //5.中断阻塞 TimeUnit.SECONDS.sleep(2); blocked.cancel(true); System.out.println("Checking in objects in" + list); for (Fat f: list) { pool.checkIn(f); } //6.第二次不会迁入 for (Fat f: list) { pool.checkIn(f); } exec.shutdown(); } }