一、mqtt协议简单介绍

mqtt是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet of Thing)中的一个标准传输协议。

二、rabbitmq的安装部署

1. 安装Erlang环境

yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel

yum -y install ncurses-devel

2. 下载Erlang rpm 安装包和rabbitmq rpm安装包

rpm包自取:https://pan.baidu.com/s/1UGuxeEIYMK9hBHKYBClfTQ 提取码:tmfm

RPM 下载包版本地址:https://packagecloud.io/rabbitmq/erlang

下载RabbitMQ rpm 安装包: https://github.com/rabbitmq/rabbitmq-server/releases/

*注意版本统一

安装erlang

rpm -Uvh erlang-23.2.7-1.el7.x86_64.rpm

yum install -y erlang

erl -v

安装rabbitmq

yum install -y socat

rpm -Uvh rabbitmq-server-3.9.15-1.el7.noarch.rpm

yum install -y rabbitmq-server

启动rabbitmq

systemctl start rabbitmq-server

查看rabbitmq状态

systemctl status rabbitmq-server

3、添加用户

添加root用户取代guest用户

rabbitmqctl add_user root root

rabbitmqctl set_user_tags root administrator

rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"

删除guest

rabbitmqctl delete_user guest

创建普通用户并设置权限仅用于发送订阅消息 创建v-host

rabbitmqctl add_vhost /third_mqtt

创建用户

rabbitmqctl add_user third_client OP74X53Z

设置用户角色,无法登陆管理控制台,通常就是普通的生产者和消费者。

rabbitmqctl set_user_tags third_client none

设置用户在v-host下的权限

rabbitmqctl set_permissions -p /third_mqtt third_client ".*" ".*" ".*"

设置用户在默认"/” v-host下的权限

rabbitmqctl set_permissions -p / third_client ".*" ".*" ".*"

设置主题权限,可订阅和发布消息

rabbitmqctl set_topic_permissions -p /third_mqtt third_client amq.topic ".*" ".*"

三、启用 rabbitmq的mqtt协议和RabbitMQWeb管理界面

rabbitmq插件启用 启动RabbitMQWeb管理界面

rabbitmq-plugins enable rabbitmq_management

启用 rabbitmq的mqtt协议

rabbitmq-plugins enable rabbitmq_mqtt

启用 rabbitmq的web_mqtt协议(不使用js订阅发布可以不启用)

rabbitmq-plugins enable rabbitmq_web_mqtt

查看插件状态 E显式启用 e隐式启用

rabbitmq-plugins list

开放外网访问并重启防火墙

firewall-cmd --zone=public --add-port=5672/tcp --permanent

firewall-cmd --zone=public --add-port=15672/tcp --permanent

firewall-cmd --zone=public --add-port=1883/tcp --permanent

firewall-cmd --zone=public --add-port=15675/tcp --permanent

如果搭建rabbitmq集群模式需要把下面这个两个端口放开

firewall-cmd --zone=public --add-port=4369/tcp --permanent

firewall-cmd --zone=public --add-port=25672/tcp --permanent

重启防火墙

systemctl restart firewalld

firewall-cmd --zone=public --list-ports

以上部署操作全部设定完毕,重启rabbitmq服务,使用创建root用户登录进入rabbitmq控制台

*至此rabbitmq搭建mqtt安装部署结束,下面进入代码实现环节

四、代码实现

先在pom中添加依赖包

org.springframework.integration

spring-integration-mqtt

org.eclipse.paho

org.eclipse.paho.client.mqttv3

1.2.5

application.yml的所需的配置

mqtt:

#MQTT-用户名 root

username: third_client

#MQTT-密码,需要解密 root

password: OP74X53Z

#MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://xxx.xxx.xx.xxx:1883,tcp://xxx.xxx.xxx.xxx:1883

hostUrl: tcp://192.168.2.128:1883,tcp://192.168.2.129:1883

#两个客户端的clientId不能相同,生产者和消费者的clientId不能相同

pubClientId: pub-client-id-al68pq1w-dev

subClientId: sub-client-id-9v83pp7c-dev

#发布的主题--MQTT-默认的消息推送主题,实际可在调用接口时指定

pubTopic: defaultTopic

#订阅的主题

subTopic: gps-topic,oil-topic,broadcast-topic,fault-topic

completionTimeout: 3000

mqtt服务类

package com.zdft.bhdcm.config.mtqq;

import lombok.extern.slf4j.Slf4j;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.integration.annotation.IntegrationComponentScan;

import org.springframework.integration.annotation.ServiceActivator;

import org.springframework.integration.channel.DirectChannel;

import org.springframework.integration.core.MessageProducer;

import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;

import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;

import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;

import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;

import org.springframework.integration.mqtt.support.MqttHeaders;

import org.springframework.messaging.*;

import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;

/**

* mqtt服务类

* 一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,

* 是物联网(Internet of Thing)中的一个标准传输协议

* ClientId是MQTT客户端的标识。MQTT服务端用该标识来识别客户端。因此ClientId必须是独立的。

* clientID需为全局唯一。如果不同的设备使用相同的clientID同时连接物联网平台,那么先连接的那个设备会被强制断开。

*/

@Configuration

@IntegrationComponentScan

@Slf4j

public class MqttServerConfig {

@Value("${mqtt.username}")

private String username;

@Value("${mqtt.password}")

private String password;

@Value("${mqtt.hostUrl}")

private String hostUrl;

@Value("${mqtt.pubClientId}")

private String pubClientId;

@Value("${mqtt.subClientId}")

private String subClientId;

@Value("${mqtt.pubTopic}")

private String pubTopic;

@Value("${mqtt.subTopic}")

private String subTopic;

@Value("${mqtt.completionTimeout}")

private int completionTimeout;

/*========================================factory=================================*/

/**

* mqtt客户工厂

* @return

*/

@Bean

public MqttPahoClientFactory mqttClientFactory() {

DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();

MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();

mqttConnectOptions.setUserName(username);

mqttConnectOptions.setPassword(password.toCharArray());

// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,

// 把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session,

// 当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息

mqttConnectOptions.setCleanSession(false);

mqttConnectOptions.setServerURIs(hostUrl.split(","));

// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制

mqttConnectOptions.setKeepAliveInterval(20);

mqttConnectOptions.setMaxInflight(1000);

factory.setConnectionOptions(mqttConnectOptions);

return factory;

}

/*========================================sent=================================*/

/**

* mqtt出站通道

* @return

*/

@Bean

public MessageChannel mqttOutboundChannel() {

return new DirectChannel();

}

/**

* mqtt出站handler

*

* @return {@link MessageHandler}

*/

@Bean

@ServiceActivator(inputChannel = "mqttOutboundChannel")

public MessageHandler mqttOutboundHandler() {

//MqttPahoMessageHandler初始化

MqttPahoMessageHandler handler = new MqttPahoMessageHandler(pubClientId+"_send_handler_", mqttClientFactory());

//设置默认的qos级别

handler.setDefaultQos(1);

//保留标志的默认值。如果没有mqtt_retained找到标题,则使用它。如果提供了自定义,则不使用它converter。这里不启用

handler.setDefaultRetained(false);

//设置发布的主题

handler.setDefaultTopic(pubTopic);

//当 时true,调用者不会阻塞。相反,它在发送消息时等待传递确认。默认值为false(在确认交付之前发送阻止)。

handler.setAsync(false);

//当 async 和 async-events 都为 true 时,会发出 MqttMessageSentEvent(请参阅事件)。它包含消息、主题、客户端库生成的messageId、clientId和clientInstance(每次连接客户端时递增)。当客户端库确认交付时,会发出 MqttMessageDeliveredEvent。它包含 messageId、clientId 和 clientInstance,使传递与发送相关联。任何 ApplicationListener 或事件入站通道适配器都可以接收这些事件。请注意,有可能在 MqttMessageSentEvent 之前接收到 MqttMessageDeliveredEvent。默认值为false。

handler.setAsyncEvents(false);

return handler;

}

/*========================================receive=================================*/

/**

* mqtt输入通道

* @return

*/

@Bean

public MessageChannel mqttInputChannel() {

return new DirectChannel();

}

/**

* 入站

* @return

*/

@Bean

public MessageProducer inbound() {

//配置订阅端MqttPahoMessageDrivenChannelAdapter

MqttPahoMessageDrivenChannelAdapter adapter =

new MqttPahoMessageDrivenChannelAdapter(subClientId+"_receive_inbound_", mqttClientFactory(), subTopic.split(","));

//设置超时时间

adapter.setCompletionTimeout(completionTimeout);

//设置默认的消息转换类

adapter.setConverter(new DefaultPahoMessageConverter());

//设置qos级别

adapter.setQos(1);

//设置入站管道

adapter.setOutputChannel(mqttInputChannel());

adapter.setTaskScheduler(new ConcurrentTaskScheduler());

return adapter;

}

/**

* 消息处理程序

* @return

*/

@Bean

@ServiceActivator(inputChannel = "mqttInputChannel")

public MessageHandler handler() {

return new MessageHandler() {

@Override

public void handleMessage(Message message) throws MessagingException {

MessageHeaders headers = message.getHeaders();

log.info("headers: {}", headers);

String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();

log.info("订阅主题为: {}", topic);

String[] topics = subTopic.split(",");

for (String t : topics) {

if (t.equals(topic)) {

log.info("订阅主题为:{};接收到该主题消息为:{}",topic,message.getPayload().toString());

}

}

}

};

}

}

mqtt网关(发布端需要用到)

package com.zdft.bhdcm.config.mtqq;

import org.springframework.integration.annotation.MessagingGateway;

import org.springframework.integration.mqtt.support.MqttHeaders;

import org.springframework.messaging.handler.annotation.Header;

import org.springframework.stereotype.Component;

/**

*mqtt网关(发布端需要用到)

*/

@Component

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")

public interface MqttGateway {

/**

* 发送到mqtt

*

* @param payload 有效载荷

*/

void sendToMqtt(String payload);

/**

* 发送到mqtt

*

* @param topic 主题

* @param payload 消息内容

*/

void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

/**

* 发送到mqtt

*

* @param topic 主题

* @param qos qos

* @param payload 消息内容

*/

void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

}

测试发送和订阅

五、mqtt.fx连接mqtt工具使用

mqtt测试工具安装包:https://pan.baidu.com/s/1oun7rMVJITOK9VSyO785HQ 提取码:l3cm

1、配置连接及订阅

配置mqtt连接

配置用户名密码 订阅gps-topic

六、jmeter压测结果展示

这里拟2w的消息并发量,根据业务计算最高模拟测试9w消息并发量没出现问题

如何使用移步到 :https://blog.csdn.net/weixin_39393393/article/details/116640867?spm=1001.2014.3001.5502

1、使用jmeter模拟2w并发量

2、结果展示

rabbitmq控制台展示

后台打印

mqtt订阅的消息

jmeter压测报告

好文阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。