MQTT封装FreeRTOS下WIFI模块下的socket

📅 2026/7/6 6:20:56 👁️ 阅读次数 📝 编程学习
MQTT封装FreeRTOS下WIFI模块下的socket

一.MQTT概述

MQTT 协议全称是 Message Queuing Telemetry Transport,即“消息队列遥测传输协 议”,它是物联网常用的应用层协议,运行在 TCP/IP 中的应用层中,依赖 TCP 协议,因此 它具有非常高的可靠性。同时它是基于 TCP 协议的“客户端-服务器”模型发布/订阅主题 消息的轻量级协议。 MQTT 协议提供一对多的消息发布,可以降低应用程序的耦合性,用户只需要编写极少 量的应用代码就能完成一对多的消息发布与订阅,该协议是基于“客户端-服务器”模型, 在协议中主要有三种身份:发布者(Publisher)、服务器(Broker)以及订阅者 (Subscriber)。其中,MQTT 消息的发布者和订阅者都是客户端,服务器只是作为一个中 转的存在,将发布者发布的消息进行转发给所有订阅该主题的订阅者;发布者可以发布在 其权限之内的所有主题,并且消息发布者可以同时是订阅者,实现了生产者与消费者的脱 耦,发布的消息可以同时被多个订阅者订阅。MQTT 客户端的功能:

① 发布消息给其它相关的客户端。

② 订阅主题请求接收相关的应用消息。

③ 取消订阅主题请求移除接收应用消息。

④ 从服务端终止连接。

MQTT 服务器常被称为 Broker(消息代理),以是一个应用程序或一台设备,它一般 为云服务器,比如 BTA 三巨头的一些物联网平台就是常使用 MQTT 协议,它是位于消息发布 者和订阅者之间,以便用于接收消息并发送到订阅者之中,它的功能有:

① 接受来自客户端的网络连接请求。

② 接受客户端发布的应用消息。

③ 处理客户端的订阅和取消订阅请求。

④ 转发应用消息给符合条件的已订阅客户端(包括发布者自身)。

二.MQTT的移植

1.这边我移植的是paho.mqtt,可以在网上找到并下载。

涉及到的主要文件包括2个,将这2文件移植到自己的工程中:

2.移植进去后编译,将有各种错误,可以先将各种小错误先解决,如果实在不会解决的话可以注释掉,说要目标是先编译通过。

三.下载相关软件

后续还将涉及到其他2个软件建议下载并安装完成

四.适配工程

1.移植过后,FreeRTOS的代码适配自己的at代码

2.增加几个接口函数,来对接

将输进来的域名和ip转化为网络数据:

uint32_t at_gethostbyname(char *addr) { uint32_t ipaddr; int i; int isIpStr = 1; /* *addr: "iot.100ask.net" ==> 32BIT IP *"192.168.1.49" ==> 32BIT IP, buf[0]=49, buf[1]=1, buf[2]=168, buf[3]=192 */ /* 如果是10进制的点分字符串 */ for(i = 0; addr[i]; i++) { if (addr[i] == '.') continue; if (addr[i] >= '0' && addr[i] <= '9') continue; isIpStr = 0; break; } if(isIpStr) { inet_pton(0, addr, &ipaddr); return ipaddr; } return esp8266_gethostbyname(addr); } uint32_t esp8266_gethostbyname(char *addr) { int8_t buf[100]; int err; uint32_t resp_len; int a,b,c,d; uint32_t ipaddr; PAT_Device ptDev = get_esp8266_device(); /* 发出查询ipd的AT命令:"AT+CIFSR\r\n" */ err = at_exec_cmd(ptDev, (int8_t *)"AT+CIFSR\r\n", (uint8_t *)buf, sizeof(buf), &resp_len, AT_TIMEOUT); if(err) { return 0; } /* 解析得到IP */ sscanf((const char *)buf, "+CIFSR:STAIP,\"%d.%d.%d.%d\"", &a, &b, &c, &d); ipaddr = ((uint32_t)a<<24 | (uint32_t)b<<16 | (uint32_t)c<<8 | (uint32_t)d); return ipaddr; }

3.增加socket延迟时间的函数

int esp8266_setsockopt(int socket, int level, int optname, const void *optval, socklen_t optlen) { PAT_Device ptDev = get_esp8266_device(); if(NULL == ptDev) return -1; uint32_t timeout = *((uint32_t *)optval); if(optname == SO_RCVTIMEO) { ptDev->sockets[socket].recv_timeout = timeout; } if(optname == SO_SNDTIMEO) { ptDev->sockets[socket].send_timeout = timeout; } return 0; }

五.应用函数

#define WIFI_NAME "Programmers" #define WIFI_PASSWD "100asktech" #define PC_MQTT_BROKER_IP "192.168.0.198" #define PC_MQTT_BROKER_PORT 1883 static void messageArrived(MessageData* md) { static int cnt = 0; MQTTMessage* m = md->message; char buf[100]; snprintf(buf, 100, "get msg %d: %s", cnt++, (char *)m->payload); memset(m->payload, 0, m->payloadlen); buf[99] = '\0'; printf("%s\r\n", buf); } static void test1(void) { int subsqos = 2; Network n; MQTTClient c; int rc = 0; char* sub_topic = "test1"; char* pub_topic = "test2"; MQTTPacket_willOptions wopts; unsigned char buf[100]; unsigned char readbuf[100]; char pubbuf[100]; int cnt = 0; int wait_seconds; MQTTMessage pubmsg; c.isconnected = 0; while(1) { if(c.isconnected == 0) { NetworkInit(&n); while(0 != NetworkConnect(&n, PC_MQTT_BROKER_IP, PC_MQTT_BROKER_PORT)) { printf("Re-Connect TCP/Port...\r\n"); vTaskDelay(100); } printf("Connect TCP/Port OK\r\n"); MQTTClientInit(&c, &n, 1000, buf, 100, readbuf, 100); MQTTPacket_connectData data = MQTTPacket_connectData_initializer; data.willFlag = 1; data.clientID.cstring = "100ask_mqtt_test"; data.username.cstring = "testuser"; data.password.cstring = "testpassword"; data.keepAliveInterval = 20; data.cleansession = 1; data.will.message.cstring = "will message"; data.will.qos = 1; data.will.retained = 0; data.will.topicName.cstring = "will topic"; printf("Connect MQTT Broker...\r\n"); while(SUCCESS != MQTTConnect(&c, &data)) { printf("Re-Connect MQTT Broker...\r\n"); vTaskDelay(100); } printf("Connect MQTT Broker OK\r\n"); printf("MQTTSubscribe...\r\n"); rc = MQTTSubscribe(&c, sub_topic, subsqos, messageArrived); if(0 == rc) printf("MQTTSubscribe OK\r\n"); } else { /* ---------------------------------------------------------------------------------- */ memset(&pubmsg, '\0', sizeof(pubmsg)); sprintf(pubbuf, "msg from f407, %d", cnt++); pubmsg.payload = pubbuf; pubmsg.payloadlen = strlen(pubbuf); pubmsg.qos = 0; pubmsg.retained = 0; pubmsg.dup = 0; printf("%s\r\n", pubbuf); rc = MQTTPublish(&c, pub_topic, &pubmsg); wait_seconds = 10; while(wait_seconds-- > 0) { if(c.isconnected == 0) { printf("MQTT Disconnect!\r\n"); n.disconnect(&n); break; } else { MQTTYield(&c, 100); } } } } } void MQTTClientTask(void *parameter) { int err; at_init("uart2"); while(1) { err = at_connect_ap(WIFI_NAME, WIFI_PASSWD); if(!err) { break; } else { vTaskDelay(1000); } } printf("Connect TCP/Port...\r\n"); while(1) { test1(); } // vTaskDelete(NULL); }

六.修复bug

原先esp8266的接收函数增加一个退出机制,由原来的死等信号量机制变为循环等待。

int esp8266_recvfrom(int socket, void *mem, size_t len, int flags, struct sockaddr *from, socklen_t *fromlen) { PAT_Device ptDev = get_esp8266_device(); if(NULL == ptDev) return -1; if(socket < 0 || socket > AT_MAX_SOCKETS_NUM) return -1; if(NULL == mem) return -1; PAT_Socket ptSocket = &ptDev->sockets[socket]; uint8_t data; size_t recv_len = 0; uint8_t *pdata = (uint8_t *)mem; //对于UDP,先发起AT命令来连接 if((ptSocket->type == SOCK_DGRAM) && (from != NULL)) { if(ptSocket->at_socket_open_flag==0) { if(esp8266_connect(socket, from, *fromlen)) { return -1; } } } //尝试从接收队列读取遗留的数据 while(xQueueReceive(ptSocket->recv_queue, &data, 0) == pdTRUE) { if(recv_len < len) { pdata[recv_len] = data; } recv_len++; if(recv_len >= len) { return recv_len; } } if(recv_len > 0) { return recv_len; } //无数据则等待信号量 // if(xSemaphoreTake(ptSocket->at_packet_sem, portMAX_DELAY) != pdTRUE) // { // return -1; // } //无数据则等待信号量 uint32_t timeout = ptDev->sockets[socket].recv_timeout; while(xSemaphoreTake(ptSocket->at_packet_sem, 10) != pdTRUE) { if(timeout > 10) timeout -= 10; else break; }

MQTT里面的cycle函数增加一些容错机制

int cycle(MQTTClient* c, Timer* timer) { static int err_cnt = 0; int len = 0, rc = MQTT_SUCCESS; int packet_type = readPacket(c, timer); /* read the socket, see what work is due */ switch (packet_type) { default: /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */ rc = packet_type; printf("error 1\r\n"); goto exit; case 0: /* timed out reading packet */ break; case CONNACK: case PUBACK: case SUBACK: case UNSUBACK: break; case PUBLISH: { MQTTString topicName; MQTTMessage msg; int intQoS; msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */ if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName, (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1) { printf("error 2\r\n"); goto exit; } msg.qos = (enum QoS)intQoS; deliverMessage(c, &topicName, &msg); if (msg.qos != QOS0) { if (msg.qos == QOS1) len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id); else if (msg.qos == QOS2) len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id); if (len <= 0) rc = MQTT_FAILURE; else rc = sendPacket(c, len, timer); if (rc == MQTT_FAILURE) { printf("error 3\r\n"); goto exit; } } break; } case PUBREC: case PUBREL: { unsigned short mypacketid; unsigned char dup, type; if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) rc = MQTT_FAILURE; else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) rc = MQTT_FAILURE; else if ((rc = sendPacket(c, len, timer)) != MQTT_SUCCESS) // send the PUBREL packet rc = MQTT_FAILURE; // there was a problem if (rc == MQTT_FAILURE) { printf("error 4\r\n"); goto exit; // there was a problem } break; } case PUBCOMP: break; case PINGRESP: c->ping_outstanding = 0; break; } if (keepalive(c) != MQTT_SUCCESS) { //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT rc = MQTT_FAILURE; } exit: if(rc == MQTT_SUCCESS) { err_cnt = 0; rc = packet_type; } else if (c->isconnected) { if(err_cnt ++ == 10) { MQTTCloseSession(c); } } return rc; }

七.最终效果