I am working on an application that sends data to ZeroMQ. Here is what it does:

  • I have a class SendToZeroMQ that sends data to ZeroMQ.
  • The same class also adds that data to retryQueue so that it can be retried later if no acknowledgment is received. retryQueue is a Guava cache with a maximumSize limit.
  • A separate thread receives acknowledgements from ZeroMQ for the data that was sent earlier. If an acknowledgement is received, the record is removed from retryQueue so that it is not retried again; if not, SendToZeroMQ resends that same piece of data.


The idea is simple, but I have to make sure my retry policy works correctly so that I don't lose data. Missing acknowledgements are very rare, but the policy has to handle the case when they don't arrive.

I am thinking of building two types of retry policy, but I can't work out how to build them into my program:

  • RetryNTimes: retry N times with a fixed sleep between retries, then drop the record.
  • ExponentialBackoffRetry: keep retrying with exponentially increasing delays, up to a configurable maximum number of retries, then drop the record.
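As a minimal sketch, assuming each policy only needs the number of retries already made, both policies can be expressed as a function that returns the delay before the next retry, or -1 to drop the record. The `RetryDelayPolicy` interface, the `nextDelayMs` method, and the `maxSleepMs` cap on the backoff are hypothetical names and choices for illustration, not an existing API.

```java
/**
 * Hypothetical policy shape: given how many retries have already been made,
 * return how long to wait before the next one, or -1 to give up and drop the record.
 */
interface RetryDelayPolicy {
  long nextDelayMs(int retryCount);
}

/** RetryNTimes: the same sleep before every retry, and at most maxRetries retries. */
final class RetryNTimes implements RetryDelayPolicy {
  private final int maxRetries;
  private final long sleepMs;

  RetryNTimes(int maxRetries, long sleepMs) {
    this.maxRetries = maxRetries;
    this.sleepMs = sleepMs;
  }

  @Override
  public long nextDelayMs(int retryCount) {
    return retryCount < maxRetries ? sleepMs : -1;
  }
}

/** ExponentialBackoffRetry: baseSleepMs * 2^retryCount, capped at maxSleepMs, at most maxRetries retries. */
final class ExponentialBackoffRetry implements RetryDelayPolicy {
  private final long baseSleepMs;
  private final long maxSleepMs;
  private final int maxRetries;

  ExponentialBackoffRetry(long baseSleepMs, long maxSleepMs, int maxRetries) {
    this.baseSleepMs = baseSleepMs;
    this.maxSleepMs = maxSleepMs;
    this.maxRetries = maxRetries;
  }

  @Override
  public long nextDelayMs(int retryCount) {
    if (retryCount >= maxRetries) {
      return -1; // retry limit reached: drop the record
    }
    // Cap the shift so the doubling cannot overflow a long for realistic base values.
    long delay = baseSleepMs << Math.min(retryCount, 30);
    return Math.min(delay, maxSleepMs);
  }
}
```

For example, `new ExponentialBackoffRetry(500, 3000, 5)` waits 500, 1000, 2000, 3000 and 3000 ms before retries 1 to 5 and then gives up. Production backoff policies often also add random jitter so that records that failed together are not all retried at the same moment; that is left out here.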

Below is my SendToZeroMQ class. It sends data to ZeroMQ, retries unacknowledged records every 30 seconds from a background thread, and starts the ResponsePoller runnable, which runs forever:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListeners;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;

public class SendToZeroMQ {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
  private final Cache<Long, byte[]> retryQueue =
      CacheBuilder.newBuilder()
          .maximumSize(MAX_RETRY_QUEUE_SIZE) // placeholder name for your size limit
          .removalListener(RemovalListeners.asynchronous(new CustomListener(), executorService))
          .build();

  private static class Holder {
    private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
  }

  public static SendToZeroMQ getInstance() {
    return Holder.INSTANCE;
  }

  private SendToZeroMQ() {
    executorService.submit(new ResponsePoller());
    // retry every 30 seconds for now
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        for (Entry<Long, byte[]> entry : retryQueue.asMap().entrySet()) {
          sendTo(entry.getKey(), entry.getValue());
        }
      }
    }, 0, 30, TimeUnit.SECONDS);
  }

  public boolean sendTo(final long address, final byte[] encodedRecords) {
    Optional<ZMQSocketInfo> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      return false;
    }
    return sendTo(address, encodedRecords, liveSockets.get().getSocket());
  }

  public boolean sendTo(final long address, final byte[] encodedByteArray, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedByteArray); // put the payload into the message before sending
    boolean sent = msg.send(socket);
    // adding to retry queue
    retryQueue.put(address, encodedByteArray);
    return sent;
  }

  public void removeFromRetryQueue(final long address) {
    retryQueue.invalidate(address);
  }
}

Below is my ResponsePoller class, which polls for acknowledgements from ZeroMQ. If we get an acknowledgement back, we remove that record from the retry queue so that it is not retried; otherwise it will be retried.

import java.util.Iterator;
import java.util.Random;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;

public class ResponsePoller implements Runnable {
  private static final Random random = new Random();

  @Override
  public void run() {
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PULL);
    String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
    client.bind("tcp://" + TestUtils.getIpaddress() + ":8076");

    PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};

    while (!Thread.currentThread().isInterrupted()) {
      // Tick once per second, pulling in arriving messages
      for (int centitick = 0; centitick < 100; centitick++) {
        ZMQ.poll(items, 10);
        if (items[0].isReadable()) {
          ZMsg msg = ZMsg.recvMsg(client);
          Iterator<ZFrame> it = msg.iterator();
          while (it.hasNext()) {
            ZFrame frame = it.next();
            try {
              long address = TestUtils.getAddress(frame.getData());
              // remove from retry queue since we got the acknowledgment for this record
              SendToZeroMQ.getInstance().removeFromRetryQueue(address);
            } catch (Exception ex) {
              // log error
            } finally {
              frame.destroy();
            }
          }
          msg.destroy();
        }
      }
    }
    ctx.destroy();
  }
}

As you can see above, I send encodedRecords to ZeroMQ with the SendToZeroMQ class, and every 30 seconds each record is resent unless ResponsePoller has received an acknowledgement for it.

Each encodedRecords payload has a unique key, address, and that address is what ZeroMQ sends back as the acknowledgement.


How can I extend this example to build the two retry policies mentioned above, so that I can pick which policy to use when sending data? I came up with the interface below, but I don't understand how to move forward from there: how to implement those retry policies and use them in the code above.

public interface RetryPolicy {
  /**
   * Called when an operation has failed for some reason. This method should return
   * true to make another attempt.
   */
  public boolean allowRetry(int retryCount, long elapsedTimeMs);
}
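A rough sketch of one way to plug this interface into the existing 30-second task, assuming the retry queue stores a small per-record state object instead of a bare `byte[]`. `RetryEntry`, `RetryLoop.retryPass`, and the `BiConsumer` sender are hypothetical; in `SendToZeroMQ` the map could be `retryQueue.asMap()` (Guava's `Cache.asMap()` returns a `ConcurrentMap`), and the sender would need to be a send method that does not re-insert the record, since the current `sendTo` calls `retryQueue.put` on every send and would reset the counter.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;

/** The interface from the question. */
interface RetryPolicy {
  /** Called when an operation has failed; return true to make another attempt. */
  boolean allowRetry(int retryCount, long elapsedTimeMs);
}

/** Hypothetical value stored in the retry queue instead of a bare byte[]. */
final class RetryEntry {
  final byte[] payload;
  final long firstSentMs;
  int retryCount; // only updated by the single retry task in this sketch

  RetryEntry(byte[] payload, long firstSentMs) {
    this.payload = payload;
    this.firstSentMs = firstSentMs;
  }
}

final class RetryLoop {
  /**
   * One run of the periodic retry task: resend each unacknowledged record the policy
   * still allows, and drop the rest. Returns the number of records dropped.
   */
  static int retryPass(ConcurrentMap<Long, RetryEntry> retryQueue, RetryPolicy policy,
      BiConsumer<Long, byte[]> sender, long nowMs) {
    int dropped = 0;
    for (Map.Entry<Long, RetryEntry> e : retryQueue.entrySet()) {
      RetryEntry entry = e.getValue();
      if (policy.allowRetry(entry.retryCount, nowMs - entry.firstSentMs)) {
        entry.retryCount++;
        sender.accept(e.getKey(), entry.payload);
      } else if (retryQueue.remove(e.getKey(), entry)) {
        // Conditional remove: skipped if an acknowledgement already removed the record.
        dropped++;
      }
    }
    return dropped;
  }
}
```

Because `RetryPolicy` has a single method, a policy can also be a lambda, e.g. `(count, elapsed) -> count < 5 && elapsed < 60_000` to give up after five retries or one minute, whichever comes first.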


Can I use guava-retrying or Failsafe here instead, since these libraries already provide many retry policies?



I can't work out all the details of how to use the relevant APIs, but as for the algorithm, you could try this:

  • the retry policy needs some state attached to each message (at least the number of times the current message has been retried, and possibly the current delay). You need to decide whether the RetryPolicy keeps that state itself or whether you store it inside the message.
  • instead of allowRetry, you could have a method that calculates when the next retry should occur (as an absolute time or as a number of milliseconds in the future), as a function of the state mentioned above.
  • the retry queue should record when each message should be retried.
  • instead of using scheduleAtFixedRate, find the message in the retry queue with the lowest when_is_next_retry (for example by sorting on the absolute retry timestamp and picking the first), and let the executorService reschedule itself using schedule with time_to_next_retry.
  • for each retry, pull the message from the retry queue, send it, use the RetryPolicy to calculate when the next retry should be (if it is to be retried at all), and insert it back into the retry queue with a new when_is_next_retry. If the RetryPolicy returns -1, that could mean the message shall not be retried any more.
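The steps above can be sketched as a small scheduler that keeps records in a `PriorityQueue` ordered by absolute next-retry time, asks the policy for the next delay after each resend (negative means drop), and re-arms itself with `schedule()` instead of `scheduleAtFixedRate`. `RetryItem`, `RetryScheduler`, and the `idleMs` polling interval are hypothetical; the policy is taken as an `IntToLongFunction` (retries done so far to delay in ms) so that any delay function can be plugged in. A single `synchronized` monitor keeps the sketch simple, at the cost of sending while holding the lock.

```java
import java.util.PriorityQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.IntToLongFunction;

/** Hypothetical retry-queue item: the record plus the absolute time of its next retry. */
final class RetryItem {
  final long address;
  final byte[] payload;
  final int retryCount;
  final long nextRetryAtMs;

  RetryItem(long address, byte[] payload, int retryCount, long nextRetryAtMs) {
    this.address = address;
    this.payload = payload;
    this.retryCount = retryCount;
    this.nextRetryAtMs = nextRetryAtMs;
  }
}

final class RetryScheduler {
  // Ordered by next retry time, so peek() is always the earliest due record.
  private final PriorityQueue<RetryItem> queue =
      new PriorityQueue<>((a, b) -> Long.compare(a.nextRetryAtMs, b.nextRetryAtMs));
  private final IntToLongFunction policy; // retries done so far -> delay in ms, or -1 to drop
  private final BiConsumer<Long, byte[]> sender;

  RetryScheduler(IntToLongFunction policy, BiConsumer<Long, byte[]> sender) {
    this.policy = policy;
    this.sender = sender;
  }

  /** Called right after the first send of a record. */
  synchronized void add(long address, byte[] payload, long nowMs) {
    long delay = policy.applyAsLong(0);
    if (delay >= 0) {
      queue.add(new RetryItem(address, payload, 0, nowMs + delay));
    }
  }

  /** Called by the acknowledgement poller: the record must not be retried again. */
  synchronized void acknowledge(long address) {
    queue.removeIf(item -> item.address == address); // O(n); fine for a sketch
  }

  /** Resends every due record; returns ms until the next one is due, or -1 if none. */
  synchronized long runDue(long nowMs) {
    while (!queue.isEmpty() && queue.peek().nextRetryAtMs <= nowMs) {
      RetryItem item = queue.poll();
      sender.accept(item.address, item.payload);
      int retries = item.retryCount + 1;
      long delay = policy.applyAsLong(retries);
      if (delay >= 0) {
        queue.add(new RetryItem(item.address, item.payload, retries, nowMs + delay));
      } // negative delay: the policy gave up, so the record is dropped
    }
    return queue.isEmpty() ? -1 : Math.max(0, queue.peek().nextRetryAtMs - nowMs);
  }

  /** Replaces scheduleAtFixedRate: run due retries, then schedule() the next wake-up. */
  void runAndReschedule(ScheduledExecutorService executor, long idleMs) {
    long next = -1;
    try {
      next = runDue(System.currentTimeMillis());
    } finally {
      // Re-arm even if a send threw, so retries never silently stop.
      executor.schedule(() -> runAndReschedule(executor, idleMs),
          next < 0 ? idleMs : next, TimeUnit.MILLISECONDS);
    }
  }
}
```

One limitation of this design: a record added while the task is sleeping is only seen at the next wake-up, so its first retry can be late by up to the current wait. `java.util.concurrent.DelayQueue`, or rescheduling from `add()`, would avoid that.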


08-23 22:24