1.先分析三个LinkedBlockingDeque<Event>类型的takeList,putList,queue
putList: 存放的是来自source生产的数据,通过调用doPut(Event event)方法,它是怎样到queue的,在每次运行doCommit的时候,会循环放到queue,事实上doCommit()放法仅仅做了putlist交给queue,
synchronized(queueLock) {
if(puts > 0 ) {
while(!putList.isEmpty()) {
if(!queue.offer(putList.removeFirst())) {
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
putList.clear();
takeList.clear();
}
takelist: 每次sink消费。都会加到takelist,一般不起什么作用,可是操作失败。rollback就起作用了
protected void doRollback() {
int takes = takeList.size();
synchronized(queueLock) {
Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
"queue to rollback takes. This should never happen, please report");
while(!takeList.isEmpty()) {
queue.addFirst(takeList.removeLast());
}
putList.clear();
}
bytesRemaining.release(putByteCounter);
putByteCounter = 0;
takeByteCounter = 0;
queueStored.release(takes);
channelCounter.setChannelSize(queue.size());
}
}
queue:存放的即将传递给sink的全部数据.
2.參数相应关系
transactionCapacity:设置takelist,putlist最大容量
capacity: 设置queue的最大容量
keep-alive: Semaphore的tryAcquire的timeout的參数,添加到takelist,putlist,queue超时时间