问题描述
我有大约 60 个套接字和 20 个线程,我想确保每个线程每次都在不同的套接字上工作,所以我根本不想在两个线程之间共享同一个套接字.
I have around 60 sockets and 20 threads and I want to make sure each thread works on different socket everytime so I don't want to share same socket between two threads at all.
在我的 SocketManager
类中,我有一个后台线程,它每 60 秒运行一次并调用 updateLiveSockets()
方法.在 updateLiveSockets()
方法中,我遍历我拥有的所有套接字,然后通过调用 SendToQueue
类的 send
方法开始一个一个地 ping 它们并根据响应我将它们标记为生或死.在 updateLiveSockets()
方法中,我总是需要迭代所有套接字并 ping 它们以检查它们是活的还是死的.
In my SocketManager
class, I have a background thread which runs every 60 seconds and calls updateLiveSockets()
method. In the updateLiveSockets()
method, I iterate all the sockets I have and then start pinging them one by one by calling send
method of SendToQueue
class and basis on the response I mark them as live or dead. In the updateLiveSockets()
method, I always need to iterate all the sockets and ping them to check whether they are live or dead.
现在所有的读取器线程将同时调用SocketManager
类的getNextSocket()
方法来获取下一个可用的套接字以在该套接字上发送业务消息.所以我在套接字上发送了两种类型的消息:
Now all the reader threads will call getNextSocket()
method of SocketManager
class concurrently to get the next live available socket to send the business message on that socket. So I have two types of messages which I am sending on a socket:
- 一个是套接字上的
ping
消息.这仅从调用SocketManager
类中的updateLiveSockets()
方法的定时器线程发送. - 其他是套接字上的
business
消息.这是在SendToQueue
类中完成的.
- One is
ping
message on a socket. This is only sent from timer thread callingupdateLiveSockets()
method inSocketManager
class. - Other is
business
message on a socket. This is done inSendToQueue
class.
因此,如果 pinger 线程正在 ping 一个套接字以检查它们是否处于活动状态,那么其他业务线程不应使用该套接字.同样,如果业务线程使用套接字在其上发送数据,则 pinger 线程不应 ping 该套接字.这适用于所有套接字.但是我需要确保在 updateLiveSockets
方法中,每当我的后台线程启动时,我们都会 ping 所有可用的套接字,以便我们可以确定哪个套接字是活动的或死的.
So if pinger thread is pinging a socket to check whether they are live or not then no other business thread should use that socket. Similarly if business thread is using a socket to send data on it, then pinger thread should not ping that socket. And this applies to all the socket. But I need to make sure that in updateLiveSockets
method, we are pinging all the available sockets whenever my background thread starts so that we can figure out which socket is live or dead.
下面是我的 SocketManager
类:
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new ConcurrentHashMap<>();
private final ZContext ctx = new ZContext();
// ...
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 60, 60, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(List<String> paddes, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
// ....
return socketList;
}
// this method will be called by multiple threads concurrently to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!listOfEndPoints.isEmpty()) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// runs every 60 seconds to ping all the available socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// pinging to see whether a socket is live or not
boolean isLive = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
}
这是我的 SendToQueue
类:
// this method will be called by multiple reader threads (around 20) concurrently to send the data
public boolean sendAsync(final long address, final byte[] encodedRecords) {
PendingMessage m = new PendingMessage(address, encodedRecords, true);
cache.put(address, m);
return doSendAsync(m);
}
private boolean doSendAsync(final PendingMessage pendingMessage) {
Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
if (!liveSocket.isPresent()) {
// log error
return false;
}
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A
return msg.send(liveSocket.get().getSocket());
} finally {
msg.destroy();
}
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
} finally {
cache.invalidate(address);
}
}
问题陈述
现在正如你在上面看到的,我在两个线程之间共享同一个套接字.看起来SocketManager
类中的getNextSocket()
可以返回一个0MQ socket
到Thread A
.同时,定时器线程
可以访问同一个0MQ socket
来ping它.在这种情况下,线程 A
和 定时器线程
正在改变相同的 0MQ 套接字
,这可能会导致问题.所以我试图找到一种方法来防止不同的线程同时将数据发送到同一个套接字并破坏我的数据.
Now as you can see above that I am sharing same socket between two threads. It seems getNextSocket()
in SocketManager
class could return a 0MQ socket
to Thread A
. Concurrently, the timer thread
may access the same 0MQ socket
to ping it. In this case Thread A
and the timer thread
are mutating the same 0MQ socket
, which can lead to problems. So I am trying to find a way so that I can prevent different threads from sending data to the same socket at the same time and mucking up my data.
我能想到的一种解决方案是在发送数据时在套接字上使用 synchronization
但如果多个线程使用同一个套接字,则资源没有得到很好的利用.此外,如果 msg.send(socket);
被阻塞(技术上不应该)所有等待这个套接字的线程都会被阻塞.所以我想可能有更好的方法来确保每个线程同时使用不同的单个活动套接字,而不是在特定套接字上进行同步.
One solution I can think of is using synchronization
on a socket while sending the data but if many threads uses the same socket, resources aren't well utilized. Moreover If msg.send(socket);
is blocked (technically it shouldn't) all threads waiting for this socket are blocked. So I guess there might be a better way to ensure that every thread uses a different single live socket at the same time instead of synchronization on a particular socket.
推荐答案
看来您应该考虑在此处使用 try-with-resource 功能.您让 SocketHolder 或 Option 类实现 AutoCloseable 接口.例如,让我们假设 Option 实现了这个接口.然后 Option close 方法会将实例添加回容器.我创建了一个简单的例子来说明我的意思.它并不完整,但可以让您了解如何在代码中实现这一点.
It looks like you should consider using the try-with-resource feature here. You have the SocketHolder or Option class implement the AutoCloseable interface. For instance, let us assume that Option implements this interface. The Option close method will then add back the instance to the container. I created a simple example that shows what I mean. It is not complete but it gives you an idea on how to implement this in your code.
public class ObjectManager implements AutoCloseable {
private static class ObjectManagerFactory {
private static ObjectManager objMgr = new ObjectManager();
}
private ObjectManager() {}
public static ObjectManager getInstance() { return ObjectManagerFactory.objMgr; }
private static final int SIZE = 10;
private static BlockingQueue<AutoCloseable> objects = new LinkedBlockingQueue<AutoCloseable>();
private static ScheduledExecutorService sch;
static {
for(int cnt = 0 ; cnt < SIZE ; cnt++) {
objects.add(new AutoCloseable() {
@Override
public void close() throws Exception {
System.out.println(Thread.currentThread() + " - Adding object back to pool:" + this + " size: " + objects.size());
objects.put(this);
System.out.println(Thread.currentThread() + " - Added object back to pool:" + this);
}
});
}
sch = Executors.newSingleThreadScheduledExecutor();
sch.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
updateObjects();
}
}, 10, 10, TimeUnit.MICROSECONDS);
}
static void updateObjects() {
for(int cnt = 0 ; ! objects.isEmpty() && cnt < SIZE ; cnt++ ) {
try(AutoCloseable object = objects.take()) {
System.out.println(Thread.currentThread() + " - updateObjects - updated object: " + object + " size: " + objects.size());
} catch (Throwable t) {
// TODO Auto-generated catch block
t.printStackTrace();
}
}
}
public AutoCloseable getNext() {
try {
return objects.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return null;
}
}
public static void main(String[] args) {
try (ObjectManager mgr = ObjectManager.getInstance()) {
for (int cnt = 0; cnt < 5; cnt++) {
try (AutoCloseable o = mgr.getNext()) {
System.out.println(Thread.currentThread() + " - Working with " + o);
Thread.sleep(1000);
} catch (Throwable t) {
t.printStackTrace();
}
}
} catch (Throwable tt) {
tt.printStackTrace();
}
}
@Override
public void close() throws Exception {
// TODO Auto-generated method stub
ObjectManager.sch.shutdownNow();
}
}
这篇关于不要同时在两个线程之间共享同一个套接字的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!