问题描述
我正在处理将数据发送到 zeromq
的应用程序。以下是我的应用程序:
I am working on my application which sends data to zeromq
. Below is what my application does:
- 我有一个类
SendToZeroMQ
将数据发送到zeromq。 - 将相同的数据添加到同一个类中的
retryQueue
因此如果没有收到确认,可以稍后重试。它使用最大尺寸限制的番石榴缓存。 - 有一个单独的线程从zeromq接收早先发送的数据的确认,如果没有收到确认,那么
SendToZeroMQ
将重试发送相同的数据。如果收到确认,我们将从retryQueue
中删除它,以便不能重新尝试。
- I have a class
SendToZeroMQ
that send data to zeromq. - Add same data to
retryQueue
in the same class so that it can be retried later on if acknowledgment is not received. It uses guava cache with maximumSize limit. - Have a separate thread which receives acknowledgement from the zeromq for the data that was sent earlier and if acknowledgement is not received, then
SendToZeroMQ
will retry sending that same piece of data. And if acknowledgement is received, then we will remove it fromretryQueue
so that it cannot be retried again.
想法非常简单,我必须确保我的重试策略正常,以免我的数据丢失。这是非常罕见的,但如果我们没有收到acknolwedgements。
Idea is very simple and I have to make sure my retry policy works fine so that I don't loose my data. This is very rare but in case if we don't receive acknolwedgements.
我正在考虑构建两种类型的 RetryPolicies
,但我无法了解如何在这里对应于我的程序:
I am thinking of building two types of RetryPolicies
but I am not able to understand how to build that here corresponding to my program:
-
RetryNTimes:
在这个过程中,每次重试之前都会重试N次特定的睡眠,之后它会删除记录。 -
ExponentialBackoffRetry:
在此,它将以指数级方式继续重试。我们可以设置一些最大重试限制,之后它不会重试并且将删除记录。
RetryNTimes:
In this it will retry N times with a particular sleep between each retry and after that, it will drop the record.ExponentialBackoffRetry:
In this it will exponentially keep retrying. We can set some max retry limit and after that it won't retry and will drop the record.
以下是我的将数据发送到zeromq的 SendToZeroMQ
类,也会从后台线程每30秒重试一次,然后启动 ResponsePoller
可永久运行的runnable:
Below is my SendToZeroMQ
class which sends data to zeromq, also retry every 30 seconds from a background thread and start ResponsePoller
runnable which keeps running forever:
public class SendToZeroMQ {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
private final Cache<Long, byte[]> retryQueue =
CacheBuilder
.newBuilder()
.maximumSize(10000000)
.concurrencyLevel(200)
.removalListener(
RemovalListeners.asynchronous(new CustomListener(), executorService)).build();
private static class Holder {
private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
}
public static SendToZeroMQ getInstance() {
return Holder.INSTANCE;
}
private SendToZeroMQ() {
executorService.submit(new ResponsePoller());
// retry every 30 seconds for now
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Entry<Long, byte[]> entry : retryQueue.asMap().entrySet()) {
sendTo(entry.getKey(), entry.getValue());
}
}
}, 0, 30, TimeUnit.SECONDS);
}
public boolean sendTo(final long address, final byte[] encodedRecords) {
Optional<ZMQSocketInfo> liveSockets = PoolManager.getInstance().getNextSocket();
if (!liveSockets.isPresent()) {
return false;
}
return sendTo(address, encodedRecords, liveSockets.get().getSocket());
}
public boolean sendTo(final long address, final byte[] encodedByteArray, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(encodedByteArray);
boolean sent = msg.send(socket);
msg.destroy();
// adding to retry queue
retryQueue.put(address, encodedByteArray);
return sent;
}
public void removeFromRetryQueue(final long address) {
retryQueue.invalidate(address);
}
}
以下是我的 ResponsePoller
类,它从zeromq中轮询所有确认。如果我们从zeromq收到一个确认,那么我们将从重试队列中删除该记录,这样它就不会被重试,否则会被重试。
Below is my ResponsePoller
class which polls all the acknowledgement from the zeromq. And if we get an acknowledgement back from the zeromq then we will remove that record from the retry queue so that it doesn't get retried otherwise it will get retried.
public class ResponsePoller implements Runnable {
private static final Random random = new Random();
@Override
public void run() {
ZContext ctx = new ZContext();
Socket client = ctx.createSocket(ZMQ.PULL);
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.bind("tcp://" + TestUtils.getIpaddress() + ":8076");
PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};
while (!Thread.currentThread().isInterrupted()) {
// Tick once per second, pulling in arriving messages
for (int centitick = 0; centitick < 100; centitick++) {
ZMQ.poll(items, 10);
if (items[0].isReadable()) {
ZMsg msg = ZMsg.recvMsg(client);
Iterator<ZFrame> it = msg.iterator();
while (it.hasNext()) {
ZFrame frame = it.next();
try {
long address = TestUtils.getAddress(frame.getData());
// remove from retry queue since we got the acknowledgment for this record
SendToZeroMQ.getInstance().removeFromRetryQueue(address);
} catch (Exception ex) {
// log error
} finally {
frame.destroy();
}
}
msg.destroy();
}
}
}
ctx.destroy();
}
}
问题:
如上所示,我正在向zeromq发送 encodedRecords
SendToZeroMQ
类,然后每30秒重试一次,具体取决于我们是否从 ResponsePoller
class or not。
As you can see above, I am sending encodedRecords
to zeromq using SendToZeroMQ
class and then it gets retried every 30 seconds depending on whether we got an acknolwedgement back from ResponsePoller
class or not.
对于每个 encodedRecords
,有一个唯一的键叫地址
,这就是我们将从zeromq中获得的一个确认。
For each encodedRecords
there is a unique key called address
and that's what we will get back from zeromq as an acknowledgement.
我如何继续扩展这个例子来构建两个重试策略我上面提到的,然后我可以在发送数据时选择我要使用的重试策略。我想到了下面的界面,但是我不明白我应该如何推进实现这些重试策略,并在上面的代码中使用它。
How can I go ahead and extend this example to build two retry policies that I mentioned above and then I can pick what retry policy I want to use while sending data. I came up with below interface but then I am not able understand how should I move forward to implement those retry policies and use it in my above code.
public interface RetryPolicy {
/**
* Called when an operation has failed for some reason. This method should return
* true to make another attempt.
*/
public boolean allowRetry(int retryCount, long elapsedTimeMs);
}
我可以使用或这里因为这些库已经有很多重试策略,我可以使用?
Can I use guava-retrying or failsafe here becuase these libraries already have many retry policies which I can use?
推荐答案
我无法解决所有关于如何使用相关的API,但是对于算法,您可以尝试:
I am not able to work out all the details regarding how to use the relevant API-s, but as for algorithm, you could try:
- 重试策略需要有某种状态附加到每个消息(至少当前消息已经重试的次数,当前延迟是什么)。您需要确定RetryPolicy是否应该保留本身,或者是要将其存储在消息中。
- 而不是allowRetry,您可以使用一种方法来计算下一次重试应该发生的时间绝对时间或将来的毫秒数),这将是上述状态的函数
- 重试队列应包含有关何时应重试每个消息的信息。 / li>
- 而不是使用
scheduleAtFixedRate
,在重试队列中找到最低的when_is_next_retry
(可能通过绝对重试时间戳排序并选择第一个),并且让executorService使用schedule
和time_to_next_retry
- 对于每次重试,将其从重试队列中拉出,发送消息,使用RetryPolicy计算下一次重试应该是(如果要重试)并用新值插入重试队列对于
when_is_next_retry
(如果RetryPolicy返回-1,则可能意味着不再重试该消息)
- the retry-policy needs to have some sort of state attached to each message (atleast the number of times the current message has been retried, possible what the current delay is). You need to decide whether the RetryPolicy should keep that itself or if you want to store it inside the message.
- instead of allowRetry, you could have a method calculating when the next retry should occur (in absolute time or as a number of milliseconds in the future), which will be a function of the state mentioned above
- the retry queue should contain information on when each message should be retried.
- instead of using
scheduleAtFixedRate
, find the message in the retry queue which has the lowestwhen_is_next_retry
(possibly by sorting on absolute retry-timestamp and picking the first), and let the executorService reschedule itself usingschedule
and thetime_to_next_retry
- for each retry, pull it from the retry queue, send the message, use the RetryPolicy for calculating when the next retry should be (if it is to be retried) and insert back into the retry queue with a new value for
when_is_next_retry
(if the RetryPolicy returns -1, it could mean that the message shall not be retried any more)
这篇关于如何在将数据发送到另一个应用程序时执行重试策略?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!