RocketMQ 官网:https://rocketmq.apache.org/
基本概念
详看官方文档 基本概念 ,这里仅摘取部分内容做介绍。
消息模型 (Message Model)
RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。
Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个 Topic 的消息也可以分片存储于不同的 Broker。
Message Queue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。
消息生产者(Producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 Broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。
消息消费者(Consumer)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
名字服务(Name Server)
名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个Namesrv 实例组成集群,但相互独立,没有信息交换。
主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。
安装部署
官网安装文档:https://rocketmq.apache.org/docs/quick-start/
下载地址:https://rocketmq.apache.org/dowloading/releases/
wget https://archive.apache.org/dist/rocketmq/4.9.0/rocketmq-all-4.9.0-bin-release.zip
unzip rocketmq-all-4.9.0-bin-release.zip
cd rocketmq-all-4.9.0-bin-release
# 启动 Name Server
nohup sh bin/mqnamesrv &
# 查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log
# 启动 Broker,-n 后表示要连接的 NameServer 的 IP 和端口,多个用分号隔开
nohup sh bin/mqbroker -n localhost:9876 &
# 查看日志
tail -f ~/logs/rocketmqlogs/broker.log
RocketMQ 默认需要的 JVM 内存比较大,可以修改 NameServer 和 Broker 的启动参数:
修改 NameServer 的启动参数,文件 bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
默认启动内存为 4G,根据自身机器修改参数。
修改 Broker 启动参数,文件 bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g"
查看 NameServer 启动日志:
[root@localhost rocketmq-all-4.9.0]# tail -f ~/logs/rocketmqlogs/namesrv.log
2021-11-06 17:54:27 INFO main - tls.client.keyPassword = null
2021-11-06 17:54:27 INFO main - tls.client.certPath = null
2021-11-06 17:54:27 INFO main - tls.client.authServer = false
2021-11-06 17:54:27 INFO main - tls.client.trustCertPath = null
2021-11-06 17:54:27 INFO main - Using JDK SSL provider
2021-11-06 17:54:28 INFO main - SSLContext created for server
2021-11-06 17:54:28 INFO main - Try to start service thread:FileWatchService started:false lastThread:null
2021-11-06 17:54:28 INFO NettyEventExecutor - NettyEventExecutor service started
2021-11-06 17:54:28 INFO main - The Name Server boot success. serializeType=JSON
2021-11-06 17:54:28 INFO FileWatchService - FileWatchService service starte
查看进程:
[root@localhost rocketmq-all-4.9.0]# jps
6824 Jps
5612 NamesrvStartup
1676 jenkins.war
同理,查看 Broker 启动日志操作也是这样,我就不再赘述了。
注意:以上启动方式为单机模式,生产环境请勿使用此模式,否则后果你懂的!
集群搭建请看:apache/rocketmq-运维管理
关闭服务:
# 关闭 Broker
sh bin/mqshutdown broker
# 关闭 Name Server
sh bin/mqshutdown namesrv
RocketMQ 控制台
项目地址:https://github.com/apache/rocketmq-externals
下载源码后执行命令打包并运行:
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.1.jar
浏览器访问:http://127.0.0.1:17890/#/,默认端口是 17890,如果自定义端口请修改访问端口。
Spring Boot 整合 RocketMQ
- Maven 引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
- 添加 RocketMQ 配置
rocketmq:
# 多个用;隔开,如 172.19.0.1:9876;172.19.0.2:9876
name-server: 127.0.0.1:9876
producer:
group: hong-producer-group
# 添加自定义配置
hong:
rocketmq:
topic: string-topic
emailTopic: email-topic
transTopic: spring-transation-topic
- 添加属性映射类
@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "hong.rocketmq")
public class CustomRocketMQProperties {
private String topic;
private String emailTopic;
private String transTopic;
}
- 添加 Producer
@Service
@Slf4j
public class RocketMQProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Resource
private CustomRocketMQProperties rocketMQProperties;
public void sendMessage(String msg) {
// 同步发送消息
rocketMQTemplate.convertAndSend(rocketMQProperties.getTopic(), msg);
// 异步发送消息
rocketMQTemplate.asyncSend(rocketMQProperties.getEmailTopic(), new EmailEvent("986310747@qq.com", "lanweihong@lanweihong.com", "Test email"), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步发送消息成功:{}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("异步发送消息异常:{}", throwable);
}
});
}
public void sendMessageInTransaction(String msg) {
Message<String> message = MessageBuilder.withPayload(msg).build();
// 发送事务消息
rocketMQTemplate.sendMessageInTransaction(rocketMQProperties.getTransTopic(), message, null);
}
}
EmailEvent
类:
@Data
@AllArgsConstructor
public class EmailEvent implements Serializable {
private static final long serialVersionUID = -9150152017178790573L;
private String to;
private String from;
private String content;
}
- 添加 Consumer
StringConsumer.java
@Service
@RocketMQMessageListener(topic = "${hong.rocketmq.topic}", consumerGroup = "string-consumer-group")
@Slf4j
public class StringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("StringConsumer 接收到消息:{}", message);
}
}
EmailConsumer.java
@Service
@RocketMQMessageListener(topic = "${hong.rocketmq.emailTopic}", consumerGroup = "email-consumer-group")
@Slf4j
public class EmailConsumer implements RocketMQListener<EmailEvent> {
@Override
public void onMessage(EmailEvent emailEvent) {
log.info("EmailConsumer 接收到消息:{}", emailEvent);
}
}
StringTransactionalConsumer.java
@Service
@RocketMQMessageListener(topic = "${hong.rocketmq.transTopic}", consumerGroup = "string-trans-consumer")
@Slf4j
public class StringTransactionalConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("StringTransactionalConsumer 接收到消息:{}", message);
}
}
- 添加 Controller 测试
@RestController
public class MainController {
private final RocketMQProducer rocketMQProducer;
@Autowired
public MainController(RocketMQProducer rocketMQProducer) {
this.rocketMQProducer = rocketMQProducer;
}
@PostMapping("/rocketmq")
public String testMessage(@RequestParam("msg")String msg) {
rocketMQProducer.sendMessage(msg);
return "操作成功";
}
@PostMapping("/rocketmq/transaction")
public String testTransactionMessage(@RequestParam("msg")String msg) {
rocketMQProducer.sendMessageInTransaction(msg);
return "操作成功";
}
}
错误
Windows 启动时可能会报错误:
错误: 找不到或无法加载主类 Files\Java\jdk1.8.0_291\jre\lib\ext
这是因为 JDK 安装目录有空格,读取路径出错,修改 bin
目录下的 runserver.cmd
和 runbroker.cmd
,给 %JAVA_HOME%
添加半角双引号即可:
将
set "JAVA_OPT=%JAVA_OPT% -Djava.ext.dirs=%BASE_DIR%lib;%JAVA_HOME%\jre\lib\ext"
改为:
set "JAVA_OPT=%JAVA_OPT% -Djava.ext.dirs=%BASE_DIR%lib;"%JAVA_HOME%"\jre\lib\ext"
再次运行 mqnamesrv.cmd
,发现已成功运行:
参考文献
[1]. Quick Start - Apache RocketMQ
[2]. RocketMQ-Spring