我正在使用LinkedBlockingQueue
队列来实现用于TCP/IP事件传输的producer-consumer pattern
,我正在使用boolean offer(e)
,这意味着一旦队列达到其容量,新的传入事件将被忽略(删除)并返回false
。
现在,我必须将事件保留一段可配置的时间(例如2秒),因此我决定使用DelayQueue
,它可以保留元素并仅在时间到期时释放元素。
不幸的是,DelayQueue
是无界。我想知道是否有办法设置DelayQueue
或delayQueue.size() == CAPACITY
的容量始终可靠吗?
最佳答案
这是一个很大的问题,因为如果我们要对其进行继承或派发给我们,并且需要在DelayQueue中使用私有(private)锁,则需要访问DelayQueue使用的内部锁。
我们不能使用第二个锁,因为那会导致take
出现问题。您也许可以自己实现,但是这样做距离您自己的DelayQueue实现还远远超过一半,因此可能不是您想要的。
我们可以使用反射来访问锁。但是请注意,这并不是最好的主意,因为它依赖于DelayQueue的实现细节。如果您更改正在运行的JRE版本,它可能无法在所有JRE上运行,甚至可能会中断。就是说,我认为这是绝对简单的解决方案,尽管有点脏。
/**
* Bounded implementation of {@link DelayQueue}. This implementation uses
* reflection to access the internal lock in {@link DelayQueue} so might
* only work on the Oracle 1.8 JRE.
* @param <T>
*/
public class BoundedDelayQueue<T extends Delayed> extends DelayQueue<T> {
// Lock used to synchronize every operation
private final transient ReentrantLock lock;
// The limit
private final int limit;
BoundedDelayQueue(int limit) {
try {
// Grab the private lock in DelayQueue using reflection so we can use it
// to gain exclusive access outside of DelayQueue
Field lockField = DelayQueue.class.getDeclaredField("lock");
lockField.setAccessible(true);
this.lock = (ReentrantLock) lockField.get(this);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new Error("Could not access lock field", e);
}
this.limit = limit;
}
@Override
// All the various ways of adding items in DelayQueue delegate to
// offer, so we only have to override it and not the other methods
public boolean offer(final T t) {
// Lock the lock
lock.lock();
try {
// Check the size limit
if(size() == limit) {
return false;
}
// Forward to superclass
return super.offer(t);
} finally {
lock.unlock();
}
}
}
请注意,这并不能实现带有超时的
offer
,如果您需要自己做的话。