问题描述
我有一个代码,我正在处理套接字,我需要确保我在两个线程之间共享相同的套接字。在我的下面的代码中,我有一个后台线程,每60秒运行一次并调用 updateLiveSockets()
方法。在 updateLiveSockets()
方法中,我迭代我拥有的所有套接字,然后通过调用 send
SendToQueue的方法
类和响应的基础我将它们标记为活的或死的。
I have a code in which I am dealing with sockets and I need to make sure that I don't share same socket between two threads. In my below code, I have a background thread which runs every 60 seconds and calls updateLiveSockets()
method. In the updateLiveSockets()
method, I iterate all the sockets I have and then start pinging them one by one by calling send
method of SendToQueue
class and basis on the response I mark them as live or dead.
现在所有读者线程将同时调用 getNextSocket()
方法来获取下一个可用的套接字,因此它必须是线程安全的,我需要确保所有读者线程都应该看到相同的consitent状态 SocketHolder
和套接字
。
Now all the reader threads will call getNextSocket()
method concurrently to get the next live available socket so it has to be thread safe and I need to make sure all the reader threads should see the same consitent state of SocketHolder
and Socket
.
以下是我的 SocketManager
class:
Below is my SocketManager
class:
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new ConcurrentHashMap<>();
private final ZContext ctx = new ZContext();
// ...
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 60, 60, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(List<String> paddes, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
// ....
return socketList;
}
// this method will be called by multiple threads concurrently to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// runs every 60 seconds to ping all the socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// pinging to see whether a socket is live or not
boolean status = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
}
这是我的 SendToQueue
class:
And here is my SendToQueue
class:
// this method will be called by multiple threads concurrently to send the data
public boolean sendAsync(final long address, final byte[] encodedRecords) {
Optional<SocketHolder> liveSockets = SocketManager.getInstance().getNextSocket();
PendingMessage m = new PendingMessage(address, encodedRecords, liveSockets.get().getSocket(), true);
cache.put(address, m);
return doSendAsync(m, socket);
}
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A
return msg.send(socket);
} finally {
msg.destroy();
}
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
} finally {
// Alternatively (checks that address points to m):
// cache.asMap().remove(address, m);
cache.invalidate(address);
}
}
问题陈述
现在您可以看到我在两个线程之间共享相同的套接字。似乎 getNextSocket()
可以将 0MQ套接字
返回到线程A
。同时,计时器线程
可以访问相同的 0MQ套接字
来ping它。在这种情况下,线程A
和计时器线程
正在改变相同的 0MQ套接字
,这可能会导致问题。所以我试图找到一种方法,以便我可以防止不同的线程同时将数据发送到同一个套接字并弄乱我的数据。
Now as you can see that I am sharing same socket between two threads. It seems getNextSocket()
could return a 0MQ socket
to thread A
. Concurrently, the timer thread
may access the same 0MQ socket
to ping it. In this case thread A
and the timer thread
are mutating the same 0MQ socket
, which can lead to problems. So I am trying to find a way so that I can prevent different threads from sending data to the same socket at the same time and mucking up my data.
所以我决定同步套接字,以便没有两个线程可以同时访问同一个套接字。以下是我在 updateLiveSockets
方法中所做的更改。我通过以下方法在套接字上同步:
So I decided to synchronize the socket so that no two threads can access the same socket at the same time. Below is the change I made in updateLiveSockets
method. I synchronized on the socket in below method:
// runs every 60 seconds to ping all the socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// using the socket as its own lock
synchronized (socket) {
// pinging to see whether a socket is live or not
boolean status = SendToQueue.getInstance().execute(message.getAddress(), message.getEncodedRecords(), socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
以下是我在<$ c中所做的更改$ c> doSendAsync 方法。在这里我也在发送它之前同步套接字。
And below is the change I made in doSendAsync
method. In this also I synchronized on socket before sending on it.
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A by synchronizing on it
synchronized (socket) {
return msg.send(socket);
}
} finally {
msg.destroy();
}
}
我可以确定的最佳方式是什么?我不是在两个线程之间共享相同的套接字?一般来说,我有大约60个套接字和20个线程访问这些套接字。
What is the best way by which I can make sure that I am not sharing same sockets between two threads? In general I have around 60 sockets and 20 threads accessing those sockets.
如果许多线程使用相同的套接字,则资源利用率不高。此外,如果 msg.send(套接字);
被阻止(技术上它不应该),所有等待此套接字的线程都被阻止。所以我想可能有更好的方法来确保每个线程同时使用不同的单个实时套接字而不是特定套接字上的同步。还有我错过的任何角落案例或边缘案例会导致一些错误吗?
If many threads use the same socket, resources aren't well utilized. Moreover If msg.send(socket);
is blocked (technically it shouldn't) all threads waiting for this socket are blocked. So I guess there might be a better way to ensure that every thread uses a different single live socket at the same time instead of synchronization on a particular socket. Also is there any corner case or edge case that I have missed which can lead to some bug?
推荐答案
首先你需要客户通过 Socket
通知您完成的方法。您可以添加一个允许它发出信号的方法。这是合法的,它会起作用,但你必须依靠你的客户表现得很好。或者更确切地说,使用套接字的程序员不会忘记返回它。有一种模式有助于解决这个问题:模式。而不是给出 Socket
,而是创建一个接受 Consumer< Socket>
的方法,然后执行消费者,并返回 Socket
本身。
First of all you need a way for clients to notify you that they're done using a Socket
. You could add a method that allows them to signal this. That's legit and it will work, but you'll have to rely on your clients to be well behaved. Or rather that the programmer using your socket doesn't forget to return it. There's a pattern that helps address this : the execute around pattern. Rather than giving out a Socket
, you make a method that accepts a Consumer<Socket>
, and then executes the consumer, and does the returning of the Socket
itself.
public void useSocket(Consumer<Socket> socketUser) {
Socket socket = getSocket();
try {
socketUser.accept(socket);
} finally {
returnSocket(socket);
}
}
现在让我们来看看我们将如何实施 getSocket()
和 returnSocket()
。显然,它涉及从某种集合中获取它们,并将它们返回到该集合。 队列
在这里是一个不错的选择(正如其他人也注意到的)。它允许从一侧获取它,并在另一侧返回,此外还有大量高效的线程安全实现,并且接收者和加法器通常不会相互争用。既然您事先知道套接字的数量,我会选择 ArrayBlockingQueue
。
Now let's look at how we're going to implement getSocket()
and returnSocket()
. Clearly it involves getting them from some sort of collection, and returning them back to that collection. A Queue
is a good choice here (as others have also noted). It allows getting it from one side, and returning on the other, plus there are plenty of efficient thread safe implementations, and takers and adders are typically not in contention with one another. Since you know the number of sockets beforehand, I'd opt for an ArrayBlockingQueue
.
此处另一个问题是您的实现返回可选
。如果没有可用的套接字
,我不确定你的客户会做什么,但是如果它正在等待并重试,我建议你只需要 getSocket()
阻塞队列。事实上,我会尊重你方法的这一方面,并考虑到可能没有 Socket
。对于执行周围方法,这将把它转换为 useSocket()
方法,如果没有<$ c,则返回 false
$ c>套接字可用。
An additional concern here is that your implementation returns an Optional
. I'm not sure what your clients will do if there is no available Socket
, but if it is waiting and retrying, I'd suggest you simply make getSocket()
blocking on the queue. As it is, I'll respect this aspect of your approach, and take into account that there may not have been a Socket
available. For the execute around approach, this'll translate this into the useSocket()
method returning false
if no Socket
was available.
private final BlockingQueue<Socket> queue;
public SocketPool(Set<Socket> sockets) {
queue = new ArrayBlockingQueue<>(sockets.size());
queue.addAll(sockets);
}
public boolean useSocket(Consumer<Socket> socketUser) throws InterruptedException {
Optional<Socket> maybeSocket = getSocket();
try {
maybeSocket.ifPresent(socketUser);
return maybeSocket.isPresent();
} finally {
maybeSocket.ifPresent(this::returnSocket);
}
}
private void returnSocket(Socket socket) {
queue.add(socket);
}
private Optional<Socket> getSocket() throws InterruptedException {
return Optional.ofNullable(queue.poll());
}
那就是它,那是你的 SocketPool
。
啊,但接着是吝啬的一点:检查活力。这很吝啬,因为你的活动检查实际上与你的普通客户竞争。
Ah, but then the stingy bit : the checking for liveness. It's stingy because your liveness check actually competes with your regular clients.
为了解决这个问题,我建议如下:让你的客户报告是否套接字
他们得到了或不活。由于检查活跃性归结为使用Socket,这对您的客户来说应该是直截了当的。
In order to address this, I suggest the following : let your clients report whether the Socket
they got was live or not. Since checking for liveness comes down to using the Socket, this should be straightforward for your clients.
因此,而不是 Consumer< Socket>
,我们将采用函数< Socket,Boolean>
。如果函数返回 false
,我们将认为 Socket
不再存在。在这种情况下,我们不是将其添加回常规队列,而是将其添加到死套接字的集合中,我们将有一个计划任务,间歇性地重新检查死套接字。由于这发生在一个单独的集合上,计划的检查不再与常规客户竞争。
So instead of a Consumer<Socket>
, we'll take a Function<Socket, Boolean>
. And if the function returns false
, we'll consider the Socket
to be no longer live. In that case, rather than adding it back to the regular queue, we add it to a collection of dead Sockets, and we'll have a scheduled task, that rechecks the dead sockets intermittently. As this happens on a separate collection, the scheduled checking does not compete with regular clients any more.
现在你可以创建一个 SocketManager
使用 Map
将数据中心映射到 SocketPool
实例。此映射不需要更改,因此您可以将其设置为final并在 SocketManager
的构造函数中初始化它。
Now you can make a SocketManager
with a Map
that maps data centers to SocketPool
instances. This map doesn't need to change, so you can make it final and initialize it in the SocketManager
's constructor.
这是我的初步代码 SocketPool
(未经测试):
This is my preliminary code for SocketPool
(untested) :
class SocketPool implements AutoCloseable {
private final BlockingQueue<Socket> queue;
private final Queue<Socket> deadSockets = new ConcurrentLinkedQueue<>();
private final ScheduledFuture<?> scheduledFuture;
public SocketPool(Set<Socket> sockets, ScheduledExecutorService scheduledExecutorService) {
queue = new ArrayBlockingQueue<>(sockets.size());
queue.addAll(sockets);
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this::recheckDeadSockets, 60, 60, TimeUnit.SECONDS);
}
public boolean useSocket(Function<Socket, Boolean> socketUser) throws InterruptedException {
Optional<Socket> maybeSocket = getSocket();
boolean wasLive = true;
try {
wasLive = maybeSocket.map(socketUser).orElse(false);
return wasLive && maybeSocket.isPresent();
} finally {
boolean isLive = wasLive;
maybeSocket.ifPresent(socket -> {
if (isLive) {
returnSocket(socket);
} else {
reportDead(socket);
}
});
}
}
private void reportDead(Socket socket) {
deadSockets.add(socket);
}
private void returnSocket(Socket socket) {
queue.add(socket);
}
private Optional<Socket> getSocket() throws InterruptedException {
return Optional.ofNullable(queue.poll());
}
private void recheckDeadSockets() {
for (int i = 0; i < deadSockets.size(); i++) {
Socket socket = deadSockets.poll();
if (checkAlive(socket)) {
queue.add(socket);
} else {
deadSockets.add(socket);
}
}
}
private boolean checkAlive(Socket socket) {
// do actual live check with SendSocket class, or implement directly in this one
return true;
}
@Override
public void close() throws Exception {
scheduledFuture.cancel(true);
}
}
这篇关于如何确保我不是同时在两个线程之间共享相同的套接字?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!