一个生产者和一个消费者
public class ConditionTest { private static ReentrantLock lock = new ReentrantLock(); private static Condition condition = lock.newCondition(); private static int data = ; private static volatile boolean noUse = false; private static void produceData(){
try{
lock.lock();
while (noUse){
condition.await();
}
data++;
System.out.println("produce data "+data);
Thread.sleep();
noUse = true;
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} private static void consumeData(){
try{
lock.lock();
while (!noUse){
condition.await();
}
Thread.sleep();
System.out.println("consume data "+data);
noUse = false;
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
} }
public static void main(String[] args) {
new Thread(() -> {
for (;;){
produceData();
}
}).start(); new Thread(() -> {
for (;;){
consumeData();
}
}).start();
}
}
多个生产者和多个消费者
public class ConditionTest2 { private static Lock lock = new ReentrantLock(); private static Condition PRODUCE_COND = lock.newCondition(); private static Condition CONSUNME_COND = lock.newCondition(); private static LinkedList<String> list = new LinkedList<String>(); private static final int MAX_CAPACITY = ; private static AtomicInteger counter = new AtomicInteger(); private static void produce(){
try{
lock.lock();
while (list.size() >= MAX_CAPACITY){
PRODUCE_COND.await();
}
String value = "data:"+counter.getAndIncrement();
System.out.println(Thread.currentThread().getName()+" produce "+value);
list.addLast(value);
CONSUNME_COND.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} private static void consume(){
try{
lock.lock();
while (list.isEmpty()){
CONSUNME_COND.await();
}
String value = list.removeFirst();
System.out.println(Thread.currentThread().getName()+" consume "+value);
PRODUCE_COND.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} public static void main(String[] args) {
for(int i=; i<; i++){
new Thread(()->{
for(;;){
produce();
try {
Thread.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"生产者"+i).start();
} for(int i=; i<; i++){
new Thread(()->{
for(;;){
consume();
try {
Thread.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消费者"+i).start();
} }
}