背景:我有三个主题。 ThreadA负责将元素写入队列,如果队列已满,则通知ThreadC从队列中读取元素。 ThreadB是另一个条件,如果队列未满,但是时间超过5秒,则Thread通知threadC从队列中获取元素,最后,ThreadC通知ThreadB刷新其时间戳。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    private Lock lock = new ReentrantLock();
    private Condition conA = lock.newCondition();
    private Condition conB = lock.newCondition();
    private Condition conC = lock.newCondition();

    ArrayBlockingQueue<Integer> readQueueA = new ArrayBlockingQueue<>(3);

    public static void main(String[] args) {
        Main main1 = new Main();

        try {
            ThreadA threadWrite = main1.new ThreadA();
            Thread threadOut = new Thread(threadWrite);
            threadOut.start();

            ThreadB threadB = main1.new ThreadB();
            Thread threadBB = new Thread(threadB);
            threadBB.start();

            ThreadC threadRead = main1.new ThreadC();
            Thread threadIn = new Thread(threadRead);
            threadIn.start();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    class ThreadA implements Runnable {
        public void addElement(Integer i) {
            try {
                readQueueA.put(i);
            } catch (Exception ex ){
                ex.printStackTrace();
            }
        }

        @Override
        public void run() {
            int i = 0;
            while (i < 10) {
                lock.lock();
                try {
                    if (readQueueA.size() < 3) {
                        addElement(i++);
                    } else {
                        System.out.println("notice C: " + new Date());
                        conC.signal();
                        conA.await();
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class ThreadB implements Runnable {
        @Override
        public void run() {
            reSetStartTime();
            while (true) {
                lock.lock();
                try {
                    if (!conB.await(5, TimeUnit.SECONDS)) {
                        System.out.println("Timeout Zzzzzzz: " + new Date());
                        conC.signal();
                    } else {
                        System.out.println("RefreshB。。。。" + new Date());
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class ThreadC implements Runnable {
        public void printQueue(ArrayBlockingQueue<Integer> printQueue) {
            int len = printQueue.size();
            System.out.println("Queen Size :" + printQueue.size());
            for (int i = 0; i < len; i++) {
                System.out.print(printQueue.poll());
            }
            System.out.println();
        }

        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    System.out.println("I'm thread C " + new Date());
                    conC.await();
                    System.out.println("I'm thread C, and I wake up " + new Date());
                    if (readQueueA.size() > 0) {
                        System.out.print("OUTPUT: ");
                        printQueue(readQueueA);
                        conA.signal();
                        conB.signal();
                    } else {
                        System.out.println("No elements");
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}


输出:

"C:\Program Files\Java\jdk1.8.0_181\bin\java.exe"
notice C: Fri Sep 21 10:07:35 CST 2018
I'm thread C Fri Sep 21 10:07:35 CST 2018
TimeOut Zzzzzzz: Fri Sep 21 10:07:40 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:40 CST 2018
OUTPUT: Queen Size :3
012
I'm thread C Fri Sep 21 10:07:40 CST 2018
notice C: Fri Sep 21 10:07:40 CST 2018
RefreshB。。。。Fri Sep 21 10:07:40 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:40 CST 2018
OUTPUT: Queen Size :3
345
I'm thread C Fri Sep 21 10:07:40 CST 2018
notice C: Fri Sep 21 10:07:40 CST 2018
RefreshB。。。。Fri Sep 21 10:07:40 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:40 CST 2018
OUTPUT: Queen Size :3
678
I'm thread C Fri Sep 21 10:07:40 CST 2018
RefreshB。。。。Fri Sep 21 10:07:40 CST 2018
TimeOut Zzzzzzz: Fri Sep 21 10:07:45 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:45 CST 2018
OUTPUT: Queen Size :1
9
I'm thread C Fri Sep 21 10:07:45 CST 2018
RefreshB。。。。Fri Sep 21 10:07:45 CST 2018
TimeOut Zzzzzzz: Fri Sep 21 10:07:50 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:50 CST 2018
No elements
....


但是输出不是我所期望的,为什么一开始为什么ThreadB outPut TimeOut Zzz?我认为应该不输出该项目,因为ThreadA唤醒了ThreadC,下一步应该执行ThreadC并且Thread可以在短时间内完成运行5s。有人可以帮我解释或修复它吗?感谢你!

预期:

notice C: Fri Sep 21 10:28:31 CST 2018
I'm thread C Fri Sep 21 10:28:**31** CST 2018
I'm thread C, and I wake up Fri Sep 21 10:28:31 CST 2018
OUTPUT: Queen Size :3
012
I'm thread C Fri Sep 21 10:28:31 CST 2018
notice C: Fri Sep 21 10:28:31 CST 2018
RefreshB。。。。Fri Sep 21 10:28:31 CST 2018
...

最佳答案

这是在您发布的特定情况下发生的情况:A开始,运行while循环,直到发出C信号并等待。然后,C启动并等待,从而出现问题。在C开始等待之前发出了一个信号,因此该信号呼叫丢失了,现在A和C都在等待。所以在这一点上,控制台上是

通知C ...
我是线程C ...

现在,B启动,并等待整整5秒钟,因为没有其他线程可发出信号。这样,conB.await(5, TimeUnit.SECONDS)返回false,它打印Timeout Zzzzzzz:,然后向C发出信号。这就是为什么事情看起来混乱的原因,只有当另一个线程已经在等待时,信号才起作用!

要解决此问题,请尝试更改

conC.await();
System.out.println("I'm thread C, and I wake up " + new Date());




if (readQueueA.size() < 3) {
    conC.await();
    System.out.println("I'm thread C, and I wake up " + new Date());
}


这样,如果等待条件已经满足,C就不会等待。

关于java - 三线程通讯与同步,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/52427246/

10-08 21:58