我正在寻找一个Java并发习惯用法来将匹配项与具有最高吞吐量的大量元素配对。
考虑一下我有来自多个线程的“人”。每个“人”都在寻找匹配项。当它找到另一个正在等待的“人”时,与他们匹配的人都被分配给对方,并被删除以进行处理。
我不想锁定一个大结构来更改状态。考虑Person具有getMatch和setMatch。在提交每个人之前,每个人的#getMatch为null。但是,当他们解除阻止(或被捕鱼)时,它们要么已过期,因为他们等待了很长时间才能找到匹配项,或者#getMatch不为null。
保持高吞吐量的一些问题是,如果PersonA与PersonB同时提交。他们彼此匹配,但是PersonB也匹配已经在等待的PersonC。提交后,PersonB的状态更改为“可用”。但是,当PersonB与PersonC匹配时,PersonA不必意外地获得PersonB。有道理?另外,我想以一种异步工作的方式来执行此操作。换句话说,我不希望每个提交者都必须在具有waitForMatch类型的事物的线程上坚持使用Person。
同样,我不希望请求必须在单独的线程上运行,但是如果有一个附加的匹配器线程也可以。
似乎应该对此有一些习语,因为这似乎很常见。但是我的Google搜索变得干dry了(我可能使用了错误的术语)。
更新
有几件事使我很难解决这个问题。一种是我不想在内存中有对象,我想让所有等待的候选对象都放在redis或memcache或类似的东西中。另一个是任何人都可能有多个匹配项。考虑如下接口(interface):
person.getId(); // lets call this an Integer
person.getFriendIds(); // a collection of other person ids
然后,我有一台服务器,看起来像这样:
MatchServer:
submit( personId, expiration ) -> void // non-blocking returns immediately
isDone( personId ) -> boolean // either expired or found a match
getMatch( personId ) -> matchId // also non-blocking
这是用于rest接口(interface)的,它将使用重定向,直到获得结果为止。我的第一个想法是在MatchServer中仅拥有一个缓存,该缓存由redis之类的东西支持,并且对于当前已锁定并正在执行操作的对象具有并发的弱值哈希映射。每个personId将由一个持久状态对象包装,该对象具有已提交,已匹配和已过期等状态。
至今为止?很简单,提交代码完成了最初的工作,就像这样:
public void submit( Person p, long expiration ) {
MatchStatus incoming = new MatchStatus( p.getId(), expiration );
if ( !tryMatch( incoming, p.getFriendIds() ) )
cache.put( p.getId(), incoming );
}
public boolean isDone( Integer personId ) {
MatchStatus status = cache.get( personId );
status.lock();
try {
return status.isMatched() || status.isExpired();
} finally {
status.unlock();
}
}
public boolean tryMatch( MatchStatus incoming, Iterable<Integer> friends ) {
for ( Integer friend : friends ) {
if ( match( incoming, friend ) )
return true;
}
return false;
}
private boolean match( MatchStatus incoming, Integer waitingId ) {
CallStatus waiting = cache.get( waitingId );
if ( waiting == null )
return false;
waiting.lock();
try {
if ( waiting.isMatched() )
return false;
waiting.setMatch( incoming.getId() );
incoming.setMatch( waiting.getId() );
return true
} finally {
waiting.unlock();
}
}
因此,这里的问题是,如果两个人同时进入,并且他们是他们唯一的比赛,他们将不会找到对方。比赛条件好吗?我唯一能解决的方法是同步“tryMatch()”。但这扼杀了我的吞吐量。我不能无限期地尝试tryMatch,因为我需要这些调用非常短。
那么,解决这个问题的更好方法是什么呢?我提出的每种解决方案一次都会迫使人们合而为一,这对于吞吐率而言并不理想。例如,创建一个后台线程并使用阻塞队列来一次放置和接收传入线程。
任何指导将不胜感激。
最佳答案
您也许可以使用ConcurrentHashMap
。我假设您的对象具有可以匹配的键,例如PersonA和PersonB将具有一个“Person”键。
ConcurrentHashMap<String, Match> map = new ConcurrentHashMap<>();
void addMatch(Match match) {
boolean success = false;
while(!success) {
Match oldMatch = map.remove(match.key);
if(oldMatch != null) {
match.setMatch(oldMatch);
success = true;
} else if(map.putIfAbsent(match.key, match) == null) {
success = true;
}
}
}
您将一直循环播放,直到将匹配项添加到 map 上,或者删除了现有匹配项并对其进行了配对为止。
remove
和putIfAbsent
都是原子的。编辑:因为您要将数据卸载到磁盘上,因此可以使用例如为此,使用了MongoDB方法。如果具有键的对象已经存在,则该命令将删除并返回它,以便您可以将旧对象与新对象配对,并大概存储与新键关联的配对;如果不存在带有 key 的对象,则命令将存储带有 key 的对象。这与
ConcurrentHashMap
的行为相同,只不过数据存储在磁盘上而不是内存中。您无需担心两个对象同时写入,因为findAndModify
逻辑可防止它们无意间占用相同的键。如果需要将对象序列化为JSON,请使用findAndModify。
有Mongo的替代品,例如Jackson,尽管Dynamo仅对少量数据免费。
编辑:鉴于好友列表不是自反的,我认为您可以结合使用MongoDB(或具有原子更新的另一个键值数据库)和
ConcurrentHashMap
来解决此问题。ConcurrentHashMap<key, boolean>
,可能是在全局ConcurrentHashMap<key, ConcurrentHashMap<key, boolean>>
中。 findAndModify
将其原子地设置为“matched”,然后将新用户以“matched”状态写入MongoDB,最后将该对添加到MongoDB中的“Pairs”集合中,该配对可以由最终用户查询。从全局 map 中删除此人的ConcurrentHashMap
。 ConcurrentHashMap
。它有,那么什么也不做;如果还没有,则检查 friend 是否有与之关联的ConcurrentHashMap
;如果是,则将与当前人的键关联的值设置为“true”。 (请注意,由于当前用户无法通过一次原子操作检查自己的 map 并修改该 friend 的 map ,因此两个 friend 仍然有可能写入彼此的哈希 map ,但是自我哈希 map 检查会减少这种可能性。) ConcurrentHashMap
,并创建一个延迟的任务,该任务将遍历所有写给该好友的 friend 的ID。人的ConcurrentHashMap
(即使用ConcurrentHashMap#keySet()
)。此任务的延迟应该是随机的(例如Thread.sleep(500 * rand.nextInt(30))
),这样两个 friend 就不会总是尝试同时进行匹配。如果当前人没有需要重新检查的任何 friend ,则不要为此创建延迟的任务。 在常见情况下,某人要么与 friend 匹配,要么在迭代 friend 列表时没有将 friend 添加到系统中而无法匹配(即,该人的
ConcurrentHashMap
将为空)。如果同时发给 friend :Friend1和Friend2同时添加。
ConcurrentHashMap
,以表明他们彼此错过了。 ConcurrentHashMap
来指示相同(只有在Friend2向Friend1写入 map 的同时检查Friend1写入 map 的情况下,这种情况才会发生-通常Friend2会检测到Friend1已写入其 map 因此它不会写入Friend1的 map )。 一些打h:
ConcurrentHashMaps
,例如如果Friend2在Friend1检查以查看该映射是否在内存中时仍在初始化其哈希映射。很好,因为Friend2会写入Friend1的哈希映射,因此我们保证最终会尝试匹配-至少其中一个将具有哈希映射,而另一个将迭代,因为哈希映射创建是在迭代之前进行的。 ConcurrentHashMap
的 friend 列表合并,然后下一次迭代应将此列表用作新的 friend 列表。最终,该人将被匹配,否则该人的“重新检查” friend 列表将被清空。 Thread.sleep(500 * rand.nextInt(30))
,第二次迭代中的Thread.sleep(500 * rand.nextInt(60))
,第三次迭代中的Thread.sleep(500 * rand.nextInt(90))
等)。 ConcurrentHashMap
,然后再将其从MongoDB中删除,否则将导致数据争用。同样,在迭代某个人的潜在匹配项时,必须将其从MongoDB中删除,否则可能会无意中将其匹配两次。 编辑:一些代码:
方法
addUnmatchedToMongo(person1)
将一个“不匹配”的person1写入MongoDBsetToMatched(friend1)
使用findAndModify
原子地将friend1
设置为“matched”;如果friend1
已经匹配或不存在,则该方法将返回false;如果更新成功,则该方法将返回true。isMatched(friend1)
如果friend1
存在且已匹配,则返回true;如果不存在或存在且未匹配,则返回false。private ConcurrentHashMap<String, ConcurrentHashMap<String, Person>> globalMap;
private DelayQueue<DelayedRetry> delayQueue;
private ThreadPoolExecutor executor;
executor.execute(new Runnable() {
public void run() {
while(true) {
Runnable runnable = delayQueue.take();
executor.execute(runnable);
}
}
}
public static void findMatch(Person person, Collection<Person> friends) {
findMatch(person, friends, 1);
}
public static void findMatch(Person person, Collection<Person> friends, int delayMultiplier) {
globalMap.put(person.id, new ConcurrentHashMap<String, Person>());
for(Person friend : friends) {
if(**setToMatched(friend)**) {
// write person to MongoDB in "matched" state
// write "Pair(person, friend)" to MongoDB so it can be queried by the end user
globalMap.remove(person.id);
return;
} else {
if(**!isMatched(friend)** && globalMap.get(person.id).get(friend.id) == null) {
// the existence of "friendMap" indicates another thread is currently trying to match the friend
ConcurrentHashMap<String, Person> friendMap = globalMap.get(friend.id);
if(friendMap != null) {
friendMap.put(person.id, person);
}
}
}
}
**addUnmatchedToMongo(person)**;
Collection<Person> retryFriends = globalMap.remove(person.id).values();
if(retryFriends.size() > 0) {
delayQueue.add(new DelayedRetry(500 * new Random().nextInt(30 * delayMultiplier), person, retryFriends, delayMultiplier));
}
}
public class DelayedRetry implements Runnable, Delayed {
private final long delay;
private final Person person;
private final Collection<Person> friends;
private final int delayMultiplier;
public DelayedRetry(long delay, Person person, Collection<Person> friends, delayMultiplier) {
this.delay = delay;
this.person = person;
this.friends = friends;
this.delayMultiplier = delayMultiplier;
}
public long getDelay(TimeUnit unit) {
return unit.convert(delay, TimeUnit.MILLISECONDS);
}
public void run {
findMatch(person, friends, delayMultiplier + 1);
}
}
关于java - 同时配对比赛,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/16468958/