1、准备开发板

开发板功能区分布图

开发板俯视图

2、实验讲解

    在之前的章节中,已经讲解过了MQTT的通讯原理和组包过程,现在开始手把手的教大家用代码来实现连接MQTT平台以及数据的交互,实际上这篇文章已经拖更接近两年了,非常感谢那些默默支持我的朋友们。代码的实现过程主要参考了MQTT中文网[https://mqtt.p2hp.com/mqtt311],大家感兴趣的话可以打开进行学习,你们的点赞和支持是我持续更新的动力。

    大家不要被我的代码量给吓到了,下面的代码无非是封装好的AT指令,在下面我也会贴出AT指令实现的过程帮助大家理解,另外我也会贴出逆向后的报文,让大家知道在明文情况下,黑客抓包会获得哪些数据。那些复杂指令集和实现过程也是通过一些重复且简单的指令组成的,希望你能够保持你的自信心,抱着良好的心态去对待生活。

提前准备的数据(Ready)

服务器地址 broker-cn.emqx.io

服务器端口 1883

WIFI名称 MCSCV

WIFI密码 jnszstmOBB

//鉴权三元组

客户端ID 111|hmvcfu

用户名 hmvcfu

密码 a02d6361531c2dae941d5b022982d944

MQTT上报主题 sys/hmvcfu/post

MQTT下发主题 sys/hmvcfu/control

初始化并连接服务器(Init)

//设置STA模式

ESP8266> AT+CWMODE=1

//设置单连接模式

ESP8266> AT+CIPMUX=0

//连接路由器WIFI

ESP8266> AT+CWJAP="MCSCV","jnszstmOBB"

//连接服务器

ESP8266> AT+CIPSTART="TCP","broker-cn.emqx.io",1883

发送MQTT连接报文(Connect)

//发送连接报文

ESP8266> AT+CIPSEND=66

ESP8266> 10 40 00 04 4D 51 54 54 04 C0 00 78 00 0A 31 31 31 7C 68 6D 76 63 66 75 00 06 68 6D 76 63 66 75 00 20 61 30 32 64 36 33 36 31 35 33 31 63 32 64 61 65 39 34 31 64 35 62 30 32 32 39 38 32 64 39 34 34

//逆向后的连接报文

◇@\0MQTT繺0x\0

111|hmvcfu\0hmvcfu\0 a02d6361531c2dae941d5b022982d944□

//发送后回应

ESP8266< +IPD,5: 20 02 00 00

发送MQTT订阅主题报文(Subscribe)

//发送订阅主题报文

ESP8266> AT+CIPSEND=25

ESP8266> 82 17 00 01 00 12 73 79 73 2F 68 6D 76 63 66 75 2F 63 6F 6E 74 72 6F 6C 00

//逆向后的订阅主题报文

?\0\0sys/hmvcfu/control\0□

//发送后回应

ESP8266< +IPD,5:90 03 00 01 00

发送MQTT发布主题报文(Publish)

//发送发布主题报文

ESP8266> AT+CIPSEND=64

ESP8266> 30 3E 00 0F 73 79 73 2F 68 6D 76 63 66 75 2F 70 6F 73 74 7B 22 6D 73 67 22 3A 7B 22 70 61 72 61 6D 64 61 74 61 22 3A 5B 7B 22 74 65 6D 70 22 3A 32 30 2C 22 68 75 6D 69 22 3A 39 38 7D 5D 7D 7D

//逆向后的发布主题报文

0>\0sys/hmvcfu/post{"msg":{"paramdata":[{"temp":20,"humi":98}]}}□

//接收到SEND OK说明发布成功

ESP8266< SEND OK

接收来自MQTT发布者发布的报文(Receive)

//接收到openLed指令

ESP8266< +IPD,29:30 1B 00 12 73 79 73 2F 68 6D 76 63 66 75 2F 63 6F 6E 74 72 6F 6C 6F 70 65 6E 4C 65 64

//逆向后的接收指令

0\0sys/hmvcfu/controlopenLed□

//接收到CloseLed指令

ESP8266< +IPD,30:30 1C 00 12 73 79 73 2F 68 6D 76 63 66 75 2F 63 6F 6E 74 72 6F 6C 63 6C 6F 73 65 4C 65 64

//逆向后的接收指令

0\0sys/hmvcfu/controlcloseLed□

3、在MDK中编写代码

MQTT组包函数MQTT_PacketConnect连接服务器的组包函数MQTT_UnPacketConnectAck连接回复消息解包MQTT_PacketPublishMQTT发布消息组包函数MQTT_UnPacketPublishAckMQTT发布回复消息解包函数MQTT_UnPacketPublishMQTT接收发布消息解包函数MQTT_PacketSubscribeMQTT订阅消息组包函数MQTT_UnPacketSubscribeMQTT订阅回复消息解包函数MQTT_UnPacketUnSubscribeMQTT取消订阅回复消息解包函数MQTT_PacketUnSubscribeMQTT取消订阅消息组包函数MQTT_PacketDisConnect断开连接消息组包MQTT_PacketPingMQTT心跳请求组包函数MQTT_UnPacketRecvMQTT接收消息解包函数

MQTT基础函数MQTTClient_DevLinkMQTT客户端连接函数MQTTClient_SubscribeMQTT客户端订阅函数MQTTClient_PublishMQTT客户端发布函数MQTTClient_RevProMQTT客户端协议接收函数MQTTClient_HeartBeatMQTT客户端心跳函数

新建GloalConfig.h文件,添加以下代码

#ifndef __GLOAL_CONFIG_H_

#define __GLOAL_CONFIG_H_

#define SERVER_IP "broker-cn.emqx.io" //服务器地址

#define SERVER_PORT "1883"//服务器端口

#define WIFI_NAME "MCSCV" //WIFI名称

#define WIFI_PSWD "jnszstmOBB" //WIFI密码

//鉴权三元组

#define MQTT_CLIENTID "111|hmvcfu" //客户端ID

#define MQTT_USERNAME "hmvcfu" //用户名

#define MQTT_PASSWORD "a02d6361531c2dae941d5b022982d944" //密码

#define MQTT_PUB_TOPIC "sys/hmvcfu/post" //MQTT上报主题

#define MQTT_SUB_TOPIC "sys/hmvcfu/control" //MQTT下发主题

#endif

新建shfm_mqtt.c文件,添加以下代码

#include "shfm_mqtt.h"

/*

*@brief 连接服务器的组包函数

*@param Username 用户名

* Password 密码

* ClientID 客户端ID

* mqttPacket 数据包指针

*@return 数据包长度

*/

uint16_t MQTT_PacketConnect(char *Username,char *Password,char *ClientID,uint8_t *mqttPacket)

{

int16_t len;

int16_t ClientIDLen;

int16_t UsernameLen;

int16_t PasswordLen;

uint16_t mqttPacketLen = 0;

ClientIDLen = strlen(ClientID);

UsernameLen = strlen(Username);

PasswordLen = strlen(Password);

//可变报头(10)+Payload 每个字段包含两个字节的长度标识

len = 10 + (ClientIDLen + 2) + (UsernameLen + 2) + (PasswordLen + 2); //加2是因为每个字段占两个字节

//固定报头

//控制报文类型

mqttPacket[mqttPacketLen++] = 0x10; //MQTT Message Type CONNECT

//剩余长度(不包括固定头部),剩余长度最多可以用四个字节来表示

do

{

uint8_t encodedByte = len % 128;

len = len / 128;

// 如果有更多的数据要编码,请设置该字节的最高位

if ( len > 0 )

encodedByte = encodedByte | 128;

mqttPacket[mqttPacketLen++] = encodedByte;

} while ( len > 0 );

//可变报头

//协议名

mqttPacket[mqttPacketLen++] = 0; // 协议名长度高位

mqttPacket[mqttPacketLen++] = 4; // 协议名长度低位

mqttPacket[mqttPacketLen++] = 'M'; // ASCII 字符 M

mqttPacket[mqttPacketLen++] = 'Q'; // ASCII 字符 Q

mqttPacket[mqttPacketLen++] = 'T'; // ASCII 字符 T

mqttPacket[mqttPacketLen++] = 'T'; // ASCII 字符 T

//协议级别

mqttPacket[mqttPacketLen++] = 4; // MQTT协议版本号 4

//连接标志

mqttPacket[mqttPacketLen++] = MQTT_CONNECT_WILL_QOS0|MQTT_CONNECT_USER_NAME|MQTT_CONNECT_PASSORD; // 连接标志

mqttPacket[mqttPacketLen++] = WORD_MSB(MQTT_KEEP_LIVE_TIME); // 保活时间长度高位

mqttPacket[mqttPacketLen++] = WORD_LSB(MQTT_KEEP_LIVE_TIME); // 保活时间长度低位

mqttPacket[mqttPacketLen++] = WORD_MSB(ClientIDLen);// 客户端ID长度高位

mqttPacket[mqttPacketLen++] = WORD_LSB(ClientIDLen);// 客户端ID长度低位

memcpy(&mqttPacket[mqttPacketLen],ClientID,ClientIDLen);

mqttPacketLen += ClientIDLen;

if(UsernameLen > 0)

{

mqttPacket[mqttPacketLen++] = WORD_MSB(UsernameLen); //用户名长度高位

mqttPacket[mqttPacketLen++] = WORD_LSB(UsernameLen); //用户名长度低位

memcpy(&mqttPacket[mqttPacketLen],Username,UsernameLen);

mqttPacketLen += UsernameLen;

}

if(PasswordLen > 0)

{

mqttPacket[mqttPacketLen++] = WORD_MSB(PasswordLen); //密码长度高位

mqttPacket[mqttPacketLen++] = WORD_LSB(PasswordLen); //密码长度低位

memcpy(&mqttPacket[mqttPacketLen],Password,PasswordLen);

mqttPacketLen += PasswordLen;

}

return mqttPacketLen;

}

/*

*@brief 连接消息解包

*@param rev_data 接收的数据

*@return 数据包长度

*/

uint8_t MQTT_UnPacketConnectAck(uint8_t *rev_data)

{

if(rev_data[1] != 0x02)

return 1;

if(rev_data[2] == 0x00 || rev_data[2] == 0x01)

return rev_data[3];

else

return 255;

}

/*

*@brief 断开连接消息组包

*@param mqttPacket 数据包指针

*@return 数据包长度

*/

uint8_t MQTT_PacketDisConnect(uint8_t *mqttPacket)

{

uint16_t mqttPacketLen = 0;

//固定报头

mqttPacket[mqttPacketLen++] = M_DISCONNECT << 4;

//剩余长度值

mqttPacket[mqttPacketLen++] = 0;

return 0;

}

/*

*@brief MQTT订阅消息组包函数

*@param topic 主题名称

* qos 消息等级

* mqttPacket 数据包指针

*@return 数据包长度

*/

uint16_t MQTT_PacketSubscribe(char *topic,uint8_t qos,uint8_t *mqttPacket)

{

int16_t len;

int16_t topiclen;

uint16_t mqttPacketLen = 0;

topiclen = strlen(topic);

len = 2 + (topiclen + 2) + 1; //可变报头的长度(2字节)加上有效载荷的长度

//固定报头

//控制报文类型

mqttPacket[mqttPacketLen++] = 0x82; //消息类型和标志订阅

//剩余长度

do

{

unsigned char encodedByte = len % 128;

len = len / 128;

// 如果有更多的数据要编码,请设置该字节的最高位

if ( len > 0 )

encodedByte = encodedByte | 128;

mqttPacket[mqttPacketLen++] = encodedByte;

} while ( len > 0 );

//可变报头

mqttPacket[mqttPacketLen++] = 0; //消息标识符 MSB

mqttPacket[mqttPacketLen++] = 0x01; //消息标识符 LSB

//有效载荷

mqttPacket[mqttPacketLen++] = WORD_MSB(topiclen);//主题长度 MSB

mqttPacket[mqttPacketLen++] = WORD_LSB(topiclen);//主题长度 LSB

memcpy(&mqttPacket[mqttPacketLen],topic,topiclen);

mqttPacketLen += topiclen;

mqttPacket[mqttPacketLen++] = qos;//QoS级别

return mqttPacketLen;

}

/*

*@brief MQTT订阅回复消息解包函数

*@param rev_data 接收的数据

*@return 0-成功 其他-失败

*/

uint8_t MQTT_UnPacketSubscribe(uint8_t *rev_data)

{

uint8_t result = 255;

if(rev_data[2] == WORD_MSB(MQTT_SUBSCRIBE_ID) && rev_data[3] == WORD_LSB(MQTT_SUBSCRIBE_ID))

{

switch(rev_data[4])

{

case 0x00:

case 0x01:

case 0x02:

//MQTT订阅成功

result = 0;

break;

case 0x80:

//MQTT订阅失败

result = 1;

break;

default:

//MQTT未知错误

result = 2;

break;

}

}

return result;

}

/*

*@brief MQTT取消订阅消息组包函数

*@param topic 主题名称

* qos 消息等级

* mqttPacket 数据包指针

*@return 数据包长度

*/

uint16_t MQTT_PacketUnSubscribe(char *topic,uint8_t qos,uint8_t *mqttPacket)

{

int16_t topiclen;

int16_t len;

uint16_t mqttPacketLen = 0;

topiclen = strlen(topic);

len = 2 + (topiclen + 2);//可变报头的长度(2字节)加上有效载荷的长度

//固定报头

//控制报文类型

mqttPacket[mqttPacketLen++] = 0xA2; //取消订阅

//剩余长度

do

{

uint8_t encodedByte = len % 128;

len = len / 128;

// if there are more data to encode, set the top bit of this byte

if ( len > 0 )

encodedByte = encodedByte | 128;

mqttPacket[mqttPacketLen++] = encodedByte;

} while ( len > 0 );

//可变报头

mqttPacket[mqttPacketLen++] = 0; //消息标识符 MSB

mqttPacket[mqttPacketLen++] = 0x01; //消息标识符 LSB

//有效载荷

mqttPacket[mqttPacketLen++] = WORD_MSB(topiclen);//主题长度 MSB

mqttPacket[mqttPacketLen++] = WORD_LSB(topiclen);//主题长度 LSB

memcpy(&mqttPacket[mqttPacketLen],topic,topiclen);

mqttPacketLen += topiclen;

return mqttPacketLen;

}

/*

*@brief MQTT取消订阅回复消息解包函数

*@param rev_data 接收的数据

*@return 0-成功 其他-失败

*/

uint8_t MQTT_UnPacketUnSubscribe(uint8_t *rev_data)

{

uint8_t result = 1;

if(rev_data[2] == WORD_MSB(MQTT_UNSUBSCRIBE_ID) && rev_data[3] == WORD_LSB(MQTT_UNSUBSCRIBE_ID))

{

result = 0;

}

return result;

}

/*

*@brief MQTT发布消息组包函数

*@param topic 主题

* message 消息

* qos 消息等级

* mqttPacket 数据包指针

*@return 0-成功 其他-失败

*/

uint16_t MQTT_PacketPublish(char *topic, char *message, uint8_t qos,uint8_t *mqttPacket)

{

int16_t len;

int16_t topicLength = strlen(topic);

int16_t messageLength = strlen(message);

static uint16_t id=0;

uint16_t mqttPacketLen = 0;

//有效载荷的长度这样计算:用固定报头中的剩余长度字段的值减去可变报头的长度

//QOS为0时没有标识符

//数据长度 主题名 报文标识符 有效载荷

if(qos) len = (2+topicLength) + 2 + messageLength;

else len = (2+topicLength) + messageLength;

//固定报头

//控制报文类型

mqttPacket[mqttPacketLen++] = 0x30; // MQTT 消息类型是 PUBLISH

//剩余长度

do

{

uint8_t encodedByte = len % 128;

len = len / 128;

// 如果有更多的数据要编码,请设置该字节的最高位

if ( len > 0 )

encodedByte = encodedByte | 128;

mqttPacket[mqttPacketLen++] = encodedByte;

} while ( len > 0 );

mqttPacket[mqttPacketLen++] = WORD_MSB(topicLength);//主题长度MSB

mqttPacket[mqttPacketLen++] = WORD_LSB(topicLength);//主题长度LSB

memcpy(&mqttPacket[mqttPacketLen],topic,topicLength);//拷贝主题

mqttPacketLen += topicLength;

//报文标识符

if(qos)

{

mqttPacket[mqttPacketLen++] = WORD_MSB(id);

mqttPacket[mqttPacketLen++] = WORD_LSB(id);

id++;

}

memcpy(&mqttPacket[mqttPacketLen],message,messageLength);

mqttPacketLen += messageLength;

return mqttPacketLen;

}

/*

*@brief MQTT发布回复消息解包函数

*@param rev_data 接收的数据

*@return 0-成功 其他-失败

*/

uint8_t MQTT_UnPacketPublishAck(uint8_t *rev_data)

{

if(rev_data[1] != 2)

return 1;

if(rev_data[2] == WORD_MSB(MQTT_PUBLISH_ID) && rev_data[3] == WORD_LSB(MQTT_PUBLISH_ID))

return 0;

else

return 1;

}

/*

*@brief MQTT收到发布消息解包函数

*@param rev_data 接收的数据

* topic 主题

* topic_len 主题长度

* payload 有效载荷

payload_len 有效载荷长度

*@return 0-成功 其他-失败

*/

uint8_t MQTT_UnPacketPublish(uint8_t *rev_data, int8_t *topic, uint16_t *topic_len, int8_t *payload, uint16_t *payload_len)

{

int i = 0;

uint8_t *msgPtr;

uint32_t remain_len = 0;

uint32_t multiplier = 1;

uint8_t *in;

in = rev_data+1;

for(i = 0; i < 4; ++i)

{

remain_len += (in[i] & 0x7f) * multiplier;

if(!(in[i] & 0x80))

{

// return i + 1;

i += 1;

break;

}

multiplier <<= 7;

if(multiplier >= 2097152) //128 * 128 * 128

{

return -2; // error, out of range

}

}

msgPtr = rev_data + i + 1;

if(remain_len < 2) //retain

return 255;

*topic_len = (uint16_t)msgPtr[0] << 8 | msgPtr[1];

if(remain_len < *topic_len + 2)

return 255;

memcpy(topic, (int8_t *)msgPtr + 2, *topic_len); //复制数据

*payload_len = remain_len - 2 - *topic_len;

memcpy(payload, (int8_t *)msgPtr + 2 + *topic_len, *payload_len);

return 0;

}

/*

*@brief MQTT心跳请求组包函数

*@param mqttPacket 数据包指针

*@return 0-成功 其他-失败

*/

uint8_t MQTT_PacketPing(uint8_t *mqttPacket)

{

uint8_t mqttPacketLen = 0;

/*************************************固定头部***********************************************/

//固定头部----------------------头部消息-------------------------------------------------

mqttPacket[mqttPacketLen++] = M_PINGREQ << 4;

//固定头部----------------------剩余长度-------------------------------------------------

mqttPacket[mqttPacketLen++] = 0;

return mqttPacketLen;

}

/*

*@brief MQTT接收消息解包函数

*@param dataPtr 消息包指针

*@return 0-成功 其他-失败

*/

uint8_t MQTT_UnPacketRecv(uint8_t *dataPtr)

{

uint8_t status = 255;

uint8_t type = dataPtr[0] >> 4; //类型检查

if(type < 1 || type > 14)

return status;

if(type == M_PUBLISH)

status = type;

else

status = type;

return status;

}

新建shfm_mqtt.h文件,添加以下代码

#ifndef __MQTTData_H

#define __MQTTData_H

//C库

#include

#include

#include

#include

#define WORD_MSB(A) (uint8_t)((A & 0xFF00) >> 8)

#define WORD_LSB(A) (uint8_t)(A & 0x00FF)

#define MQTT_PUBLISH_ID 10

#define MQTT_SUBSCRIBE_ID 20

#define MQTT_UNSUBSCRIBE_ID 30

#define MQTT_CONNECT_WILL_QOS0 0x00

#define MQTT_CONNECT_PASSORD 0x40

#define MQTT_CONNECT_USER_NAME 0x80

#define MQTT_KEEP_LIVE_TIME 120 //单位 S

#define _DEBUG_MQTT 1

typedef enum

{

//名字 值 报文流动方向 描述

M_RESERVED1 =0 , // 禁止 保留

M_CONNECT , // 客户端到服务端 客户端请求连接服务端

M_CONNACK , // 服务端到客户端 连接报文确认

M_PUBLISH , // 两个方向都允许 发布消息

M_PUBACK , // 两个方向都允许 QoS 1消息发布收到确认

M_PUBREC , // 两个方向都允许 发布收到(保证交付第一步)

M_PUBREL , // 两个方向都允许 发布释放(保证交付第二步)

M_PUBCOMP , // 两个方向都允许 QoS 2消息发布完成(保证交互第三步)

M_SUBSCRIBE , // 客户端到服务端 客户端订阅请求

M_SUBACK , // 服务端到客户端 订阅请求报文确认

M_UNSUBSCRIBE , // 客户端到服务端 客户端取消订阅请求

M_UNSUBACK , // 服务端到客户端 取消订阅报文确认

M_PINGREQ , // 客户端到服务端 心跳请求

M_PINGRESP , // 服务端到客户端 心跳响应

M_DISCONNECT , // 客户端到服务端 客户端断开连接

M_RESERVED2 , // 禁止 保留

} _typdef_mqtt_message;

uint16_t MQTT_PacketConnect(char *Username,char *Password,char *ClientID,uint8_t *mqttPacket);

uint8_t MQTT_UnPacketConnectAck(uint8_t *rev_data);

uint16_t MQTT_PacketPublish(char *topic, char *message, uint8_t qos,uint8_t *mqttPacket);

uint8_t MQTT_UnPacketPublishAck(uint8_t *rev_data);

uint8_t MQTT_UnPacketPublish(uint8_t *rev_data, int8_t *topic, uint16_t *topic_len, int8_t *payload, uint16_t *payload_len);

uint16_t MQTT_PacketSubscribe(char *topic,uint8_t qos,uint8_t *mqttPacket);

uint8_t MQTT_UnPacketSubscribe(uint8_t *rev_data);

uint8_t MQTT_UnPacketUnSubscribe(uint8_t *rev_data);

uint16_t MQTT_PacketUnSubscribe(char *topic,uint8_t qos,uint8_t *mqttPacket);

uint8_t MQTT_PacketDisConnect(uint8_t *mqttPacket);

uint8_t MQTT_PacketPing(uint8_t *mqttPacket);

uint8_t MQTT_UnPacketRecv(uint8_t *dataPtr);

#endif

新建mqttclient.h文件,添加以下代码

#ifndef __MQTT_CLIENT_H_

#define __MQTT_CLIENT_H_

#include "sys.h"

#ifndef MQTTCLIENT_OK

#define MQTTCLIENT_OK 0

#endif

#ifndef MQTTCLIENT_NOK

#define MQTTCLIENT_NOK 1

#endif

#define MQTTCLIENT_RETTYPE unsigned char

MQTTCLIENT_RETTYPE MQTTClient_DevLink(void);

MQTTCLIENT_RETTYPE MQTTClient_Subscribe(const char *topic);

MQTTCLIENT_RETTYPE MQTTClient_Publish(const char *topic, const char *msg);

MQTTCLIENT_RETTYPE MQTTClient_HeartBeat(void);

void MQTTClient_RevPro(uint8_t *cmd);

#endif

新建mqttclient.c文件,编写以下代码

#include "mqttclient.h"

#include "shfm_mqtt.h"

#include "ESP8266.h"

#include "stdio.h"

#include "GloalConfig.h"

#include "gpio.h"

/*

*@brief MQTT连接函数

*@param void

*@return MQTTCLIENT_OK-成功 其余-失败

*/

MQTTCLIENT_RETTYPE MQTTClient_DevLink(void)

{

uint8_t mqttPacket[100]= {0};

uint16_t mqttPacketLen = 0;

uint8_t *dataPtr = NULL;

UsartPrintf(USART_DEBUG,"Tips: Connect Mqtt\r\n");

UsartPrintf(USART_DEBUG,"Tips: UserName:%s\r\n",MQTT_USERNAME);

UsartPrintf(USART_DEBUG,"Tips: Password:%s\r\n",MQTT_PASSWORD);

UsartPrintf(USART_DEBUG,"Tips: ClientId:%s\r\n",MQTT_CLIENTID);

mqttPacketLen = MQTT_PacketConnect(MQTT_USERNAME,MQTT_PASSWORD,MQTT_CLIENTID,mqttPacket);

if(mqttPacketLen < 2)

return MQTTCLIENT_NOK;

ESP8266_SendData(DISABLE,mqttPacket, mqttPacketLen,Single_ID_0); //上传平台

dataPtr = ESP8266_GetIPD(DISABLE,200); //等待平台响应

if(dataPtr != NULL)

{

if(MQTT_UnPacketRecv(dataPtr) == M_CONNACK)

{

return MQTT_UnPacketConnectAck(dataPtr); //返回0为连接成功

}

}

return MQTTCLIENT_NOK;

}

/*

*@brief MQTT订阅函数

*@param topic 主题

*@return MQTTCLIENT_OK-成功 其余-失败

*/

MQTTCLIENT_RETTYPE MQTTClient_Subscribe(const char *topic)

{

uint8_t mqttPacket[100]= {0};

uint16_t mqttPacketLen = 0;

uint8_t *dataPtr = NULL;

UsartPrintf(USART_DEBUG,"Tips: Subscribe Topic:%s\r\n",topic);

mqttPacketLen = MQTT_PacketSubscribe((char*)topic,MQTT_CONNECT_WILL_QOS0,mqttPacket);

if(mqttPacketLen < 2)

return MQTTCLIENT_NOK;

ESP8266_SendData(DISABLE,mqttPacket, mqttPacketLen,Single_ID_0); //上传平台

dataPtr = ESP8266_GetIPD(DISABLE,200); //等待平台响应

if(dataPtr != NULL)

{

if(MQTT_UnPacketRecv(dataPtr) == M_SUBACK)

{

return MQTT_UnPacketSubscribe(dataPtr); //返回0为连接成功

}

}

return MQTTCLIENT_NOK;

}

/*

*@brief MQTT客户端发布函数

*@param topic 主题

* msg 发布消息

*@return MQTTCLIENT_OK-成功 其余-失败

*/

MQTTCLIENT_RETTYPE MQTTClient_Publish(const char *topic, const char *msg)

{

uint8_t mqttPacket[100]= {0};

uint16_t mqttPacketLen = 0;

uint8_t *dataPtr = NULL;

uint8_t ucExecRes = MQTTCLIENT_NOK;

UsartPrintf(USART_DEBUG,"Tips: Publish Topic:%s,Message:%s\r\n",topic,msg);

mqttPacketLen = MQTT_PacketPublish((char*)topic,(char*)msg,MQTT_CONNECT_WILL_QOS0,mqttPacket);

if(mqttPacketLen < 2)

return MQTTCLIENT_NOK;

if(ESP8266_SendData(DISABLE,mqttPacket, mqttPacketLen,Single_ID_0) == ESP8266_OK) //向平台发送订阅请求

ucExecRes = MQTTCLIENT_OK;

return ucExecRes;

}

/*

*@brief MQTT客户端心跳函数

*@param void

*@return MQTTCLIENT_OK-成功 其余-失败

*/

MQTTCLIENT_RETTYPE MQTTClient_HeartBeat(void)

{

uint8_t mqttPacket[2]= {0};

uint16_t mqttPacketLen = 0;

uint8_t *dataPtr = NULL;

UsartPrintf(USART_DEBUG,"Tips: HearBeat\r\n");

mqttPacketLen = MQTT_PacketPing(mqttPacket);

ESP8266_SendData(DISABLE,mqttPacket, sizeof(mqttPacket),Single_ID_0); //向平台上传心跳请求

dataPtr = ESP8266_GetIPD(DISABLE,200);

if(dataPtr != NULL)

{

if(dataPtr[0] == M_PINGRESP)

{

return MQTTCLIENT_OK;

}

}

return MQTTCLIENT_NOK;

}

/*

*@brief MQTT客户端协议接收函数

*@param cmd 接收到的命令

*@return MQTTCLIENT_OK-成功 其余-失败

*/

void MQTTClient_RevPro(uint8_t *cmd)

{

uint8_t type = 0;

uint8_t result = 0;

int8_t cmdid_topic[50] = {0};

int8_t req_payload[100] = {0};

uint16_t topic_len = 0;

uint16_t req_len = 0;

type = MQTT_UnPacketRecv(cmd); //进行解包

if(type == M_PUBLISH)

{

result = MQTT_UnPacketPublish(cmd, cmdid_topic,&topic_len,req_payload, &req_len);

if(result == 0)

{

UsartPrintf(USART_DEBUG, "topic: %s, topic_len: %d, payload: %s, payload_len: %d\r\n",

cmdid_topic, topic_len, req_payload, req_len);

if(strstr((char*)req_payload,"openLed"))

{

LED_Set(LED_ON); //打开LED灯

}

else if(strstr((char*)req_payload,"closeLed"))

{

LED_Set(LED_OFF); //关闭LED灯

}

}

}

}

打开main.c文件,在main函数输入以下代码

/* Private typedef -----------------------------------------------------------*/

/* USER CODE BEGIN PTD */

ESP8266_RETTYPE netStatus = ESP8266_NOK;

unsigned char netErrCount = 0;

/* USER CODE END PTD */

/* USER CODE BEGIN 1 */

unsigned char *dataPtr = NULL;

uint32_t send_time = 0;

ESP8266_RETTYPE ucExecRes = MQTTCLIENT_NOK;

uint8_t mqttWork = 0;

char sendData[100];

/* USER CODE END 1 */

/* MCU Configuration--------------------------------------------------------*/

/* Reset of all peripherals, Initializes the Flash interface and the Systick. */

HAL_Init();

/* USER CODE BEGIN Init */

/* USER CODE END Init */

/* Configure the system clock */

SystemClock_Config();

/* USER CODE BEGIN SysInit */

/* USER CODE END SysInit */

/* Initialize all configured peripherals */

MX_GPIO_Init();

MX_DMA_Init();

MX_LPUART1_UART_Init();

MX_USART1_UART_Init();

MX_TIM2_Init();

/* USER CODE BEGIN 2 */

USART_Interupt_Enable(); //使能串口中断

TIM_Interupt_Enable(); //使能定时器中断

/* USER CODE END 2 */

/* Infinite loop */

/* USER CODE BEGIN WHILE */

while (1)

{

/* USER CODE END WHILE */

/* USER CODE BEGIN 3 */

if(!netDeviceInfo.netWork) //如果网络未初始化

{

if(!NET_DEVICE_Init()) //进行网络初始化

{

mqttWork = MQTTClient_DevLink() == 0 ? 1 : 0; //连接MQTT服务器

if(mqttWork == 1) //如果连接MQTT成功

{

printf("连接服务器成功\r\n");

MQTTClient_Subscribe(MQTT_SUB_TOPIC); //订阅MQTT下发主题

}

}

}

if(mqttWork)

{

dataPtr = ESP8266_GetIPD(DISABLE,0); //等待平台响应

if(dataPtr != NULL)

{

if(MQTT_UnPacketRecv(dataPtr) == M_PUBLISH)

{

MQTTClient_RevPro(dataPtr);

}

}

}

if(time2Count - send_time >= 10000) //(1ms * 10000)相当于延时10秒钟

{

send_time = time2Count; //记下当前定时器的数值

if(mqttWork)

{

sprintf(sendData,"{\"msg\":{\"paramdata\":[{\"temp\":20,\"humi\":98}]}}"); //

ucExecRes = MQTTClient_Publish(MQTT_PUB_TOPIC,sendData);

if(ucExecRes == MQTTCLIENT_NOK)

{

netErrCount++; //错误次数进行累加

if(netErrCount >= 3) //超过三次,进行自检

{

netStatus = ESP8266_Get_LinkStatus(); //检查连接状态

if(netStatus == 4) //网络已经断开

{

netErrCount = 0; //将错误清零

netDeviceInfo.netWork = 0; //标志网络断开

netDeviceInfo.initStep = 0; //将初始化步骤清零

}

}

}

else

{

netErrCount = 0; //无错误,清除错误计数

}

}

}

}

/* USER CODE END 3 */

4、实验现象

实现的功能1、上电自动连接WIFI2、连接WIFI成功后,连接MQTT服务器3、连接MQTT服务器后,接收来自服务器的指令,并执行指令中的开关灯操作4、每10秒向服务器上传一次数据5、上传数据失败超过3次,自动进行网络初始化

客户端连接服务器并上传数据

MQTT.fx发送开关灯指令

客户端接收到数据并执行

好文链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。