


I have a flow of units of work, let's call them "Work Items", that are currently processed sequentially. I'd like to speed up processing by spreading the work across multiple threads.


Constraint: The work items arrive in a specific order. During processing the order is not relevant, but once processing is finished the original order must be restored.


   |2|    <- incoming queue
  / | \
 2  1  3  <- worker threads
  \ | /
   |2|    <- outgoing queue

I would like to solve this problem in Java, preferably without Executor Services, Futures, etc., but with basic concurrency methods like wait(), notify(), etc.


The reason is: my Work Items are very small and fine-grained; each one finishes processing in about 0.2 milliseconds. So I fear that using classes from java.util.concurrent.* might introduce way too much overhead and slow my code down.


The examples I have found so far all preserve the order during processing (which is irrelevant in my case) and don't care about the order after processing (which is crucial in my case).


This is how I solved the same problem in a previous project (but with java.util.concurrent):


(1) WorkItem class does the actual work/processing:

import java.util.concurrent.Callable;

public class WorkItem implements Callable<WorkItem> {
    Object content;

    public WorkItem(Object content) {
        this.content = content;
    }

    public WorkItem call() throws Exception {
        // getContent() + do your processing
        return this;
    }
}


(2) This class puts Work Items in a queue and initiates processing:

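import java.util.concurrent.*;
public class Producer {
    private static final int THREADS_TO_USE = 4; // assumed value; tune for your machine
    private final ArrayBlockingQueue<Future<WorkItem>> workerQueue;
    private final CompletionService<WorkItem> completionService;
    private final Thread workerThread;
    public Producer() {
        workerQueue = new ArrayBlockingQueue<Future<WorkItem>>(THREADS_TO_USE);
        completionService = new ExecutorCompletionService<WorkItem>(Executors.newFixedThreadPool(THREADS_TO_USE));
        workerThread = new Thread(new Worker(workerQueue));
        workerThread.start(); // the dequeuing thread has to be running
    }
    public void send(Object o) throws Exception {
        WorkItem workItem = new WorkItem(o);
        Future<WorkItem> future = completionService.submit(workItem);
        workerQueue.put(future); // futures stay in submission order; blocks while the queue is full
    }
}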


(3) Once processing is finished the Work Items are dequeued here:

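import java.util.concurrent.*;

public class Worker implements Runnable {
    private ArrayBlockingQueue<Future<WorkItem>> workerQueue = null;

    public Worker(ArrayBlockingQueue<Future<WorkItem>> workerQueue) {
        this.workerQueue = workerQueue;
    }

    public void run() {
        try {
            while (true) {
                Future<WorkItem> fwi = workerQueue.take(); // dequeue the oldest submitted item
                fwi.get(); // wait until it has finished processing
            }
        } catch (InterruptedException | ExecutionException e) {
            Thread.currentThread().interrupt(); // stop the loop; handle or log as needed
        }
    }
}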
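
The reason step (3) restores the order is that futures enter the queue in submission order, and the dequeuing thread blocks on each one in turn, however early the later items finish. Here is a minimal self-contained sketch of that idea. OrderedPipelineDemo, process() and the doubling "work" are made-up names for illustration, and the sketch submits to a plain ExecutorService because the completion-order queue of an ExecutorCompletionService is never read in this pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OrderedPipelineDemo {

    // Processes the inputs on `threads` pool threads and returns the results
    // in submission order (hypothetical helper, not part of the classes above).
    static List<Integer> process(List<Integer> inputs, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        BlockingQueue<Future<Integer>> pending = new ArrayBlockingQueue<>(threads);
        List<Integer> results = new ArrayList<>();

        // Consumer: takes futures in FIFO order and waits for each one in turn.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < inputs.size(); i++) {
                    results.add(pending.take().get());
                }
            } catch (Exception e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producer: submit, then enqueue the future. put() blocks while the
        // queue is full, which limits how far ahead the producer can run.
        for (Integer in : inputs) {
            final int value = in;
            pending.put(pool.submit(() -> {
                Thread.sleep(value % 3); // uneven duration, so tasks finish out of order
                return value * 2;
            }));
        }

        consumer.join(); // join() makes the consumer's writes to `results` visible here
        pool.shutdown();
        return results;
    }

    public static void main(String[] args) throws Exception {
        List<Integer> inputs = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            inputs.add(i);
        }
        System.out.println(process(inputs, 4));
    }
}
```

The bounded queue also acts as back-pressure: at most `threads` unconsumed futures are outstanding at any time.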


(4) This is how you would use the stuff in your code and submit new work:

public class MainApp {
    public static void main(String[] args) throws Exception {
        Producer p = new Producer();
        for (int i = 0; i < 10000; i++) {
            p.send(i); // assumed loop body: any payload object works here
        }
    }
}
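
For the wait()/notify() variant the question asks about, the reordering step could be sketched roughly as follows. This is not the code from the project above. It assumes each Work Item gets a sequence number when it enters the system, and ReorderBuffer, put(), take() and demo() are hypothetical names. Workers hand in results in any order, and a single consumer blocks until the next sequence number is available.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReorderBuffer<T> {
    private final Map<Long, T> done = new HashMap<>(); // finished results keyed by sequence number
    private long next = 0;                              // sequence number to release next

    // Called by any worker thread once item `seq` has been processed.
    public synchronized void put(long seq, T result) {
        done.put(seq, result);
        if (seq == next) {
            notifyAll(); // wake the consumer only when the head of the order is ready
        }
    }

    // Called by the single consumer; blocks until the next item in order is available.
    public synchronized T take() throws InterruptedException {
        while (!done.containsKey(next)) {
            wait();
        }
        return done.remove(next++);
    }

    // Demo: `threads` workers square the numbers 0..items-1 in arbitrary order;
    // the consumer still receives the results in input order.
    static List<Long> demo(int items, int threads) throws InterruptedException {
        ReorderBuffer<Long> buffer = new ReorderBuffer<>();
        long[] cursor = {0}; // next unclaimed sequence number, guarded by its own monitor
        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                while (true) {
                    long seq;
                    synchronized (cursor) {
                        if (cursor[0] >= items) {
                            return; // no work left
                        }
                        seq = cursor[0]++;
                    }
                    buffer.put(seq, seq * seq); // the "processing" step
                }
            }).start();
        }
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < items; i++) {
            out.add(buffer.take());
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo(10, 3));
    }
}
```

Note that this buffer is unbounded: if one item is slow, everything finished after it piles up in the map until it completes. Whether this actually beats java.util.concurrent for 0.2 ms items is something only a measurement on your workload can tell.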


08-03 21:19