java – 可以使用自定义算法调度消息,而不是使用RabbitMQ进行循
我正在使用RabbitMQ的循环功能来在多个消费者之间发送消息,但是一次只能收到一个消息.
我的问题是我的消息代表任务,我想在我的消费者身上有本地会话(状态).我事先知道哪些消息属于哪个会话,但是我不知道使用我指定的算法使RabbitMQ向消费者发送什么是最好的方法(或者有什么方法?). 我不想写我自己的编排服务,因为它将成为一个瓶颈,我不希望我的制作人知道哪个消费者会收到他们的消息,因为我会失去使用兔子的解耦. 有没有办法使RabbitMQ基于预定义的算法/规则而不是循环方式向消费者发送消息? 澄清:我用不同语言编写的几个微服务器,每个服务都有自己的工作.我们之间使用protobuf消息进行通信.我给每个新消息一个UUID.如果消费者收到消息,它可以从中创建一个响应消息(这可能不是正确的术语,因为生产者和消费者被解耦,并且他们彼此不了解),并且该UUID被复制到新消息.这形成一个数据转换流水线,该“进程”由UUID(processId)标识.我的问题是,有可能我有多个工作的消费者,如果以前看到它,我需要一个工作者坚持一个UUID.我有这个需要,因为 >每个进程可能有本地状态 由于RabbitMQ在使用循环的工作人员之间分配任务,因此我无法强制我的进程坚持工作.我有几个注意事项: 生产者与消费者脱钩,所以直接信息不是一个选择 如果有一个解决方案不涉及更改循环算法,并且不会破坏我的约束,那也是可以的! 解决方法
如果你不想去编排业务,你可以尝试这样的拓扑:
为了简单起见,我假设您的processId用作路由密钥(在现实世界中,您可能希望将其存储在标题中,并使用标题交换). 传入消息将被接收的Exchange(类型:直接)接受,其中 这是RabbitMQ文档在“替代交易”中所说的:
(我们对这里的用例特别感兴趣) 每个消费者将创建它自己的队列,并将其绑定到传入Exchange,使用到目前为止知道的会话的processId作为绑定的路由密钥. 这样一来,它只会收到有兴趣的会话的消息. 另外,所有的消费者都将绑定到共享的无会话队列. 如果有一个以前未知的processId的消息进来,那么它将不会对与Incoming Exchange进行注册的特定绑定,因此它将被重新路由到No Session Exchange =>没有会话队列,并以通常(循环)方式发送到其中一个消费者. 然后,消费者将使用传入交换(即启动新的“会话”)为其注册新的绑定,以便随后将使用此processId获取所有后续消息. 一旦“会话”结束,它将不得不删除相应的绑定(即关闭“会话”). (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |