加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

java – 并发配对

发布时间:2020-12-14 19:36:09 所属栏目:Java 来源:网络整理
导读:我正在寻找一种 java并发习惯用法来匹配具有最高吞吐量的大量元素. 考虑一下我有“人”来自多个线程.每个“人”都在寻找一场比赛.当它找到另一个等待的“人”时,它们相互匹配并被移除以进行处理. 我不想锁定一个大的结构来改变状态.考虑Person有getMatch和se
我正在寻找一种 java并发习惯用法来匹配具有最高吞吐量的大量元素.

考虑一下我有“人”来自多个线程.每个“人”都在寻找一场比赛.当它找到另一个等待的“人”时,它们相互匹配并被移除以进行处理.

我不想锁定一个大的结构来改变状态.考虑Person有getMatch和setMatch.在提交之前,每个人的#getMatch都是null.但是当他们解锁(或被捕获)时,他们要么已经过期,因为他们等待长时间的比赛或#getMatch是非空的.

保持高通过率的一些问题是,如果PersonA与PersonB同时提交.它们相互匹配,但PersonB也匹配已经在等待的PersonC. PersonB的状态在提交时变为“可用”.但是当PersonB与PersonC匹配时,PersonA不需要偶然获得PersonB.合理?另外,我想以异步方式执行此操作.换句话说,我不希望每个提交者都必须在具有waitForMatch类型的东西的线程上持有Person.

同样,我不希望请求必须在不同的线程上运行,但是如果有一个额外的匹配器线程也没关系.

似乎应该有一些成语,因为它似乎是一个非常普遍的事情.但我的谷歌搜索已经枯竭(我可能使用错误的条款).

UPDATE

有几件事让我很难解决这个问题.一个是我不想在内存中有对象,我想让所有等待的候选人都使用redis或memcache或类似的东西.另一个是任何人可能有几个可能的比赛.考虑如下界面:

person.getId();         // lets call this an Integer
person.getFriendIds();  // a collection of other person ids

然后我有一个看起来像这样的服务器:

MatchServer:
   submit( personId,expiration ) -> void // non-blocking returns immediately
   isDone( personId ) -> boolean          // either expired or found a match
   getMatch( personId ) -> matchId        // also non-blocking

这是一个休息界面,它会使用重定向,直到你得到结果.我的第一个想法是在MatchServer中有一个Cache,它由redis之类的东西支持,并且对于当前被锁定和被操作的对象具有并发的弱值哈希映射.每个personId将由持久状态对象包装,状态为已提交,匹配和过期.

到目前为止?非常简单,提交代码完成了初始工作,它是这样的:

public void submit( Person p,long expiration ) {
    MatchStatus incoming = new MatchStatus( p.getId(),expiration );
    if ( !tryMatch( incoming,p.getFriendIds() ) )
        cache.put( p.getId(),incoming ); 
}

public boolean isDone( Integer personId ) {
    MatchStatus status = cache.get( personId );
    status.lock();
    try {
        return status.isMatched() || status.isExpired();

    } finally {
        status.unlock();
    }
}

public boolean tryMatch( MatchStatus incoming,Iterable<Integer> friends ) {
    for ( Integer friend : friends ) {
        if ( match( incoming,friend ) )
            return true;
    }

    return false;
}

private boolean match( MatchStatus incoming,Integer waitingId ) {
    CallStatus waiting = cache.get( waitingId );
    if ( waiting == null )
        return false;

    waiting.lock();
    try {
        if ( waiting.isMatched() )
            return false;

        waiting.setMatch( incoming.getId() );
        incoming.setMatch( waiting.getId() );

        return true
    } finally {
        waiting.unlock();
    }
}

所以这里的问题是,如果两个人同时进来并且他们是他们唯一的比赛,他们就不会找到对方.竞争条件对吗?我能看到解决它的唯一方法是同步“tryMatch()”.但这会影响我的吞吐量.我不能无限期地循环tryMatch,因为我需要这些非常短的调用.

那么有什么更好的方法来解决这个问题呢?我提出的每一个解决方案都会一次一个地强迫人们使用吞吐量.例如,创建后台线程并使用阻塞队列一次放入和接收传入线程.

任何指导将不胜感激.

解决方法

您可以使用ConcurrentHashMap.我假设你的对象有他们可以匹配的密钥,例如PersonA和PersonB将具有“Person”键.
ConcurrentHashMap<String,Match> map = new ConcurrentHashMap<>();

void addMatch(Match match) {
    boolean success = false;
    while(!success) {
        Match oldMatch = map.remove(match.key);
        if(oldMatch != null) {
            match.setMatch(oldMatch);
            success = true;
       } else if(map.putIfAbsent(match.key,match) == null) {
            success = true;
       }
   }
}

您将继续循环,直到您将匹配添加到地图,或者直到您删除现有匹配并将其配对为止. remove和putIfAbsent都是原子的.

编辑:因为您要将数据卸载到磁盘,您可以使用例如MongoDB到此,用其findAndModify方法.如果具有密钥的对象已经存在,则该命令将删除并返回它,以便您可以将旧对象与新对象配对,并且可能存储与新密钥关联的对;如果具有该键的对象不存在,则该命令将该对象与该键一起存储.这相当于ConcurrentHashMap的行为,除了数据存储在磁盘而不是内存中;您不必担心同时写入两个对象,因为findAndModify逻辑可以防止它们无意中占用相同的密钥.

如果需要将对象序列化为JSON,请使用Jackson.

还有Mongo的替代品,例如DynamoDB,虽然Dynamo只免费提供少量数据.

编辑:鉴于朋友列表不是自反性的,我认为你可以通过组合MongoDB(或其他键值数据库与原子更新)和ConcurrentHashMap来解决这个问题.

> MongoDB中的人员“匹配”或“无与伦比”. (如果我说“从MongoDB中删除一个人”,我的意思是“将该人的状态设置为’匹配’.”)
>添加新人时,首先创建一个ConcurrentHashMap< key,boolean>对于它,可能在全局ConcurrentHashMap< key,ConcurrentHashMap< key,boolean>>中.
>通过新人的朋友迭代:
>如果朋友在MongoDB中,那么使用findAndModify将其原子地设置为“匹配”,然后将新人写入状态为“匹配”的MongoDB,最后将该对添加到MongoDB中的“Pairs”集合中最终用户查询.从全局地图中删除此人的ConcurrentHashMap.
>如果朋友不在MongoDB中,那么检查该朋友是否已写入当前朋友的相关ConcurrentHashMap.它有,然后什么都不做;如果没有,则检查朋友是否有与之关联的ConcurrentHashMap;如果是,则将与当前人的关键字相关联的值设置为“true”. (请注意,由于当前人员无法检查自己的地图并使用一个原子操作修改朋友的地图,因此两个朋友仍然可以写入彼此的哈希地图,但是自我哈希地图检查会降低这种可能性.)
>如果此人尚未匹配,则将其以“不匹配”状态写入MongoDB,从全局映射中删除其ConcurrentHashMap,并创建一个延迟任务,该任务将遍历写入该文件的所有朋友的ID. person的ConcurrentHashMap(即使用ConcurrentHashMap#keySet()).此任务的延迟应该是随机的(例如Thread.sleep(500 * rand.nextInt(30))),这样两个朋友就不会总是同时尝试匹配.如果当前的人没有任何需要重新检查的朋友,则不要为此创建延迟任务.
>当延迟结束时,为此人创建一个新的ConcurrentHashMap,从MongoDB中删除不匹配的人,然后循环回到步骤1.如果该人已经匹配,则不要将其从MongoDB中删除并终止延迟的任务.

在一般情况下,一个人要么与朋友匹配,要么在没有朋友加入系统的同时迭代通过朋友列表(即该人的ConcurrentHashMap将为空)时不匹配.如果同时写朋友:

Friend1和Friend2同时添加.

> Friend1写信给Friend2的ConcurrentHashMap,表示他们错过了对方.
> Friend2写入Friend1的ConcurrentHashMap来表示相同的情况(只有当Friend2在Friend1写入时同时检查Friend1是否写入其地图时才会出现这种情况 – 通常Friend2会检测到Friend1已写入其地图所以它不会写入Friend1的地图).
> Friend1和Friend2都写入MongoDB. Friend1在其后续任务中随机获得5秒延迟,Friend2随机获得15秒延迟.
> Friend1的任务首先触发,并与Friend2匹配.
> Friend2的任务激发第二; Friend2已不在MongoDB中,因此任务会立即终止.

一些打嗝:

> Friend1和Friend2可能都没有将ConcurrentHashMaps与它们相关联,例如:如果Friend2在Friend1检查以查看地图是否在内存中时仍在初始化其哈希映射.这很好,因为Friend2将写入Friend1的哈希映射,因此我们保证最终会尝试匹配 – 至少其中一个将具有哈希映射而另一个迭代,因为哈希映射创建在迭代之前.
>如果两个朋友的任务以某种方式同时解雇,则匹配的第二次迭代可能会失败.在这种情况下,如果朋友在匹配状态的MongoDB中,他们应该从其列表中删除朋友;然后,他们应该将结果列表的联合与写入其ConcurrentHashMap的朋友列表相关联,然后下一次迭代应该将其用作新朋友列表.最终该人将被匹配,否则该人的“重新检查”朋友列表将被清空.
>你应该增加每次后续迭代的任务延迟,以增加两个朋友的任务不会同时运行的可能性(例如Thread.sleep(500 * rand.nextInt(30))在第一次迭代,Thread.sleep (500 * rand.nextInt(60))在第二次迭代中,Thread.sleep(500 * rand.nextInt(90))在第三次迭代等).
>在后续迭代中,您必须在从MongoDB中删除人员之前创建一个人的新ConcurrentHashMap,否则您将进行数据竞争.同样,您必须在迭代其潜在匹配时从MongoDB中删除某个人,否则您可能会无意中将其匹配两次.

编辑:一些代码:

addUnmatchedToMongo(person1)方法将一个“不匹配”的person1写入MongoDB

setToMatched(friend1)使用findAndModify将friend1原子设置为“matched”;如果friend1已匹配或不存在,则该方法将返回false;如果更新成功,则返回true

如果friend1存在且匹配,则isMatched(friend1)返回true,如果它不存在或存在且“不匹配”,则返回false

private ConcurrentHashMap<String,ConcurrentHashMap<String,Person>> globalMap;
private DelayQueue<DelayedRetry> delayQueue;
private ThreadPoolExecutor executor;

executor.execute(new Runnable() {
    public void run() {
        while(true) {
            Runnable runnable = delayQueue.take();
            executor.execute(runnable);
        }
    }
}

public static void findMatch(Person person,Collection<Person> friends) {
    findMatch(person,friends,1);
}

public static void findMatch(Person person,Collection<Person> friends,int delayMultiplier) {
    globalMap.put(person.id,new ConcurrentHashMap<String,Person>());
    for(Person friend : friends) {
        if(**setToMatched(friend)**) {
            // write person to MongoDB in "matched" state
            // write "Pair(person,friend)" to MongoDB so it can be queried by the end user
            globalMap.remove(person.id);
            return;
        } else {
            if(**!isMatched(friend)** && globalMap.get(person.id).get(friend.id) == null) {
                // the existence of "friendMap" indicates another thread is currently  trying to match the friend
                ConcurrentHashMap<String,Person> friendMap = globalMap.get(friend.id);
                if(friendMap != null) {
                    friendMap.put(person.id,person);
                }
            }
        }
    }
    **addUnmatchedToMongo(person)**;
    Collection<Person> retryFriends = globalMap.remove(person.id).values();
    if(retryFriends.size() > 0) {
        delayQueue.add(new DelayedRetry(500 * new Random().nextInt(30 * delayMultiplier),person,retryFriends,delayMultiplier));
    }
}

public class DelayedRetry implements Runnable,Delayed {
    private final long delay;
    private final Person person;
    private final Collection<Person> friends;
    private final int delayMultiplier;

    public DelayedRetry(long delay,Person person,delayMultiplier) {
        this.delay = delay;
        this.person = person;
        this.friends = friends;
        this.delayMultiplier = delayMultiplier;
    }

    public long getDelay(TimeUnit unit) {
        return unit.convert(delay,TimeUnit.MILLISECONDS);
    }

    public void run {
        findMatch(person,delayMultiplier + 1);
    }
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读