Java&RabbitMQ – 排队和多线程 – 或Couchbase作为作业队列
我有一个Job Distributor在不同的渠道上发布消息.
此外,我想要有两个(以及以后)消费者在不同的任务上工作并在不同的机器上运行. (目前我只有一个,需要扩大它) 我们来命名这些任务(只是例子): FIBONACCI(产生斐波那契数) 这些任务长达2-3个小时,应平均分配给每个消费者. 每个消费者都可以使用x个并行线程来处理这些任务. >机器1可以为FIBONACCI消耗3个并行作业,为RANDOMBOOKS消耗5个并行作业 我该如何实现? 我必须启动每个频道的X线程来听每个消费者? 我什么时候要确认? 我目前只针对一个消费者的方法是:为每个任务启动x线程 – 每个线程都是实现Runnable的Defaultconsumer.在handleDelivery方法中,我调用basicAck(deliveryTag,false)然后执行工作. 进一步:我想把一些任务送给一个特殊的消费者.如何结合上述公平分配实现? 这是我的出版代码 String QUEUE_NAME = "FIBONACCI"; Channel channel = this.clientManager.getRabbitMQConnection().createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,null); channel.basicPublish("",QUEUE_NAME,MessageProperties.BASIC,Control.getBytes(this.getArgument())); channel.close(); 这是我的消费者代码 public final class Worker extends DefaultConsumer implements Runnable { @Override public void run() { try { this.getChannel().queueDeclare(this.jobType.toString(),null); this.getChannel().basicConsume(this.jobType.toString(),this); this.getChannel().basicQos(1); } catch (IOException e) { // catch something } while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { Control.getLogger().error("Exception!",e); } } } @Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] bytes) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); this.getChannel().basicAck(deliveryTag,false); // Is this right? // Start new Thread for this task with my own ExecutorService } } 在这种情况下,工作人员启动了两次:一次为FIBUNACCI,一次为RANDOMBOOKS UPDATE 正如答案所述,RabbitMQ不是最好的解决方案,但是Couchbase或MongoDB pull方法将是最好的.我对这些系统很新,有没有人可以向我解释,这将如何实现? 解决方法
这是一个概念性的观点,我将如何在couchbase上构建它.
>您有一些机器来处理作业,还有一些机器(可能是相同的)创建作业. 因此,总而言之,每个工作人员对孤立的作业进行查询,如果有任何检查,依次查看是否有一个锁定文件,则创建一个,并按照上述正常的锁定协议.如果没有孤立的作业,那么它会查找逾期的作业,并遵循锁定协议.如果没有过期的工作,那么它只需要最旧的工作并遵循锁定协议. 当然,如果您的系统没有“逾期”这样的事情,并且如果及时性并不重要,那么这样做也可以,而不是使用最旧的工作,您可以使用其他方法. 另一种方法可能是创建1-N之间的随机值,其中N是相当大的数字,例如4×工作人员的数量,并且每个作业都被标记为该值.每次工作人员要找工作时,都可以滚动骰子,看看是否有任何这样的工作.如果没有,它会再次这样做,直到找到一个有该号码的工作.这样,而不是多个争夺少数“最旧”或最高优先级的工作的工人,而不是更多的锁争用的可能性,它们将被分散出来.以牺牲时间为代价比FIFO情况更随机. 随机方法也可以应用于需要容纳负载值的情况下(因为单个机器不承担太多的负载),而不是采用最老的候选者,只需要随机选择一个列出可行的工作,并尝试这样做. 编辑添加: 在第12步中,我说“可能会放置一个随机数”,我的意思是,如果工作人员知道优先级(例如:最需要做哪些工作),可以将一个代表这个的数字放入文件中.如果没有“需要”工作的概念,那么他们都可以滚动骰子.他们用骰子的角色来更新这个文件.然后他们都可以看看它,看看其他的滚动.如果他们失去了,那么他们会平息,另一个工作人员知道它有它.这样,您可以解决哪位工作人员无需复杂的协议或协商工作.我假设两个工作人员都在这里碰到相同的锁定文件,它可以使用两个锁定文件和一个查找所有这些文件来实现.如果经过一段时间后,没有一个工人滚过一个更高的数字(新工人想到他的工作会知道别人已经在滚动,所以他们会跳过它)你可以安全地认识你是唯一的工作人员努力工作 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- java – 使用ehcache集群的hibernate缓存:nonst
- Java框架搭建之Maven、Mybatis、Spring MVC整合搭
- Java中继承thread类与实现Runnable接口的比较
- android 利用socket 发送Json数据demo
- MyBatis获取插入记录的自增长字段值(ID)
- Spring Boot整合Elasticsearch实现全文搜索引擎案
- Spring Security Remember me使用及原理详解
- java – 通过单击位置从JList获取组件
- struts2静态资源映射代码示例
- Java (JVM) Memory Model – Memory Management