如何在同一个Java进程中连接多个RocketMQ服务器
前言
我们都知道,RocketMQ在代码级别对连接服务器进行了限制,基本上可以理解为一个JVM进程中只能连接一个NameServer,但实际应用场景中,我们可能会在架构设计层面上对RocketMQ进行了职能上的划分,规定了A服务处理A类消息,而B服务处理B类消息,这时我们应该如何解决这个问题呢? 问题的根源我们从代码层级来分析到底为什么会产生“一个JVM实例只能连接一个NameServer”。 public MQClientInstance getAndCreateMQClientInstance(ClientConfig clientConfig,RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = (MQClientInstance)this.factoryTable.get(clientId); if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(),clientId,rpcHook); MQClientInstance prev = (MQClientInstance)this.factoryTable.putIfAbsent(clientId,instance); if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]",clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]",clientId); } } return instance; } 代码不复杂,我们可以看到它利用客户的配置信息生成一个固定的clientId,以此去缓存factoryTable中查找,不存在才会创建全新一个实例。 public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); sb.append("@"); sb.append(this.getInstanceName()); if (!UtilAll.isBlank(this.unitName)) { sb.append("@"); sb.append(this.unitName); } return sb.toString(); } 代码层面上对clientId进行了约定,格式为“[email?protected]”格式,当unitName不为空的时候还会在后面加上“@unitName”。 怎么解决?从代码分析上我们可以知道,为了创建多实例,我们可以
除此之外还有其它解决方案吗?我仔细从网络上翻了一轮,没看到什么好方法,是大家都没这个场景还是有其它好办法解决了呢?欢迎大家讨论~ 方法3在上一篇博文来自平行世界的救赎里面,我做了个工具sandbox,我提供的方法3就是依托于这个工具。 怎么做?这里我使用一个springboot项目作为演示案例。 我们先从pom文件中引入包(我没有推上maven仓库,各位可以从github/gitee上下载),代码如下 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>me.van</groupId> <artifactId>rocket-mq-multi-client-test</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>测试多个rocketmq client共存</name> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.6.RELEASE</version> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <start-class>me.van.App</start-class> <java.version>1.8</java.version> <lombok.version>1.14.8</lombok.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency> <dependency> <groupId>me.van</groupId> <artifactId>sandbox</artifactId> <version>1.0.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project> 此处引入了apache的rocketmq-client组件作为mq客户端,也就是存在前面所说的问题的组件。 启动类package me.van; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class App { public static void main(String[] args) { SpringApplication.run(App.class,args); } } 非常的简单,没什么好介绍的。 配置类package me.van; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MQProducer; import org.springframework.beans.factory.annotation.Autowire; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AppConfig { @Bean(autowire = Autowire.BY_NAME,value = "producer") MQProducer producer() throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer(); initProducer(producer,"a.io:9876;b.io:9876"); return producer; } @Bean(autowire = Autowire.BY_NAME,value = "producer_sandbox1") MQProducer producerSandbox1() throws MQClientException,SandboxCannotCreateObjectException { DefaultMQProducer producer = createProducerInSandbox(); initProducer(producer,"x.io:9876;y.io:9876"); return producer; } @Bean(autowire = Autowire.BY_NAME,value = "producer_sandbox2") MQProducer producerSandbox2() throws MQClientException,"1.io:9876;2.io:9876"); return producer; } private DefaultMQProducer createProducerInSandbox() throws SandboxCannotCreateObjectException { Sandbox sandbox = new Sandbox("org.apache.rocketmq.client"); return sandbox.createObject(DefaultMQProducer.class); } private void initProducer(DefaultMQProducer producer,String namesrvAddr) throws MQClientException { producer.setNamesrvAddr(namesrvAddr); producer.setProducerGroup("test-group"); producer.setRetryAnotherBrokerWhenNotStoreOK(true); producer.start(); } } 这里可以看到, Controllerpackage me.van; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.MQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class TestController { @Autowired MQProducer producer; @Autowired MQProducer producer_sandbox1; @Autowired MQProducer producer_sandbox2; @GetMapping("/") public String hello(){ return "hello world"; } @GetMapping("/send") public String send(String msg){ if(null == msg) return "msg is null"; String returnMsg = ""; Message message = new Message("topic-test-multi-mq-client",msg.getBytes()); try { producer.send(message); returnMsg += "原生producer发送完成<br/>"; producer_sandbox1.send(message); returnMsg += "第一个沙箱内producer发送完成<br/>"; producer_sandbox2.send(message); returnMsg += "第二个沙箱内producer发送完成<br/>"; } catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) { returnMsg += "发送过程出现异常:" + e.getMessage(); } return returnMsg; } } 通过 测试一下运行 msg is null 访问,http://localhost:8080/send?msg=test 代码地址github: https://github.com/vancoo/multi-mq-demo (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |