我有一堆要通过将它们包装在一个字节数组中来发送到消息队列的键和值。我将对所有键和值进行一个字节数组(应始终小于50K),然后将其发送到我们的消息传递队列。我有标题,然后是数据。

包类:

public final class Packet implements Closeable {
  private static final int MAX_SIZE = 50000;
  private static final int HEADER_SIZE = 36;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private int pendingItems = 0;

  public Packet(final RecordPartition recordPartition) {
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.LOCATION.getDatacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
        .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
        .put(replicated);
  }

  private void sendData() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    // below line throws "BufferOverflowException"
    buffer.put(itemBuffer);
    SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    itemBuffer.clear();
    pendingItems = 0;
  }

  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > 255) {
      return;
    }
    final byte keyLength = (byte) key.length;
    final byte dataLength = (byte) data.length;

    final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
    final int newSize = itemBuffer.position() + additionalSize;
    if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
      sendData();
    }
    if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }

    final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
    // data layout
    itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
        .put(data);
    pendingItems++;
  }

  @Override
  public void close() {
    if (pendingItems > 0) {
      sendData();
    }
  }
}


在上面的代码中,我在java.nio.BufferOverflowException方法的buffer.put(itemBuffer);处获得sendData。我无法理解为什么会收到此异常以及如何解决它。

这是我如何调用此代码:

Packet packet = new Packet(partition);
packet.addAndSendJunked("hello".getBytes(StandardCharsets.UTF_8), StringUtils.EMPTY.getBytes(StandardCharsets.UTF_8));
packet.close();

最佳答案

显然itemBuffer加上已经放入buffer的标头的长度超过了buffer.capacity(),即MAX_SIZE。

认为addHeader()中有错误或创建itemBuffer的原因,或者MAX_SIZE太小。

NB

SendRecord.getInstance().sendToQueueAsync(address, buffer.array());


即使没有将MAX_SIZE个字节放入其中,您也要在其中排队整个buffer个MAX_SIZE个字节。最好将buffer本身传递给此方法,并避免代码中的不停换行。

10-08 02:24