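In my application I perform some heavy lookup operations. These operations must run within a single thread (a limitation of the persistence framework).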

I want to cache the results. So I have a class UMRCache with an inner class Worker:

public class UMRCache {
  private Worker worker;
  private List<String> requests = Collections.synchronizedList(new ArrayList<String>());
  private Map<String, Object> cache = Collections.synchronizedMap(new HashMap<String, Object>());
  public UMRCache(Repository repository) {
    this.worker = new Worker(repository);
    this.worker.start();
  }

 public Object get(String key) {
   if (this.cache.containsKey(key)) {
    // If the element is already cached, get value from cache
     return this.cache.get(key);
   }
   synchronized (this.requests) {
     // Add request to queue
     this.requests.add(key);
     // Notify the Worker thread that there's work to do
     this.requests.notifyAll();
   }
   synchronized (this.cache) {
     // Wait until Worker has updated the cache
     this.cache.wait();
    // Now, cache should contain a value for key
     return this.cache.get(key);
   }
 }

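 private class Worker extends Thread {
   private final Repository repository;

   Worker(Repository repository) {
     this.repository = repository;
   }
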
   public void run() {
      boolean doRun = true;
      while (doRun) {
         synchronized (requests) {
            while (requests.isEmpty() && doRun) {
               requests.wait(); // Wait until there's work to do
            }
            synchronized (cache) {
               Set<String> processed = new HashSet<String>();
               for (String key : requests) {
                 // Do the lookup
                 Object result = repository.lookup(key);
                 // Save to cache
                 cache.put(key, result);
                 processed.add(key);
               }
               // Remove processed requests from queue
               requests.removeAll(processed);
               // Notify all threads waiting for their requests to be served
               cache.notifyAll();
            }
         }
      }
   }
}

}

I have a test case:
public class UMRCacheTest extends TestCase {
private UMRCache umrCache;
public void setUp() throws Exception {
    super.setUp();
    umrCache = new UMRCache(repository);
}

public void testGet() throws Exception {
    for (int i = 0; i < 10000; i++) {
        final List fetched = Collections.synchronizedList(new ArrayList());
        final String[] keys = new String[]{"key1", "key2"};
        final String[] expected = new String[]{"result1", "result2"};
        final Random random = new Random();

        Runnable run1 = new Runnable() {
            public void run() {
                for (int i = 0; i < keys.length; i++) {
                    final String key = keys[i];
                    final Object result = umrCache.get(key);
                    assertEquals(expected[i], result);
                    fetched.add(result);
                    try {
                        Thread.sleep(random.nextInt(3));
                    } catch (InterruptedException ignore) {
                    }
                }
            }
        };
        Runnable run2 = new Runnable() {
            public void run() {
                for (int i = keys.length - 1; i >= 0; i--) {
                    final String key = keys[i];
                    final Object result = umrCache.get(key);
                    assertEquals(expected[i], result);
                    fetched.add(result);
                    try {
                        Thread.sleep(random.nextInt(3));
                    } catch (InterruptedException ignore) {
                    }
                }
            }
        };

        final Thread thread1 = new Thread(run1);
        thread1.start();
        final Thread thread2 = new Thread(run2);
        thread2.start();
        final Thread thread3 = new Thread(run1);
        thread3.start();
        thread1.join();
        thread2.join();
        thread3.join();
        umrCache.dispose();
        assertEquals(6, fetched.size());
    }
}

}

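The test fails randomly, in roughly 1 run out of 10. Sometimes it fails at the last assertion, assertEquals(6, fetched.size()); sometimes at the assertEquals call inside one of the runnables; and sometimes the test runner never finishes at all.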

So something is wrong with my threading logic. Any tips?

Edit:

Thanks to everyone who helped. I think I may have cracked it now.
The solution seems to be:
 public Object get(String key) {
   if (this.cache.containsKey(key)) {
    // If the element is already cached, get value from cache
     return this.cache.get(key);
   }
   synchronized (this.requests) {
     // Add request to queue
     this.requests.add(key);
     // Notify the Worker thread that there's work to do
     this.requests.notifyAll();
   }
   synchronized (this.cache) {
     // Wait until Worker has updated the cache
     while (!this.cache.containsKey(key)) {
       this.cache.wait();
     }
    // Now, cache should contain a value for key
     return this.cache.get(key);
   }
 }
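
For anyone who wants to experiment with this fix in isolation, here is a minimal runnable sketch of the same idea. It makes several assumptions that are not from the code above: the class name LookupCache is hypothetical, a java.util.function.Function stands in for Repository.lookup(), and the worker copies the pending requests into a batch instead of holding both locks at once. Treat it as an illustration, not as the actual UMRCache.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch only: LookupCache and the Function-based lookup are stand-ins
// (assumptions) for UMRCache and Repository.lookup().
class LookupCache {
    private final List<String> requests = new ArrayList<>();
    private final Map<String, Object> cache = new HashMap<>();
    private final Thread worker;

    LookupCache(Function<String, Object> lookup) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    List<String> batch;
                    synchronized (requests) {
                        // Guarded wait: re-check the condition after every wake-up
                        while (requests.isEmpty()) {
                            requests.wait();
                        }
                        batch = new ArrayList<>(requests);
                        requests.clear();
                    }
                    for (String key : batch) {
                        // Every lookup runs on this single worker thread
                        Object result = lookup.apply(key);
                        synchronized (cache) {
                            cache.put(key, result);
                            cache.notifyAll();
                        }
                    }
                }
            } catch (InterruptedException e) {
                // dispose() interrupted the worker; let the thread end
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    Object get(String key) {
        synchronized (cache) {
            if (cache.containsKey(key)) {
                return cache.get(key);
            }
        }
        synchronized (requests) {
            requests.add(key);
            requests.notifyAll();
        }
        synchronized (cache) {
            // Wait until the worker has stored a value for this particular key
            while (!cache.containsKey(key)) {
                try {
                    cache.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for " + key, e);
                }
            }
            return cache.get(key);
        }
    }

    void dispose() {
        worker.interrupt();
    }

    public static void main(String[] args) {
        LookupCache c = new LookupCache(key -> "result-for-" + key);
        System.out.println(c.get("key1"));
        c.dispose();
    }
}
```

Note that this sketch does not handle a lookup that throws: the worker would die and callers waiting for that key would block forever.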

Best answer

The logic in get() can miss the result and then block forever:



   synchronized (this.requests) {
     // Add request to queue
     this.requests.add(key);
     // Notify the Worker thread that there's work to do
     this.requests.notifyAll();
   }

   // ----- MOMENT1.  If the Worker puts the result into the cache at this
   // moment, its notifyAll() fires before we start waiting and is lost

   synchronized (this.cache) {
     // Wait until Worker has updated the cache
     this.cache.wait();

    // ----- MOMENT2.  May be too late, since the cache notification already happened at MOMENT1

    // Now, cache should contain a value for key
     return this.cache.get(key);
   }
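
The usual way to close the gap between MOMENT1 and MOMENT2 is to test the condition itself while holding the lock, and to call wait() only if the condition is still false. A minimal sketch of that guarded wait is below; ResultBoard, publish() and await() are hypothetical names chosen for illustration and do not appear in the question's code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not from the question): a guarded wait on a shared map.
class ResultBoard {
    private final Map<String, Object> results = new HashMap<>();

    // Called by the producer (the Worker in the question).
    public synchronized void publish(String key, Object value) {
        results.put(key, value);
        notifyAll(); // wake every waiter; each one re-checks its own key
    }

    // Called by consumers. The condition is tested under the same lock that
    // publish() takes, so a result published before this call is seen at once
    // and no wait() is issued for it.
    public synchronized Object await(String key) {
        while (!results.containsKey(key)) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for " + key, e);
            }
        }
        return results.get(key);
    }
}

class ResultBoardDemo {
    public static void main(String[] args) {
        ResultBoard board = new ResultBoard();
        // Simulate MOMENT1: the result arrives before the consumer starts waiting
        board.publish("key1", "result1");
        // The loop condition is already false, so this returns without blocking
        System.out.println(board.await("key1"));
    }
}
```

The while loop also covers the case where a waiter is woken for a different key, or wakes up spuriously, since it simply checks again and goes back to waiting.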

10-05 19:54