我有一组任务,它们分为enquedeque两个部分,以确保首先运行合格的(多个),然后再依次运行其他合格的(例如优先级队列)。

enque中,将检查任务,仅qualified任务将被执行,而其他unqualified被阻止。

deque中,已完成的任务将从queue中删除​​,然后在notifyAll中被阻止(实际上是所有其他线程来选择合格的任务)。

这是我要实现的简化演示:

class MyTaskQueue {
    private static final Object THE_QUEUE_LOCK = new Object();
    public static Map<String, ReentrantLock> taskGroupLock = new HashMap<>();
    public static Map<String, Condition> taskGroupCondition = new HashMap<>();

    public static void enque(String name, String taskId) {
        synchronized (THE_QUEUE_LOCK) {
            taskGroupLock.putIfAbsent(name, new ReentrantLock());
            taskGroupCondition.putIfAbsent(name, taskGroupLock.get(name).newCondition());
        }
        synchronized (taskGroupLock.get(name)) {
            while (true) {
                if (isValid(taskId)) {
                    break; // Go!!;
                } else {
                    try {
                        taskGroupCondition.get(name).wait(); // blocked if it's not allowed;
                    } catch (InterruptedException ignored) {
                        ignored.printStackTrace();
                    }
                }
            }
        }
    }

    public static void deque(String name, String taskId) {
        if (taskGroup.containsKey(name) && taskGroup.get(name).contains(taskId)) {
            synchronized (THE_QUEUE_LOCK) {
                taskGroup.get(name).remove(taskId);
                if (taskGroup.get(name).isEmpty()) {
                    taskGroup.remove(name);
                }
                synchronized (taskGroupLock.get(name)) {
                    taskGroupCondition.get(name).notifyAll();
                }
            }
        }
    }

}


目前,尽管我检查了所有其他任务(至少其中大多数任务)是否均已正确阻止,但仅将执行第一个任务。

但是当我检查taskGroupCondition.get(name)时,firstWaiterlastWaiter都是null

我在这里错过了什么?

任何帮助将不胜感激。

最佳答案

如果我理解正确,那么您会问以下问题:


并行运行的线程从您的MyTaskQueue权限请求开始运行(通过enque方法)。
enqueMyTaskQueue方法将阻塞,直到请求运行的任务合格为止。
每个合格的线程通过调用MyTaskQueue方法向deque声明其已结束运行。
deque方法通知所有其他任务,以检查其中哪些任务合格,然后又开始运行。


然后,我可以看到以下解决方案:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

public class MyTaskQueue {

    private final Map<String, Set<String>> runningTasks;
    private String qualifiedTaskId;

    public MyTaskQueue(String initialQualifiedTaskId) {
        runningTasks = new HashMap<>();
        qualifiedTaskId = initialQualifiedTaskId;
    }

    private synchronized boolean isValid(String taskId) {
        return qualifiedTaskId != null && taskId != null && taskId.equals(qualifiedTaskId); //Do your qualification tests here...
    }

    public synchronized void setQualifiedTaskId(String qualifiedTaskId) {
        this.qualifiedTaskId = qualifiedTaskId;
        notifyAll(); //Now that the qualification test changed, is time to notify every blocked task.
        //This way, all new qualified tasks will also be started. This "notifyAll()" operation is optional.
    }

    public synchronized void enque(String task, String taskId) {
        while (!isValid(taskId)) { //Reentrant lock.
            System.out.println("Blocking unqualified task {\"" + task + "\", \"" + taskId + "\"}...");
            try { wait(); } catch (InterruptedException ie) { /*Handle the exception...*/ }
        }
        runningTasks.putIfAbsent(task, new HashSet<>());
        runningTasks.get(task).add(taskId);
        System.out.println("Starting qualified task {\"" + task + "\", \"" + taskId + "\"}...");
    }

    //Optional method. Might be needed for example if a Thread
    //wants to check if another task is currently running...
    public synchronized boolean isRunning(String task, String taskId) {
        return runningTasks.containsKey(task) && runningTasks.get(task).contains(taskId);
    }

    public synchronized void deque(String task, String taskId) {
        if (isRunning(task, taskId)) { //Reentrant lock.

            //Cleanup:
            runningTasks.get(task).remove(taskId);
            if (runningTasks.get(task).isEmpty())
                runningTasks.remove(task);

            //Notify all blocked tasks:
            notifyAll();
        }
    }

    public static void main(final String[] args) {
        MyTaskQueue q = new MyTaskQueue("qualified");
        Random rand = new Random();
        new MyThread(q, "Task1", "qualified222", 2500 + rand.nextInt(500)).start();
        new MyThread(q, "Task2", "qualified222", 2500 + rand.nextInt(500)).start();
        new MyThread(q, "Task3", "qualified", 2500 + rand.nextInt(500)).start();
        new MyThread(q, "Task4", "qualified", 2500 + rand.nextInt(500)).start();
        new MyThread(q, "Task5", "foreverBlocked", 2500 + rand.nextInt(500)).start();
        try { Thread.sleep(3000); } catch (InterruptedException ie) { /*Handle the exception...*/ }
        synchronized (q) {
            System.out.println("Qualifying tasks of id \"qualified222\"...");
            q.setQualifiedTaskId("qualified222"); //Reentrant lock.
        }
        //Execution of main method never ends, because of the forever blocked task "Task5".
        //The "Task5" still runs while waiting for permission... See MyThread for details...
    }
}


并跟随MyThread

public class MyThread extends Thread {
    private final String task, taskId;
    private final int actionTime; //Dummy uptime to simulate.
    private final MyTaskQueue q;

    public MyThread(MyTaskQueue q, String task, String taskId, int actionTime) {
        this.q = q;
        this.task = task;
        this.taskId = taskId;
        this.actionTime = actionTime;
    }

    @Override
    public void run() {
        q.enque(task, taskId); //Wait for permission to run...
        System.out.println("Task {\"" + task + "\", \"" + taskId + "\"} is currently running...");

        //Now lets actually execute the task of the Thread:
        try { Thread.sleep(actionTime); } catch (InterruptedException ie) { /*Handle the exception.*/ }

        q.deque(task, taskId); //Declare Thread ended.
    }
}


MyThread是执行所需实际操作的类。

为简单起见,我假设任务的ID等于变量(即qualifiedTaskId)是合格的。

还有一个main方法来测试代码。

跟随样本输出(我为行编号):


正在阻止不合格的任务{“ Task1”,“ qualified222”}…
正在阻止不合格的任务{“ Task5”,“已阻止”}…
正在启动合格的任务{“ Task4”,“ qualified”}…
任务{“ Task4”,“ qualified”}当前正在运行...
正在启动合格的任务{“ Task3”,“ qualified”}…
任务{“ Task3”,“ qualified”}当前正在运行...
正在阻止不合格的任务{“ Task2”,“ qualified222”}…
正在阻止不合格的任务{“ Task2”,“ qualified222”}…
正在阻止不合格的任务{“ Task5”,“已阻止”}…
正在阻止不合格的任务{“ Task1”,“ qualified222”}…
正在阻止不合格的任务{“ Task1”,“ qualified222”}…
正在阻止不合格的任务{“ Task5”,“已阻止”}…
正在阻止不合格的任务{“ Task2”,“ qualified222”}…
ID为“ qualified222”的限定任务...
正在启动合格的任务{“ Task2”,“ qualified222”}…
任务{“ Task2”,“ qualified222”}当前正在运行...
正在阻止不合格的任务{“ Task5”,“已阻止”}…
正在启动合格的任务{“ Task1”,“ qualified222”}…
任务{“ Task1”,“ qualified222”}当前正在运行...
正在阻止不合格的任务{“ Task5”,“已阻止”}…
正在阻止不合格的任务{“ Task5”,“已阻止”}…


如您所见,第1至7行是每个线程的初始消息。
然后,调用8至10行是因为合格任务结束(因此将其重新阻塞)。
然后,调用第11至13行,因为另一个合格的任务结束了(因此将它们重新阻塞)。
然后,在第14至19行中,资格测试发生变化,新的合格任务开始运行。还有另一个任务(Task5)尚不合格。
最后,由于任务ID等于"qualified222" end的合格任务而调用第20至21行。

07-26 03:09