java – 并发配对
我正在寻找一种
java并发习惯用法来匹配具有最高吞吐量的大量元素.
考虑一下我有“人”来自多个线程.每个“人”都在寻找一场比赛.当它找到另一个等待的“人”时,它们相互匹配并被移除以进行处理. 我不想锁定一个大的结构来改变状态.考虑Person有getMatch和setMatch.在提交之前,每个人的#getMatch都是null.但是当他们解锁(或被捕获)时,他们要么已经过期,因为他们等待长时间的比赛或#getMatch是非空的. 保持高通过率的一些问题是,如果PersonA与PersonB同时提交.它们相互匹配,但PersonB也匹配已经在等待的PersonC. PersonB的状态在提交时变为“可用”.但是当PersonB与PersonC匹配时,PersonA不需要偶然获得PersonB.合理?另外,我想以异步方式执行此操作.换句话说,我不希望每个提交者都必须在具有waitForMatch类型的东西的线程上持有Person. 同样,我不希望请求必须在不同的线程上运行,但是如果有一个额外的匹配器线程也没关系. 似乎应该有一些成语,因为它似乎是一个非常普遍的事情.但我的谷歌搜索已经枯竭(我可能使用错误的条款). UPDATE 有几件事让我很难解决这个问题.一个是我不想在内存中有对象,我想让所有等待的候选人都使用redis或memcache或类似的东西.另一个是任何人可能有几个可能的比赛.考虑如下界面: person.getId(); // lets call this an Integer person.getFriendIds(); // a collection of other person ids 然后我有一个看起来像这样的服务器: MatchServer: submit( personId,expiration ) -> void // non-blocking returns immediately isDone( personId ) -> boolean // either expired or found a match getMatch( personId ) -> matchId // also non-blocking 这是一个休息界面,它会使用重定向,直到你得到结果.我的第一个想法是在MatchServer中有一个Cache,它由redis之类的东西支持,并且对于当前被锁定和被操作的对象具有并发的弱值哈希映射.每个personId将由持久状态对象包装,状态为已提交,匹配和过期. 到目前为止?非常简单,提交代码完成了初始工作,它是这样的: public void submit( Person p,long expiration ) { MatchStatus incoming = new MatchStatus( p.getId(),expiration ); if ( !tryMatch( incoming,p.getFriendIds() ) ) cache.put( p.getId(),incoming ); } public boolean isDone( Integer personId ) { MatchStatus status = cache.get( personId ); status.lock(); try { return status.isMatched() || status.isExpired(); } finally { status.unlock(); } } public boolean tryMatch( MatchStatus incoming,Iterable<Integer> friends ) { for ( Integer friend : friends ) { if ( match( incoming,friend ) ) return true; } return false; } private boolean match( MatchStatus incoming,Integer waitingId ) { CallStatus waiting = cache.get( waitingId ); if ( waiting == null ) return false; waiting.lock(); try { if ( waiting.isMatched() ) return false; waiting.setMatch( incoming.getId() ); incoming.setMatch( waiting.getId() ); return true } finally { waiting.unlock(); } } 所以这里的问题是,如果两个人同时进来并且他们是他们唯一的比赛,他们就不会找到对方.竞争条件对吗?我能看到解决它的唯一方法是同步“tryMatch()”.但这会影响我的吞吐量.我不能无限期地循环tryMatch,因为我需要这些非常短的调用. 那么有什么更好的方法来解决这个问题呢?我提出的每一个解决方案都会一次一个地强迫人们使用吞吐量.例如,创建后台线程并使用阻塞队列一次放入和接收传入线程. 任何指导将不胜感激. 解决方法
您可以使用ConcurrentHashMap.我假设你的对象有他们可以匹配的密钥,例如PersonA和PersonB将具有“Person”键.
ConcurrentHashMap<String,Match> map = new ConcurrentHashMap<>(); void addMatch(Match match) { boolean success = false; while(!success) { Match oldMatch = map.remove(match.key); if(oldMatch != null) { match.setMatch(oldMatch); success = true; } else if(map.putIfAbsent(match.key,match) == null) { success = true; } } } 您将继续循环,直到您将匹配添加到地图,或者直到您删除现有匹配并将其配对为止. remove和putIfAbsent都是原子的. 编辑:因为您要将数据卸载到磁盘,您可以使用例如MongoDB到此,用其findAndModify方法.如果具有密钥的对象已经存在,则该命令将删除并返回它,以便您可以将旧对象与新对象配对,并且可能存储与新密钥关联的对;如果具有该键的对象不存在,则该命令将该对象与该键一起存储.这相当于ConcurrentHashMap的行为,除了数据存储在磁盘而不是内存中;您不必担心同时写入两个对象,因为findAndModify逻辑可以防止它们无意中占用相同的密钥. 如果需要将对象序列化为JSON,请使用Jackson. 还有Mongo的替代品,例如DynamoDB,虽然Dynamo只免费提供少量数据. 编辑:鉴于朋友列表不是自反性的,我认为你可以通过组合MongoDB(或其他键值数据库与原子更新)和ConcurrentHashMap来解决这个问题. > MongoDB中的人员“匹配”或“无与伦比”. (如果我说“从MongoDB中删除一个人”,我的意思是“将该人的状态设置为’匹配’.”) 在一般情况下,一个人要么与朋友匹配,要么在没有朋友加入系统的同时迭代通过朋友列表(即该人的ConcurrentHashMap将为空)时不匹配.如果同时写朋友: Friend1和Friend2同时添加. > Friend1写信给Friend2的ConcurrentHashMap,表示他们错过了对方. 一些打嗝: > Friend1和Friend2可能都没有将ConcurrentHashMaps与它们相关联,例如:如果Friend2在Friend1检查以查看地图是否在内存中时仍在初始化其哈希映射.这很好,因为Friend2将写入Friend1的哈希映射,因此我们保证最终会尝试匹配 – 至少其中一个将具有哈希映射而另一个迭代,因为哈希映射创建在迭代之前. 编辑:一些代码: addUnmatchedToMongo(person1)方法将一个“不匹配”的person1写入MongoDB setToMatched(friend1)使用findAndModify将friend1原子设置为“matched”;如果friend1已匹配或不存在,则该方法将返回false;如果更新成功,则返回true 如果friend1存在且匹配,则isMatched(friend1)返回true,如果它不存在或存在且“不匹配”,则返回false private ConcurrentHashMap<String,ConcurrentHashMap<String,Person>> globalMap; private DelayQueue<DelayedRetry> delayQueue; private ThreadPoolExecutor executor; executor.execute(new Runnable() { public void run() { while(true) { Runnable runnable = delayQueue.take(); executor.execute(runnable); } } } public static void findMatch(Person person,Collection<Person> friends) { findMatch(person,friends,1); } public static void findMatch(Person person,Collection<Person> friends,int delayMultiplier) { globalMap.put(person.id,new ConcurrentHashMap<String,Person>()); for(Person friend : friends) { if(**setToMatched(friend)**) { // write person to MongoDB in "matched" state // write "Pair(person,friend)" to MongoDB so it can be queried by the end user globalMap.remove(person.id); return; } else { if(**!isMatched(friend)** && globalMap.get(person.id).get(friend.id) == null) { // the existence of "friendMap" indicates another thread is currently trying to match the friend ConcurrentHashMap<String,Person> friendMap = globalMap.get(friend.id); if(friendMap != null) { friendMap.put(person.id,person); } } } } **addUnmatchedToMongo(person)**; Collection<Person> retryFriends = globalMap.remove(person.id).values(); if(retryFriends.size() > 0) { delayQueue.add(new DelayedRetry(500 * new Random().nextInt(30 * delayMultiplier),person,retryFriends,delayMultiplier)); } } public class DelayedRetry implements Runnable,Delayed { private final long delay; private final Person person; private final Collection<Person> friends; private final int delayMultiplier; public DelayedRetry(long delay,Person person,delayMultiplier) { this.delay = delay; this.person = person; this.friends = friends; this.delayMultiplier = delayMultiplier; } public long getDelay(TimeUnit unit) { return unit.convert(delay,TimeUnit.MILLISECONDS); } public void run { findMatch(person,delayMultiplier + 1); } } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |