Semaphore(计数信号量):允许 n 个任务同时访问同一资源;而普通的锁(Lock 或 synchronized)在任何时刻都只允许一个任务访问一项资源。

信号量的使用场景有:对象池、任务限流等。

下面有两个简单模拟:

1.任务限流:

/**
 * Task-throttling demo: 20 tasks are submitted to a cached thread pool, but a
 * {@link Semaphore} with 5 permits lets at most 5 of them run their guarded
 * section at the same time.
 */
public class TestSemaphore {
    public static void main(String[] args) {
        // Thread pool that runs the competing tasks.
        ExecutorService exec = Executors.newCachedThreadPool();

        // Five permits: at most five tasks hold one at any moment.
        final Semaphore semp = new Semaphore(5);

        for (int i = 0; i < 20; i++) {
            final int NO = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // Acquire outside the guarded try: if acquire() is interrupted
                    // no permit was taken, so nothing must be released.
                    try {
                        semp.acquire();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                        return;
                    }
                    try {
                        System.out.println("获取任务许可 :" + NO);
                        // Simulate work of random length (up to 10 seconds).
                        Thread.sleep((long) (Math.random() * 10000));
                    } catch (InterruptedException e) {
                        // Restore the flag so the executor can observe the interrupt.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                        return;
                    } finally {
                        // Always hand the permit back, even when the work was interrupted.
                        semp.release();
                    }
                    System.out.println("当前可用的信号量:" + semp.availablePermits());
                }
            };
            exec.execute(runnable);
        }
        // Stop accepting new tasks; already-submitted tasks still run to completion.
        exec.shutdown();
    }
}

2.对象池

对象池创建

public class Pool<T> {
    private int size;
    private List<T> items = new ArrayList<>();
    private volatile boolean[] checkedOut;
    private Semaphore available;

    public Pool(Class<T> classObject, int size) {
        this.size = size;
        checkedOut = new boolean[size];
        available = new Semaphore(size, true);
        for (int i = 0; i < size; i++) {
            try {
                items.add(classObject.newInstance());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public T checkOut() throws InterruptedException {
        available.acquire();
        return getItem();
    }

    public void checkIn(T x) {
        if (releaseItem(x)) {
            available.release();
        }
    }

    private synchronized T getItem() {
        for (int i = 0; i < size; i++) {
            if (!checkedOut[i]) {
                checkedOut[i] = true;
                return items.get(i);
            }
        }
        return null;
    }

    private synchronized boolean releaseItem(T item) {
        int index = items.indexOf(item);
        if (index == -1) return false;
        if (checkedOut[index]) {
            checkedOut[index] = false;
            return true;
        }
        return false;
    }
}

对象创建

/**
 * An object that is deliberately expensive to construct, used as the pooled
 * type in the object-pool demo. Each instance gets a sequential id.
 */
public class Fat {
    // volatile so that writes to d are visible across threads
    private volatile double d;
    // Not thread-safe: ids are only unique when instances are created from one thread.
    private static int counter = 0;
    private final int id = counter++;

    /** Performs a long floating-point summation to make construction costly. */
    public Fat() {
        // Start at 1: dividing by 0 would turn d into Infinity on the first pass.
        for (int i = 1; i <= 10000; i++) {
            d += (Math.PI + Math.E) / (double) i;
        }
    }

    /** Prints this object's string form to standard output. */
    public void operation() {
        System.out.println(this);
    }

    @Override
    public String toString() {
        return "Fat id:" + id;
    }
}

创建一个任务定时迁入迁出对象

/**
 * A task that checks one object out of a {@link Pool}, holds it for one
 * second, and checks it back in.
 *
 * @param <T> type of the pooled objects
 */
public class CheckOutTask<T> implements Runnable {
    // Not thread-safe: ids are only unique when tasks are created from one thread.
    private static int counter = 0;
    private final int id = counter++;
    private final Pool<T> pool;

    /**
     * @param pool the pool this task borrows from
     */
    public CheckOutTask(Pool<T> pool) {
        this.pool = pool;
    }

    @Override
    public void run() {
        T item;
        try {
            item = pool.checkOut();
        } catch (InterruptedException e) {
            // Nothing was checked out, so there is nothing to return.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return;
        }
        try {
            System.out.println(this + "checked out " + item);
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the flag so the executor can observe the interrupt.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Always return the item, otherwise an interrupted task would
            // permanently remove one object from the pool.
            System.out.println(this + "checked in " + item);
            pool.checkIn(item);
        }
    }

    @Override
    public String toString() {
        return "CheckOutTask{" +
                "id=" + id +
                '}';
    }
}

迁入迁出测试

/**
 * Driver for the object-pool demo: worker tasks and the main thread compete
 * for the pooled {@code Fat} objects, then one extra check-out is left blocked
 * and cancelled to show that waiting on the pool is interruptible.
 */
public class SemaphoreDemo {
    final static int SIZE = 25;

    public static void main(String[] args) throws Exception {
        // 1. Build a pool holding SIZE objects.
        final Pool<Fat> pool = new Pool<>(Fat.class, SIZE);

        // 2. Start SIZE tasks; each checks an object out, holds it for one
        //    second, and checks it back in.
        ExecutorService exec = Executors.newCachedThreadPool();
        int launched = 0;
        while (launched < SIZE) {
            CheckOutTask<Fat> task = new CheckOutTask<Fat>(pool);
            exec.execute(task);
            launched++;
        }

        // 3. The main thread now checks out every object itself and keeps them.
        System.out.println("All Checkout Tasks Created");
        List<Fat> held = checkOutAll(pool);

        // 4. With the pool empty, one more check-out has to block.
        Future<?> blocked = exec.submit(blockingCheckOut(pool));

        // 5. Give it two seconds, then interrupt the blocked check-out.
        TimeUnit.SECONDS.sleep(2);
        blocked.cancel(true);

        System.out.println("Checking in objects in" + held);
        checkInAll(pool, held);

        // 6. A second check-in of the same objects is ignored by the pool.
        checkInAll(pool, held);
        exec.shutdown();
    }

    /** Checks out SIZE objects on the calling thread, printing each, and returns them in order. */
    private static List<Fat> checkOutAll(Pool<Fat> pool) throws InterruptedException {
        List<Fat> taken = new ArrayList<>();
        int n = 0;
        while (n < SIZE) {
            Fat fat = pool.checkOut();
            System.out.println(n + ": main() thread checked out ");
            fat.operation();
            taken.add(fat);
            n++;
        }
        return taken;
    }

    /** Builds a task that performs a single check-out and reports if it gets interrupted. */
    private static Runnable blockingCheckOut(final Pool<Fat> pool) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    pool.checkOut();
                } catch (InterruptedException e) {
                    System.out.println("checkOut Interrupted");
                    e.printStackTrace();
                }
            }
        };
    }

    /** Checks every object in the list back into the pool, in list order. */
    private static void checkInAll(Pool<Fat> pool, List<Fat> objects) {
        for (int k = 0; k < objects.size(); k++) {
            pool.checkIn(objects.get(k));
        }
    }
}
12-04 14:07