我有一个方法,它采用 Partition
枚举的参数。通过传递不同的 partition
值,该方法将在同一时间段内由多个后台线程(最多 15 个)调用。这里 dataHoldersByPartition
是 Partition
和 ConcurrentLinkedQueue<DataHolder>
的映射。
private final ImmutableMap<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition;
//... some code to populate entry in `dataHoldersByPartition`
private void validateAndSend(final Partition partition) {
ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
int totalSize = 0;
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null) {
byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
if (clientKeyBytes.length > 255)
continue;
byte[] processBytes = dataHolder.getProcessBytes();
int clientKeyLength = clientKeyBytes.length;
int processBytesLength = processBytes.length;
int additionalLength = clientKeyLength + processBytesLength;
if (totalSize + additionalLength > 50000) {
Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
// here size of `message.serialize()` byte array should always be less than 50k at all cost
sendToDatabase(message.getAddress(), message.serialize());
clientKeyBytesAndProcessBytesHolder = new HashMap<>();
totalSize = 0;
}
clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
totalSize += additionalLength;
}
// calling again with remaining values only if clientKeyBytesAndProcessBytesHolder is not empty
if(!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
Message message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
// here size of `message.serialize()` byte array should always be less than 50k at all cost
sendToDatabase(message.getAddress(), message.serialize());
}
}
下面是我的
Message
类:public final class Message {
private final byte dataCenter;
private final byte recordVersion;
private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte recordsPartition;
private final byte replicated;
public Message(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder, Partition recordPartition) {
this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder;
this.recordsPartition = (byte) recordPartition.getPartition();
this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
}
// Output of this method should always be less than 50k always
public byte[] serialize() {
int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder); // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;
ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN);
// header layout
byteBuffer.put(dataCenter).put(recordVersion).putInt(clientKeyBytesAndProcessBytesHolder.size())
.putInt(bufferCapacity).putLong(address).putLong(addressFrom).putLong(addressOrigin)
.put(recordsPartition).put(replicated);
// now the data layout
for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
byte keyType = 0;
byte[] key = entry.getKey();
byte[] value = entry.getValue();
byte keyLength = (byte) key.length;
short valueLength = (short) value.length;
ByteBuffer dataBuffer = ByteBuffer.wrap(value);
long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
byteBuffer.put(keyType).put(keyLength).put(key).putLong(timestamp).putShort(valueLength)
.put(value);
}
return byteBuffer.array();
}
private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
int size = 36;
for (Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
size += 1 + 1 + 8 + 2;
size += entry.getKey().length;
size += entry.getValue().length;
}
return size;
}
// getters and to string method here
}
基本上,我必须确保的是,无论何时调用
sendToDatabase
方法,message.serialize()
字节数组的大小都应始终小于 50k。我的 sendToDatabase
方法发送来自 serialize
方法的字节数组。由于这种情况,我正在做以下验证以及其他一些事情。在该方法中,我将迭代 dataHolders
CLQ,并从中提取 clientKeyBytes
和 processBytes
。这是我正在做的验证:clientKeyBytes
长度大于 255 那么我会跳过它并继续迭代。 totalSize
变量,该变量将是 clientKeyLength
和 processBytesLength
的总和,并且该 totalSize
长度应始终小于 50000 字节。 clientKeyBytesAndProcessBytesHolder
映射发送到 sendToDatabase
方法并清除映射,将 totalSize
重置为 0 并再次开始填充。 dataHolders
为空,那么它将发送它所拥有的任何内容。 我相信我当前的代码中存在一些错误,因为可能由于我的情况,某些记录没有被正确发送或丢弃在某个地方,我无法弄清楚这一点。看起来要正确实现这个 50k 条件,我可能必须使用
getBufferCapacity
方法在调用 sendToDatabase
方法之前正确计算大小? 最佳答案
我检查了你的代码,按照你的逻辑看起来不错。正如您所说,它将始终存储小于 50K 的信息,但它实际上会将信息存储到 50K。要使其小于 50K,您必须将 if 条件更改为 if (totalSize + additionalLength >= 50000)
。
如果您的代码仍然不能满足您的要求,即在 totalSize + additionalLength
大于 50k 时存储信息,我可以建议您很少考虑。
由于超过 50 个线程调用此方法,您需要考虑代码中的两个部分进行同步。
一个是全局变量,它是一个容器 dataHoldersByPartition
对象。如果在此容器对象中发生多个并发和并行搜索,结果可能并不完美。只需检查容器类型是否同步。如果没有像下面这样制作这个块:-
synchronized(this){
ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
}
现在,我只能给出两个建议来解决这个问题。一种是代替
if (totalSize + additionalLength > 50000)
,您可以检查对象 clientKeyBytesAndProcessBytesHolder
if(sizeof(clientKeyBytesAndProcessBytesHolder) >= 50000)
的大小(检查 java 中 sizeof 的适当方法)。其次是缩小区域以检查它是否是多线程的副作用。所有这些建议都是为了找出问题所在的区域,并且只能从您的角度进行修复。首先检查您的方法
validateAndSend
是否完全满足您的要求。为此,首先同步整个 validateAndSend
方法并检查一切是否正常或仍然具有相同的结果。如果仍然有相同的结果,则意味着这不是因为多线程,而是您的编码不符合要求。如果它工作正常,则意味着它是多线程问题。如果方法同步正在解决您的问题但降低了性能,您只需从中删除同步并集中可能导致问题的代码的每个小块并使其同步块(synchronized block)并在仍未解决您的问题时删除。像那样,最后你找到了实际创建问题的代码块,并将其保留为同步以最终修复它。例如第一次尝试:-
`private synchronize void validateAndSend`
第二次尝试:从方法中删除同步关键字并执行以下步骤:-
synchronize(this){
Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
sendToDatabase(message.getAddress(), message.serialize());
}
如果您认为我没有正确理解您,请告诉我。
关于java - 以大小受限的块将数据发送到数据库,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46899081/