rocketMq 学习及实践(一)

安装配置

下载

二进制下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.8.0/ docker 拉取地址:docker pull foxiswho/rocketmq:server-4.3.2

安装

配置环境变量 找到安装包,启动nameserver,进入 bin 目录双击 mqnamesrv.cmd 启动。启动 broker 命令 .\mqbroker -n 127.0.0.1:9876,指定配置开启自动创建 topic 启动 bin/mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true -c conf/broker.conf & 可视化插件下载

下载地址:https://github.com/apache/rocketmq-externals.git 修改 rocketmq-externals\rocketmq-console\src\main\resources 下的 application.properties,修改 修改port和 namesrvAddr ,配置如下 server.contextPath=

# 修改端口号

server.port=8088

#spring.application.index=true

spring.application.name=rocketmq-console

spring.http.encoding.charset=UTF-8

spring.http.encoding.enabled=true

spring.http.encoding.force=true

logging.config=classpath:logback.xml

#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876

# 添加 rocketmq.config.namesrvAddr

rocketmq.config.namesrvAddr=127.0.0.1:9876

#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true

rocketmq.config.isVIPChannel=

#rocketmq-console's data path:dashboard/monitor

rocketmq.config.dataPath=/tmp/rocketmq-console/data

#set it false if you don't want use dashboard.default true

rocketmq.config.enableDashBoardCollect=true

编译启动,用CMD进入\rocketmq-externals\rocketmq-console文件夹,执行mvn clean package -Dmaven.test.skip=true,编译生成。编译成功之后,cmd进入target文件夹,执行java -jar rocketmq-console-ng-1.0.0.jar,启动rocketmq-console-ng-1.0.0.jar。浏览器中输入‘127.0.0.1:8088’,成功后即可查看。 启动 nameserv start mqnamesrv.cmd 启动 broker start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

架构图

springboot 集成

官网:https://github.com/apache/rocketmq-spring

依赖

maven

org.apache.rocketmq

rocketmq-spring-boot-starter

2.2.0

gradle

compile 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.2.0'

配置

rocketmq:

name-server: 127.0.0.1:9876

producer:

group: leyang

# 超时时间 5 分钟

send-message-timeout: 300000

# 重试次数 3 次

retry-times-when-send-failed: 3

consumer:

group: leyang

生产者实现

rocketMQTemplate.send("test-topic-1", new GenericMessage(demo));

消费者实现

@Service

@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")

public class TestConsumer implements RocketMQListener {

@Override

public void onMessage(Demo demo) {

System.out.print("------- OrderPaidEventConsumer received:"+ JSON.toJSONString(demo));

}

}

3种消息发送方式

同步发送 sync

发送消息采用同步模式,这种方式只有在消息完全发送完成之后才返回结果,此方式存在需要同步等待发送结果的时间代价。

这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。 发送的结果存在同一个消息可能被多次发送给给broker,这里需要应用的开发者自己在消费端处理幂等性问题。示例如下

public void sync() {

rocketMQTemplate.syncSend("topic-name", "send sync message !");

}

异步发送sync

发送消息采用异步发送模式,消息发送后立刻返回,当消息完全完成发送后,会调用回调函数sendCallback来告知发送者本次发送是成功或者失败。异步模式通常用于响应时间敏感业务场景,即承受不了同步发送消息时等待返回的耗时代价。

同同步发送一样,异步模式也在内部实现了重试机制,默认次数为2次(DefaultMQProducer#getRetryTimesWhenSendAsyncFailed})。发送的结果同样存在同一个消息可能被多次发送给给broker,需要应用的开发者自己在消费端处理幂等性问题。

public void async() {

rocketMQTemplate.asyncSend("topic-name", "send async message!", new SendCallback() {

@Override

public void onSuccess(SendResult sendResult) {

log.info("send successful");

}

@Override

public void onException(Throwable throwable) {

log.info("send fail; {}", throwable.getMessage());

}

});

}

直接发送 one-way

采用one-way发送模式发送消息的时候,发送端发送完消息后会立即返回,不会等待来自broker的ack来告知本次消息发送是否完全完成发送。这种方式吞吐量很大,但是存在消息丢失的风险,所以其适用于不重要的消息发送,比如日志收集。

public void oneWay() {

rocketMQTemplate.sendOneWay("topic-name", "send one-way message");

}

两种消费模式

负载均衡消费 @Service

@RocketMQMessageListener(

topic = "topicName",

consumerGroup = "my-group1",

selectorExpression = "tag01||tag02",

// 负载均衡抹模式

messageModel = MessageModel.CLUSTERING

)

@Slf4j

public class Consumer2 implements RocketMQListener {

@Override

public void onMessage(Demo demo) {

log.debug("消息内容 {}", demo.toString());

}

}

广播模式 @Service

@RocketMQMessageListener(

topic = "topicName",

consumerGroup = "mall",

selectorExpression = "tag01||tag02",

// 广播模式

messageModel = MessageModel.BROADCASTING

)

@Slf4j

public class Consumer implements RocketMQListener {

@Override

public void onMessage(Demo demo) {

System.out.println("消息内容" + demo.toString());

}

}

顺序消费

为什么会有顺序消费呢,这个得先了解rocketmq的设计模型,rocketmq中nameserver 用于注册 borker,borker 中有 topic,用于区分消息类型,如果还想更细粒度区分,还有tag作为二级消息类型,一个topic中存在多个Queue。这样一来发送消息时消息就算放在同一个 topic中也可能不存在同一个队列上,所有消费者消费时自然是乱序的,想要保证消费顺序和生成者的发送顺序一致,唯一的做法就是将同一个业务组的消息放在同一队列中,队列有着先进先出的数据结构特点,所有只能在生成消息的时候指定同一组业务的消息在同一个队列中具体代码实现如下。

/**

* 这里使用的 springboot-rocketmq 封装的 syncSendOrderly(String destination, Object payload, String hashKey)

* 方法实现顺序消息,hashkey 就是用于选择同一队列的参数,一般传递 orderId 等唯一 id 即可。

* (生产者)

**/

@Test

public void orderMessage() {

SendResult result = rocketMQTemplate.syncSendOrderly("topicName:tag01", "刘德华", "1111");

System.out.println("1:" + result.toString());

SendResult result2 = rocketMQTemplate.syncSendOrderly("topicName:tag02", "张学友", "1111");

System.out.println("2:" +result2.toString());

SendResult result3 = rocketMQTemplate.syncSendOrderly("topicName:tag03", "李四-", "1111");

System.out.println("3:" + result3.toString());

SendResult result4 = rocketMQTemplate.syncSendOrderly("topicName:tag04", "王五", "1111");

System.out.println("4:" + result4.toString());

}

/**

* 消费者 consumeMode 选择 ORDERLY

**/

@Service

@RocketMQMessageListener(

topic = "topicName",

consumerGroup = "mall",

// 广播模式

messageModel = MessageModel.CLUSTERING,

// 选择顺序模式

consumeMode = ConsumeMode.ORDERLY

)

@Slf4j

public class Consumer implements RocketMQListener {

@Override

public void onMessage(String demo) {

System.out.println("消息内容" + demo);

}

}

延时队列

@Test

private void delayMessage() {

Demo demo = new Demo();

demo.setName("leyang");

demo.setSkill("coder");

Message message = new GenericMessage(demo);

// syncSend(String destination, Message message, long timeout, int delayLevel)

// delayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 级别 1 及 1s,最大延时 2h

// timeout 超时时间跟延时时间没关系

SendResult result = rocketMQTemplate.syncSend("topicName:tag01", message, 2000, 1);

}

批量消息发送

注意事项:批量发送的数据量不能超过 4 m,超过后需要将消息分割。

@Test

public void batchMessage() {

List> messages = new ArrayList<>();

for (int i = 0; i < 10; i++) {

Demo demo = new Demo();

demo.setName("leyang" + (i + 1));

demo.setSkill("coder");

Message message = new GenericMessage(demo);

messages.add(message);

}

SendResult result = rocketMQTemplate.syncSend("topicName:tag01", messages);

}

消息过滤

根据tag过滤 // 生产者

@Test

public void orderMessage() {

// 通

SendResult result = rocketMQTemplate.syncSend("topicName:tag01", "刘德华");

System.out.println("1:" + result.toString());

SendResult result2 = rocketMQTemplate.syncSend("topicName:tag02", "张学友");

System.out.println("2:" +result2.toString());

SendResult result3 = rocketMQTemplate.syncSend("topicName:tag03", "李四-");

System.out.println("3:" + result3.toString());

SendResult result4 = rocketMQTemplate.syncSend("topicName:tag04", "王五");

System.out.println("4:" + result4.toString());

}

// 消费者

@Service

@RocketMQMessageListener(

topic = "topicName",

consumerGroup = "mall",

// 广播模式

messageModel = MessageModel.CLUSTERING,

consumeMode = ConsumeMode.ORDERLY,

// 选择 tag01、tag05消费

selectorExpression = "tag01 || tag05"

)

@Slf4j

public class Consumer implements RocketMQListener {

@Override

public void onMessage(String demo) {

System.out.println("消息内容" + demo);

}

}

根据自定义条件过滤

注意事项:

需要修改 broker.conf 配置文件,添加如下配置

# 开启属性过滤

enablePropertyFilter=true

然后启动的时候需要设置已配置文件方式启动,启动命令 start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf

不配置不会生效。

// 生成者

@Test

public void filterMessage() {

for (int i = 0; i < 10; i++) {

// 需要使用 rocketmq 原生的消息对象才能设置自定义属性

org.apache.rocketmq.common.message.Message rocketMsg = new org.apache.rocketmq.common.message.Message();

Demo demo = new Demo();

demo.setName("leyang" + (i + 1));

demo.setSkill("coder");

rocketMsg.setBody(JSON.toJSONBytes(demo));

rocketMsg.putUserProperty("i", String.valueOf(i + 1));

rocketMsg.setTopic("topicName");

rocketMsg.setTags("tag01");

try {

// rocketMQTemplate.getProducer() 发送 org.apache.rocketmq.common.message 才生效

SendResult send = rocketMQTemplate.getProducer().send(rocketMsg);

} catch (MQClientException e) {

e.printStackTrace();

} catch (RemotingException e) {

e.printStackTrace();

} catch (MQBrokerException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

// 消费者

@Service

@RocketMQMessageListener(

topic = "topicName",

consumerGroup = "mall",

// 广播模式

messageModel = MessageModel.CLUSTERING

)

@Slf4j

public class Consumer implements RocketMQListener, RocketMQPushConsumerLifecycleListener {

@Override

public void onMessage(Demo demo) {

System.out.println("消息内容" + demo);

}

@Override

public void prepareStart(DefaultMQPushConsumer consumer) {

// set consumer consume message from now

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);

consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));

try {

consumer.subscribe("topicName", MessageSelector.bySql("i > 5"));

} catch (MQClientException e) {

e.printStackTrace();

}

}

}

异常问题

【问题1】

异常信息:

Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14 DESC: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 0.93 CQ: 0.93 INDEX: 0.93], messages are put to the slave, message store has been shut down, etc. BROKER: 172.31.254.27:10911

解决方法:

编辑/conf/2m-2s-async/broker-a.properties文件,添加 diskMaxUsedSpaceRatio=98 磁盘占用到98%才会报错

参考博客

架构分析 https://www.cnblogs.com/qdhxhz/p/11094624.html 使用规范参考:https://www.jianshu.com/p/9c0d4bde8153 rocketmq 基础详解:https://blog.csdn.net/weixin_39615596/article/details/111611635 rocketmq 比较 kafka:https://blog.csdn.net/damacheng/article/details/42846549 docker 安装教程:https://zhuanlan.zhihu.com/p/342022297

查看原文