问题描述
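我有一个类,它在 updateLiveSockets() 方法中由单个后台线程每 30 秒填充一次映射 liveSocketsByDatacenter。由于 scheduleAtFixedRate() 保证传入的 Runnable 不会与其自身同时运行,因此不需要 updateLiveSockets() 上的 synchronized;但是,它也没有造成任何实际的危害。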
所以是的,这个类按现状就是线程安全的。
但是,并不完全清楚 SocketHolder 是否可以被多个线程同时使用。实际上,这个类只是尝试通过随机选择一个活动实例来尽量减少对 SocketHolder 的并发使用(不过,仅为了选出一个随机索引,并不需要打乱整个数组)。它并不会真正阻止并发使用。
能否提高效率?
我相信可以。在查看 updateLiveSockets() 方法时,它似乎构建了完全相同的映射,只是其中的 SocketHolder 的 isLive 标志可能有不同的值。这使我得出结论:与其切换整个映射,不如只切换映射中的每个列表。而要以线程安全的方式更改映射中的条目,只需使用 ConcurrentHashMap。
如果我使用 ConcurrentHashMap,并且不切换映射本身,而是切换映射中的值,我就可以摆脱 AtomicReference。
要更改映射关系,我只需构建新列表并将其直接放入映射中。这样更高效,因为我能更早发布数据,并且创建的对象更少;同时我的同步只建立在现成的组件之上,这有利于可读性。
这是我的实现(为简洁起见,省略了一些不太相关的部分):
public class SocketManager {
private static final Random random = new随机();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map< Datacenters,List< SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); //使用ConcurrentHashMap
private final ZContext ctx = new ZContext();
// ...
私人SocketManager(){
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this :: updateLiveSockets,30,30,TimeUnit.SECONDS);
//在启动期间进行连接并填充一次
private void connectToZMQSockets(){
Map< Datacenters,List< String>> socketsByDatacenter = Utils.SERVERS; (Map.Entry< Datacenters,List< String>> entry:socketsByDatacenter.entrySet()){
List< SocketHolder> addedColoSockets = connect(entry.getValue(),ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(),addedColoSockets); //我们可以直接放入地图
}
}
// ...
//这个方法将被多个线程来获得下一个活套接字
//是否有任何并发或线程安全问题或竞争条件?
public可选< SocketHolder> getNextSocket(){
for(Datacenters dc:Datacenters.getOrderedDatacenters()){
可选< SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); //不再需要本地副本ConcurrentHashMap,确保获得最新映射的List< SocketHolder>
if(liveSocket.isPresent()){
return liveSocket;
}
}
return Optional.absent();
}
//是否有任何并发或线程安全问题或竞争条件?
private可选< SocketHolder> getLiveSocket(final List< SocketHolder> listOfEndPoints){
if(!CollectionUtils.isEmpty(listOfEndPoints)){
//活套接字列表
List< SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for(SocketHolder obj:listOfEndPoints){
if(obj.isLive()){
liveOnly.add(obj);
if(!liveOnly.isEmpty()){
//列表不是空的,所以我们随机地将它返回第一个元素
return可选.of(liveOnly.get(random.nextInt(liveOnly.size()))); //只需选择一个
}
}
return Optional.absent();
}
//不需要进行同步
private void updateLiveSockets(){
Map< Datacenters,List< String>> socketsByDatacenter = Utils.SERVERS; (Map.Entry< Datacenters,List< String>> entry:socketsByDatacenter.entrySet()){
List< SocketHolder>
。 liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List< SocketHolder> liveUpdatedSockets = new ArrayList<>(); (SocketHolder liveSocket:liveSockets)
{// LINE A
Socket socket = liveSocket.getSocket();
字符串endpoint = liveSocket.getEndpoint();
Map< byte [],byte []> holder = populateMap();
留言信息=新留言(持有者,Partition.COMMAND);
布尔状态= SendToSocket.getInstance()。execute(message.getAdd(),holder,socket);
布尔isLive =(状态)?真假;
SocketHolder zmq = new SocketHolder(socket,liveSocket.getContext(),endpoint,isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(),Collections.unmodifiableList(liveUpdatedSockets)); //将它直接放入地图中,映射将以线程安全的方式更新。
}
}
}
I have a class in which I am populating a map liveSocketsByDatacenter from a single background thread every 30 seconds inside updateLiveSockets() method and then I have a method getNextSocket() which will be called by multiple reader threads to get a live socket available which uses the same map to get this information.
public class SocketManager { private static final Random random = new Random(); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>())); private final ZContext ctx = new ZContext(); // Lazy Loaded Singleton Pattern private static class Holder { private static final SocketManager instance = new SocketManager(); } public static SocketManager getInstance() { return Holder.instance; } private SocketManager() { connectToZMQSockets(); scheduler.scheduleAtFixedRate(new Runnable() { public void run() { updateLiveSockets(); } }, 30, 30, TimeUnit.SECONDS); } // during startup, making a connection and populate once private void connectToZMQSockets() { Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS; // The map in which I put all the live sockets Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>(); for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) { List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH); updatedLiveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(addedColoSockets)); } // Update the map content this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter)); } private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) { List<SocketHolder> socketList = new ArrayList<>(); for (String address : addresses) { try { Socket client = ctx.createSocket(socketType); // Set random identity to make tracing easier String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt()); client.setIdentity(identity.getBytes(ZMQ.CHARSET)); client.setTCPKeepAlive(1); client.setSendTimeOut(7); client.setLinger(0); client.connect(address); SocketHolder 
zmq = new SocketHolder(client, ctx, address, true); socketList.add(zmq); } catch (Exception ex) { // log error } } return socketList; } // this method will be called by multiple threads to get the next live socket // is there any concurrency or thread safety issue or race condition here? public Optional<SocketHolder> getNextSocket() { // For the sake of consistency make sure to use the same map instance // in the whole implementation of my method by getting my entries // from the local variable instead of the member variable Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = this.liveSocketsByDatacenter.get(); Optional<SocketHolder> liveSocket = Optional.absent(); List<Datacenters> dcs = Datacenters.getOrderedDatacenters(); for (Datacenters dc : dcs) { liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); if (liveSocket.isPresent()) { break; } } return liveSocket; } // is there any concurrency or thread safety issue or race condition here? private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) { if (!CollectionUtils.isEmpty(endpoints)) { // The list of live sockets List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size()); for (SocketHolder obj : endpoints) { if (obj.isLive()) { liveOnly.add(obj); } } if (!liveOnly.isEmpty()) { // The list is not empty so we shuffle it an return the first element Collections.shuffle(liveOnly); return Optional.of(liveOnly.get(0)); } } return Optional.absent(); } // Added the modifier synchronized to prevent concurrent modification // it is needed because to build the new map we first need to get the // old one so both must be done atomically to prevent concistency issues private synchronized void updateLiveSockets() { Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS; // Initialize my new map with the current map content Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new HashMap<>(this.liveSocketsByDatacenter.get()); for (Entry<Datacenters, 
ImmutableList<String>> entry : socketsByDatacenter.entrySet()) { List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey()); List<SocketHolder> liveUpdatedSockets = new ArrayList<>(); for (SocketHolder liveSocket : liveSockets) { // LINE A Socket socket = liveSocket.getSocket(); String endpoint = liveSocket.getEndpoint(); Map<byte[], byte[]> holder = populateMap(); Message message = new Message(holder, Partition.COMMAND); boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket); boolean isLive = (status) ? true : false; // is there any problem the way I am using `SocketHolder` class? SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive); liveUpdatedSockets.add(zmq); } liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); } this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter)); } }
As you can see in my class:
- From a single background thread which runs every 30 seconds, I populate liveSocketsByDatacenter map with all the live sockets in updateLiveSockets() method.
- And then from multiple threads, I call the getNextSocket() method to give me a live socket available which uses a liveSocketsByDatacenter map to get the required information.
I have my code working fine without any issues and wanted to see if there is any better or more efficient way to write this. I also wanted to get an opinion on thread safety issues or any race conditions if any are there, but so far I haven't seen any but I could be wrong.
I am mostly worried about updateLiveSockets() method and getLiveSocketX() method. I am iterating liveSockets which is a List of SocketHolder at LINE A and then making a new SocketHolder object and adding to another new list. Is this ok here?
Note: SocketHolder is an immutable class. And you can ignore ZeroMQ stuff I have.
You use the following synchronization techniques.
- The map with live socket data is behind an atomic reference, this allows safely switching the map.
- The updateLiveSockets() method is synchronized (implicitly on this), this will prevent switching the map by two threads simultaneously.
- You make a local reference to the map when using it to avoid mixups if the switch happens during the getNextSocket() method.
Is it thread safe, as it is now?
Thread safety always hinges on whether there is proper synchronization on shared mutable data. In this case the shared mutable data is the map of datacenters to their list of SocketHolders.
The fact that the map is in an AtomicReference, and making a local copy for use is enough synchronization on the map. Your methods take a version of the map and use that, switching versions is thread safe due to the nature of AtomicReference. This could also have been achieved with just making the member field for the map volatile, as all you do is update the reference (you don't do any check-then-act operations on it).
As scheduleAtFixedRate() guarantees that the passed Runnable will not be run concurrently with itself, the synchronized on updateLiveSockets() is not needed, however, it also doesn't do any real harm.
So yes, this class is thread safe, as it is.
However, it's not entirely clear if a SocketHolder can be used by multiple threads simultaneously. As it is, this class just tries to minimize concurrent use of SocketHolders by picking a random live one (no need to shuffle the entire array to pick one random index though). It does nothing to actually prevent concurrent use.
Can it be made more efficient?
I believe it can. When looking at the updateLiveSockets() method, it seems it builds the exact same map, except that the SocketHolders may have different values for the isLive flag. This leads me to conclude that, rather than switching the entire map, I just want to switch each of the lists in the map. And for changing entries in a map in a thread safe manner, I can just use ConcurrentHashMap.
If I use a ConcurrentHashMap, and don't switch the map, but rather, the values in the map, I can get rid of the AtomicReference.
To change the mapping I can just build the new list and put it straight into the map. This is more efficient, as I publish data sooner, and I create fewer objects, while my synchronization just builds on ready made components, which benefits readability.
Here's my build (omitted some parts that were less relevant, for brevity)
public class SocketManager { private static final Random random = new Random(); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap private final ZContext ctx = new ZContext(); // ... private SocketManager() { connectToZMQSockets(); scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS); } // during startup, making a connection and populate once private void connectToZMQSockets() { Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS; for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) { List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH); liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map } } // ... // this method will be called by multiple threads to get the next live socket // is there any concurrency or thread safety issue or race condition here? public Optional<SocketHolder> getNextSocket() { for (Datacenters dc : Datacenters.getOrderedDatacenters()) { Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List<SocketHolder> if (liveSocket.isPresent()) { return liveSocket; } } return Optional.absent(); } // is there any concurrency or thread safety issue or race condition here? 
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) { if (!CollectionUtils.isEmpty(listOfEndPoints)) { // The list of live sockets List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size()); for (SocketHolder obj : listOfEndPoints) { if (obj.isLive()) { liveOnly.add(obj); } } if (!liveOnly.isEmpty()) { // The list is not empty so we shuffle it an return the first element return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one } } return Optional.absent(); } // no need to make this synchronized private void updateLiveSockets() { Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS; for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) { List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey()); List<SocketHolder> liveUpdatedSockets = new ArrayList<>(); for (SocketHolder liveSocket : liveSockets) { // LINE A Socket socket = liveSocket.getSocket(); String endpoint = liveSocket.getEndpoint(); Map<byte[], byte[]> holder = populateMap(); Message message = new Message(holder, Partition.COMMAND); boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket); boolean isLive = (status) ? true : false; SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive); liveUpdatedSockets.add(zmq); } liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner. } } }
这篇关于同时阅读地图,而单个后台线程会定期修改它的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!