RocketMQ在linux下安装部署
发布时间:2020-12-14 00:40:47 所属栏目:Linux 来源:网络整理
导读:本博客以当前RocketMQ最新版介绍:v4.4.0 环境要求 64位JDK 1.8+; Maven 3.2.x; // 源码编译时需要用到 二进制文件安装 下载二进制文件:http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip 二进制文件放到任意
|
本博客以当前RocketMQ最新版介绍:v4.4.0 环境要求
二进制文件安装
> unzip rocketmq-all-4.4.0-bin-release.zip && mv rocketmq-all-4.4.0-bin-release rocketmq
> cd /root/rocketmq > nohup sh bin/mqnamesrv & // 第一次安装时,可执行sh bin/mqnamesrv观察是否能够启动 > tailf -f ~/logs/rocketmqlogs/namesrv.log // 观察到以下日志时,server已启动成功 2019-09-07 18:06:13 INFO main - The Name Server boot success. serializeType=JSON
> nohup sh bin/mqbroker -n localhost:9876 > tailf -f ~/logs/rocketmqlogs/broker.log // 观察到以下日志时,server已启动成功 2019-09-07 20:40:06 INFO main - The broker[0daf9bd41237,172.17.0.2:10911] boot success. serializeType=JSON and name server is 172.17.0.2:9876
// 在测试之前,我们需要先设置环境变量:export NAMESRV_ADDR=localhost:9876 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK,msgId= ... > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ConsumeMessageThread_%d Receive New Messages: [MessageExt... Producer的源码 public class Producer {
public static void main(String[] args) throws MQClientException,InterruptedException {
// 默认消费组
DefaultMQProducer producer = new DefaultMQProducer("default");
producer.start();
for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 消息发送
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n",sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
// 关闭生产者
producer.shutdown();
}
}
Consumer的源码 public class Consumer {
public static void main(String[] args) throws InterruptedException,MQClientException {
// 指定消费组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default");
// 设置消费偏移点
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅topic,以及tag
consumer.subscribe("TopicTest","*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(),msgs);
// 收到数据后,返回ack确认
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
> sh bin/mqshutdown broker The mqbroker(36695) is running... Send shutdown request to mqbroker(36695) OK > sh bin/mqshutdown namesrv The mqnamesrv(36664) is running... Send shutdown request to mqnamesrv(36664) OK (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
