使用信号量的生产者

使用信号量的生产者

本文介绍了使用信号量的生产者/消费者;陷入僵局的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

根据 http://en.wikipedia.org/wiki/Producer-consumer_problem我想使用信号量来模拟P/C问题.我陷入僵局,不知道是什么问题.

According to http://en.wikipedia.org/wiki/Producer-consumer_problem I want to simulate P/C problem using semaphore. I am getting deadlock and I don't know what is problem.

public static void main(String[] args) {
        CustomBlockingQueue blockingQueue = new CustomBlockingQueue();
        new Thread(new Producer(blockingQueue)).start();
        new Thread(new Consumer(blockingQueue)).start();
    }
}

@SuppressWarnings("serial")
class CustomBlockingQueue extends LinkedList<Object> {
    private static final int MAX_SIZE = 10;

    private Semaphore mutex = new Semaphore(1);
    private Semaphore fillCount = new Semaphore(0);
    private Semaphore emptyCount = new Semaphore(MAX_SIZE);

    @Override
    public boolean offer(Object e) {
        try {
            mutex.acquire();
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        boolean result = super.offer(e);
        System.out.println("offer " + size());
        try {
            fillCount.release();
            emptyCount.acquire();
            mutex.release();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        return result;
    }

    @Override
    public Object poll() {
        try {
            mutex.acquire();
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        Object result = super.poll();
        System.out.println("poll  " + size());
        try {
            emptyCount.release();
            fillCount.acquire();
            mutex.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
}

class Producer implements Runnable {
    private CustomBlockingQueue blockingQueue;
    private Random random = new Random();

    public Producer(CustomBlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(2));
                blockingQueue.offer(new Object());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable {
    private CustomBlockingQueue blockingQueue;
    private Random random = new Random();

    public Consumer(CustomBlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(4));
                blockingQueue.poll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

使用信号量

信号量解决了丢失唤醒电话的问题.在下面的解决方案中,我们使用两个信号量fillCount和emptyCount来解决问题. fillCount是要在缓冲区中读取的项目数,emptyCount是缓冲区中可以写入项目的可用空间数.将新项目放入缓冲区后,fillCount会增加,而emptyCount会减少.如果生产者在其值为零时尝试减少emptyCount,则生产者将进入睡眠状态.下次消费某物品时,emptyCount会增加,生产者将醒来.消费者的工作类似.

Semaphores solve the problem of lost wakeup calls. In the solution below we use two semaphores, fillCount and emptyCount, to solve the problem. fillCount is the number of items to be read in the buffer, and emptyCount is the number of available spaces in the buffer where items could be written. fillCount is incremented and emptyCount decremented when a new item has been put into the buffer. If the producer tries to decrement emptyCount while its value is zero, the producer is put to sleep. The next time an item is consumed, emptyCount is incremented and the producer wakes up. The consumer works analogously.

推荐答案

您的锁定顺序错误:

需要报价:

        emptyCount.acquire();
        mutex.acquire();
        doModification();
        mutex.release();
        fillCount.release();

类似的变更需要进行轮询:

similar change needed for poll:

        fillCount.acquire();
        mutex.acquire();
        doModification();
        mutex.release();
        emptyCount.release();

在您的实现中,您在等待互斥量的同时等待信号量,这会导致问题,因为另一个线程可以在等待互斥量以释放信号量.

in your implementation you are waiting for semaphores while holding the mutex which causes problems because the other thread can be waiting for the mutex in order to release a semaphore.

这篇关于使用信号量的生产者/消费者;陷入僵局的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-23 07:41