目录

概述

1 认识Eclipse Paho C

1.1 paho.mqtt.c简介

1.2 下载和安装paho.mqtt.c

2 异步发布和订阅消息

2.1 编写异步发布消息功能

2.1.1 初始化MQTT参数

2.1.2 初始化函数

2.1.2 发布消息函数

2.1.3 完整代码

2.2 编译代码和测试

3 验证MQTT Client功能

3.1 EMQX服务器上查看MQTT Client

3.2 MQTT.fx订阅MQTTAsync

概述

本文主要介绍在linux环境(ubuntu)环境下,下载和安装Eclipse Paho C MQTT 软件包,还编写一个范例实现异步发布Message的功能,并使用基于EMQX的服务验证其功能,还使用MQTT.fx订阅消息,以验证发布消息功能的可靠性。

1 认识Eclipse Paho C

1.1 paho.mqtt.c简介

paho.mqtt.c是一个使用C语言编写的MQTT客户端库,用于连接和与MQTT代理进行通信。它是Eclipse Paho项目的一部分,旨在为C开发人员提供一个易于使用、可靠和高性能的MQTT解决方案。

这个库允许开发人员创建MQTT客户端,该客户端可以连接到远程MQTT代理,并实现与代理之间的消息发布、订阅和管理等功能。paho.mqtt.c库提供了一套简单而直观的API,使开发人员能够轻松地使用MQTT协议进行通信。

paho.mqtt.c库支持多种MQTT协议版本,包括MQTTv3.1和MQTTv3.1.1。它提供了多种操作和配置选项,以适应不同的需求和场景。该库还提供了一些高级功能,例如支持SSL/TLS加密、持久化会话、消息保持和遗嘱消息等。

paho.mqtt.c库在多个平台上都可以使用,包括Windows、Linux、macOS和嵌入式系统等。它具有良好的可移植性和可扩展性,可以方便地与其他C语言项目集成。

总之,paho.mqtt.c是一个强大而灵活的MQTT客户端库,适用于使用C语言开发MQTT应用程序的开发人员。它简化了与MQTT代理的通信过程,并提供了丰富的功能和选项,使开发人员能够快速构建高性能的MQTT应用程序。

1.2 下载和安装paho.mqtt.c

登录mqtt官网,点击Software,可以看见如下页面,选择Eclipse Paho C进入下载页面

https://mqtt.org/

点击链接后,会跳转到下载页面,在该页面下载paho.mqtt.c

笔者选择使用命令直接安装该软件包,具体操作步骤如下:

Step -1: 下载软件包执行命令:

git clone https://github.com/eclipse/paho.mqtt.c.git

step-2: 进入paho.mqtt.c目录,执行make

cd paho.mqtt.c

make

系统会自动编译代码,等待编译结果。

编译完成后,会自动生成build文件,这时可以安装

step-3 : 执行如下命令就可以安装软件

sudo make install

2 异步发布和订阅消息

2.1 编写异步发布消息功能

2.1.1 初始化MQTT参数

参数功能介绍:

参数名称参数值描述ADDRESStcp://192.168.1.11:1883mqtt broker的IP地址CLIENTIDmqtt_ubuntu_asys设备IDTOPICMQTTAsync发布的TopicPAYLOAD12.56Topic下的payloadQOS1服务质量等级=1TIMEOUT10000L等待超时时间(us)USERNAMEmqtt_ubuntu_user终端认证usernamePASSWORD123456终端认证username对应的password

在代码中,其需要填写的位置如下:

2.1.2 初始化函数

初始化MQTT终端需要完成以下3个步骤:

step-1: 创建MQTT Client

step-2: 配置回调函数

step-3: 连接服务器

具体实现代码如下:

代码85行:创建MQTT Client,需要传入服务器IP和Client ID信息

代码93行:配置callback函数,函数原型,见源码

代码100行: 心跳包时间间隔设置为60s

代码101行: 清除会话 标记设置为1,不接受离线消息

代码102行: 配置设备终端用户

代码103行: 配置设备终端用户password

2.1.2 发布消息函数

要实现发布消息功能,需要将payload及其相关参数填到MQTTClient_message定义的数据结构中,下面介绍整个public message 函数的功能。

代码59行: 装载payload

代码60行:payload的字符长度

代码61行:消息服务等级参数

代码62行:配置为不保留消息

代码64行:使用MQTTClient_publishMessage函数发布消息

代码74行:比较返回来token的值,如果数据发送成功,token值会自动加1

2.1.3 完整代码

创建test_01_Asynchronous.c,编写如下代码:

/***************************************************************

Copyright 2024-2029. All rights reserved.

文件名 : test_01_Asynchronous.c

作者 : tangmingfei2013@126.com

版本 : V1.0

描述 : mqtt异步发布和订阅消息

日志 : 初版V1.0 2024/03/13

***************************************************************/

#include

#include

#include

#include "MQTTClient.h"

#include

#define ADDRESS "tcp://192.168.1.11:1883"

#define CLIENTID "mqtt_ubuntu_asys"

#define TOPIC "MQTTAsync"

#define PAYLOAD "12.56"

#define QOS 1

#define TIMEOUT 10000L

#define USERNAME "mqtt_ubuntu_user"

#define PASSWORD "123456"

static MQTTClient client;

static MQTTClient_deliveryToken deliveredtoken;

static MQTTClient_message pubmsg = MQTTClient_message_initializer;

static MQTTClient_deliveryToken token;

static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;

void delivered(void *context, MQTTClient_deliveryToken dt)

{

printf("Message with token value %d delivery confirmed\n", dt);

deliveredtoken = dt;

}

int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)

{

printf("Message arrived\n");

printf(" topic: %s\n", topicName);

printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);

MQTTClient_freeMessage(&message);

MQTTClient_free(topicName);

return 1;

}

void connlost(void *context, char *cause)

{

printf("\nConnection lost\n");

printf(" cause: %s\n", cause);

}

void public_mqttMsg()

{

int rc;

pubmsg.payload = PAYLOAD;

pubmsg.payloadlen = (int)strlen(PAYLOAD);

pubmsg.qos = QOS;

pubmsg.retained = 1;

deliveredtoken = 0;

if ((rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token)) != MQTTCLIENT_SUCCESS)

{

printf("Failed to publish message, return code %d\n", rc);

rc = EXIT_FAILURE;

}

else

{

printf("Waiting for publication of %s\n"

"on topic %s for client with ClientID: %s\n",

PAYLOAD, TOPIC, CLIENTID);

while (deliveredtoken != token)

{

usleep(10000000L);

}

}

}

int main(int argc, char* argv[])

{

int rc;

if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,

MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)

{

printf("Failed to create client, return code %d\n", rc);

rc = EXIT_FAILURE;

goto exit;

}

if ((rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered)) != MQTTCLIENT_SUCCESS)

{

printf("Failed to set callbacks, return code %d\n", rc);

rc = EXIT_FAILURE;

goto destroy_exit;

}

conn_opts.keepAliveInterval = 60;

conn_opts.cleansession = 0;

conn_opts.username = USERNAME; //用户名

conn_opts.password = PASSWORD; //密码

if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)

{

printf("Failed to connect, return code %d\n", rc);

rc = EXIT_FAILURE;

goto destroy_exit;

}

while(1)

{

public_mqttMsg();

}

if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)

{

printf("Failed to disconnect, return code %d\n", rc);

rc = EXIT_FAILURE;

}

destroy_exit:

MQTTClient_destroy(&client);

exit:

return rc;

}

2.2 编译代码和测试

使用如下命令编译代码

gcc test_01_Asynchronous.c -lpaho-mqtt3a

执行.out文件后,可以看见,MQTT Client发布消息成功了,服务器端收到消息后,token值会自动加1

3 验证MQTT Client功能

3.1 EMQX服务器上查看MQTT Client

在ubuntu上运行MQTT Client后,EMQX服务器会显示MQTT Client的运行状态,登录EMQX服务器可以看见

为了测试方便,置retained = 1,这样EMQX服务器会保留Client发布的payload,便于在服务器上查看client发布的数据。

在EMQX服务器上,打开保留消息面板,查看MQTTAsync主题的payload,其和发布的值一致。

3.2 MQTT.fx订阅MQTTAsync

要使用MQTT.fx MQTT Client工具订阅MQTTAsync,首先保证MQTT.fx能正常连接至EMQX服务器

连接成功后,可以在MQTT.fx 上订阅MQTTAsync,订阅成功后,MQTT.fx subscribe面板会打印MQTTAsync的log

同时,在EMQX服务器上的订阅管理面板上可以看见:MQTT.fx 订阅Topic 为MQTTAsync的消息

相关链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。