Toggle navigation
首页
(current)
问答
文章
话题
商城
登录
注册
Luat系列教程:6、mqtt代码详解
LUAT
教程
mqtt
设备联网后向/d/test/设备的imei值的topic发送payload为当前设备ICCID的字符串 服务器收到后向/s/test/设备的imei值的topic发送payload为ok两个字节的字符串 设备收到topic为/s/test/设备的imei值,payload为ok的数据后,再次向/d/test/设备的imei值的topic发送payload为done四个字节的字符串
>写在前面: 由于本人并未学习过具体原理,所以本文可能会有多处常识性错误,如有发现请留言指出,谢谢! --- >阅读本文需要具有的技能: 看过该系列前几篇文章或明白前几篇文章内容的 熟悉lua语法,尤其是数组部分 可以明白字符串、字节码之间的区别 可以自己实践操作 对mqtt通讯有基本的了解 用过这东西 ## 官方demo代码 官方代码可以在github(https://github.com/openLuat/Luat_2G_RDA_8955/)的`Luat_2G_RDA_8955/script_LuaTask/demo/mqtt`目录或luatools的`LuaTools 1.x.x\script\script_LuaTask\demo\mqtt`目录找到 如果你能看懂官方例程,**那么可以直接去使用,不需要再看本文了** # 先定义一个假装能用来测试的mqtt需求 客户端订阅主题: /d/test/# 设备订阅主题: /s/test/设备的imei值 - 设备联网后向`/d/test/设备的imei值`的topic发送payload为当前设备ICCID的字符串 - 服务器收到后向`/s/test/设备的imei值`的topic发送payload为`ok`两个字节的字符串 - 设备收到topic为`/s/test/设备的imei值`,payload为`ok`的数据后,再次向`/d/test/设备的imei值`的topic发送payload为`done`四个字节的字符串 需求十分简单: ![](http://oldask.openluat.com/image/show/attachments-2018-08-Iyg9g3vU5b83d12d58d3d.png) 如果你知道mqtt是什么,肯定能明白 # 建立文件 首先先新建两个文件,用于测试这个工程 main.lua ```lua PROJECT = "MQTT-TEST" VERSION = "1.0.0" --根据固件判断模块类型 moduleType = string.find(rtos.get_version(),"8955") and 2 or 4 require "log" LOG_LEVEL = log.LOGLEVEL_TRACE require "sys" --每1分钟查询一次GSM信号强度,每1分钟查询一次基站信息 require "net" net.startQueryAll(60000, 60000) --加载硬件看门狗功能模块 --如果用的是720 4g模块,请注释掉这两行 require "wdt" wdt.setup(pio.P0_30, pio.P0_31) --加载网络指示灯功能模块 require "netLed" netLed.setup(true,moduleType == 2 and pio.P1_1 or pio.P2_0,moduleType == 2 and nil or pio.P2_1) require"mqttTest" --启动系统框架 sys.init(0, 0) sys.run() ``` mqttTest.lua ```lua module(...,package.seeall) require"misc" require"mqtt" --下面代码一会儿写 ``` # 建立mqtt线程 一般来说,mqtt连接都是异步运行的,何时应该发送数据,何时应该接收数据,这些逻辑应该让mqtt收发的进程自己进行控制 所以我们在`mqttTest.lua`中添加一个新的线程(看不懂的回去看前几篇文章),文件改成如下(`注意改一下测试服务器信息`): mqttTest.lua ```lua module(...,package.seeall) require"misc" require"mqtt" --测试用的服务器信息,按需求自己改 local mqttip,mqttport,mqttuser,mqttpassword,mqttheartbeat = "x.x.x.x","xxxx","user","password",600 --启动mqtt客户端任务 sys.taskInit( function() while true do --该区域的代码会永久循环运行(除非出现语法错误) end end) ``` # 进行mqtt连接 一般来说,我们会在模块成功获取基站分配的ip后,才会进行网络的连接操作,所以我们需要使用`socket.isReady()`函数来判断是否连接网络,然后再进行网络操作 在成功获取ip后,我们才能新建一个mqtt对象,对其进行联网操作,mqtt客户端线程代码改为如下: ```lua --启动mqtt客户端任务 sys.taskInit( function() while true do --是否获取到了分配的ip(是否连上网) if socket.isReady() then local imei = misc.getImei() --新建一个mqtt对象 local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword) --尝试连接指定服务器 if mqttClient:connect(mqttip,mqttport,"tcp") then --连接成功 log.info("mqttTest.mqttClient","connect success") else log.info("mqttTest.mqttClient","connect fail") --连接失败 end else --没连上网,原地等待一秒,一秒后会循环回去重试 sys.wait(1000) end end end) ``` # 对连接失败的处理 上述代码只是一个简单的连接服务器的代码,并且连上之后没有进行任何的其他操作 为了增加代码的稳健性,我们可以利用`sys.waitUntil()`函数,设置五分钟内没有获取到ip就开启飞行模式几秒,再关闭,让模块重新去获取GPRS连接: ```lua --启动mqtt客户端任务 sys.taskInit( function() while true do --是否获取到了分配的ip(是否连上网) if socket.isReady() then local imei = misc.getImei() --新建一个mqtt对象 local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword) --尝试连接指定服务器 if mqttClient:connect(mqttip,mqttport,"tcp") then --连接成功 log.info("mqttTest.mqttClient","connect success") else log.info("mqttTest.mqttClient","connect fail") --连接失败 end else --没连上网 --等待网络环境准备就绪,超时时间是5分钟 sys.waitUntil("IP_READY_IND",300000) --等完了还没连上? if not socket.isReady() then --进入飞行模式,20秒之后,退出飞行模式 net.switchFly(true) sys.wait(20000) net.switchFly(false) end end end end) ``` 同样,我们也可以给`mqttClient:connect(mqttip,mqttport,"tcp")`的连接加上错误次数的判断,连接错误超过五次,强制断开socket连接,等待五秒后重试: ```lua --启动mqtt客户端任务 sys.taskInit( function() while true do local retryConnectCnt = 0 --失败次数统计 --是否获取到了分配的ip(是否连上网) if socket.isReady() then local imei = misc.getImei() --新建一个mqtt对象 local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword) --尝试连接指定服务器 if mqttClient:connect(mqttip,mqttport,"tcp") then --连接成功 log.info("mqttTest.mqttClient","connect success") retryConnectCnt = 0 --失败次数清零 else log.info("mqttTest.mqttClient","connect fail")--连接失败 retryConnectCnt = retryConnectCnt+1 --失败次数加一 end --断开MQTT连接 mqttClient:disconnect() if retryConnectCnt>=5 then link.shut() retryConnectCnt=0 end sys.wait(5000) else --没连上网 --等待网络环境准备就绪,超时时间是5分钟 sys.waitUntil("IP_READY_IND",300000) --等完了还没连上? if not socket.isReady() then --进入飞行模式,20秒之后,退出飞行模式 net.switchFly(true) sys.wait(20000) net.switchFly(false) end end end end) ``` # 添加发送/接收处理函数,订阅主题 到了这一步,整个的mqtt线程只剩下`循环处理接收和发送的数据`这一部分和订阅topic部分与demo不同了,我们直接把这两部分加到mqtt线程的代码中吧: ```lua --启动mqtt客户端任务 sys.taskInit( function() while true do local retryConnectCnt = 0 --失败次数统计 --是否获取到了分配的ip(是否连上网) if socket.isReady() then local imei = misc.getImei() --新建一个mqtt对象 local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword) --尝试连接指定服务器 if mqttClient:connect(mqttip,mqttport,"tcp") then --连接成功 log.info("mqttTest.mqttClient","connect success") retryConnectCnt = 0 --失败次数清零 --订阅主题 if mqttClient:subscribe({["/s/test/"..imei]=0}) then --循环处理接收和发送的数据 while true do if not mqttInMsgProc(mqttClient) then log.error("mqttTest.mqttInMsgProc error") break end if not mqttOutMsgProc(mqttClient) then log.error("mqttTest.mqttOutMsgProc error") break end end end else log.info("mqttTest.mqttClient","connect fail")--连接失败 retryConnectCnt = retryConnectCnt+1 --失败次数加一 end --断开MQTT连接 mqttClient:disconnect() if retryConnectCnt>=5 then link.shut() retryConnectCnt=0 end sys.wait(5000) else --没连上网 --等待网络环境准备就绪,超时时间是5分钟 sys.waitUntil("IP_READY_IND",300000) --等完了还没连上? if not socket.isReady() then --进入飞行模式,20秒之后,退出飞行模式 net.switchFly(true) sys.wait(20000) net.switchFly(false) end end end end) ``` 可以看到,在接收和发送函数不返回false的情况下,接收和发送循环会一直进行下去;只有当两个函数之一返回false时,才会触发break导致退出该接收和发送循环 # mqttInMsgProc(mqttClient)函数 这段的代码相对来说比较简单,我们可以直接使用`mqttClient:receive(毫秒数)`来接收我们的tcp消息。 我们在合适的地方,新建一个`mqttInMsgProc(mqttClient)`函数: ```lua function mqttInMsgProc(mqttClient) local result,data while true do result,data = mqttClient:receive(2000) --接收到数据 if result then log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload)) --TODO:根据需求自行处理data.payload else break end end return result or data=="timeout" end ``` 这段代码就是循环获取mqtt消息,如果没获取到,`mqttClient:receive(2000)`就会返回`false,"timeout"`;如果获取到了,就会返回`true,获取到的数据字符串`;如果返回了`false,不为"timeout"`,则表示数据处理出错,说明mqtt连接有了什么问题 细心的读者可能看出来了,如果接收函数一直在2秒内有接收到数据,那么这段函数会永远无限循环下去,没办法到达`mqttOutMsgProc(mqttClient)`函数进行发送数据的操作,所以我们先去讲`mqttOutMsgProc(mqttClient)`函数的实现过程,再回来改进`mqttInMsgProc(mqttClient)`函数 # mqttOutMsgProc(mqttClient)函数 由于发送函数在mqtt线程中是一个循环的小部分,所以我们要建立一个消息发送的队列:有要发送的发数据时,将数据放到这个队列中;等运行到`mqttOutMsgProc(mqttClient)`函数时,将队列中的数据一个一个发出去 首先我们要建一个放这种队列的数组,在合适位置声明一下这个数组: ```lua --数据发送的消息队列 local msgQuene = {} ``` 接着我们构造一个可以往数组里插入数据的函数,`table.insert()`可以向数组添加数据,所以我们新建一个`insertMsg`函数: ```lua local function insertMsg(topic,payload,qos,user) table.insert(msgQuene,{t=topic,p=payload,q=qos,user=user}) end ``` 还记得上面说过的`消息接收函数函数会永远无限循环下去`的问题吗?我们在合适的地方新建一个判断发送消息队列是否为空的函数: ```lua function waitForSend() return #msgQuene > 0 end ``` 在数组有数据时,这个函数会返回true,我们可以将`mqttInMsgProc(mqttClient)`接收到数据后的代码添加一行判断发送队列是否有数据的代码,当检测到发送队列有数据时,就立即退出接收函数,转而去进行发送动作,接收函数最终改为了这样: ```lua function mqttInMsgProc(mqttClient) local result,data while true do result,data = mqttClient:receive(2000) --接收到数据 if result then log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload)) --TODO:根据需求自行处理data.payload --如果msgQuene中有等待发送的数据,则立即退出本循环 if waitForSend() then return true end else break end end return result or data=="timeout" end ``` 最后我们终于可以开始写消息发送函数了,整体的函数就是检查队列是否为空,不为空的话就发一条消息并将其从队列中删除,然后重复这一操作,函数代码如下: ```lua function mqttOutMsgProc(mqttClient) while #msgQuene>0 do --数组大于零? local outMsg = table.remove(msgQuene,1)--取出并删除一个元素 local result = mqttClient:publish(outMsg.t,outMsg.p,outMsg.q)--推送对应的mqtt消息 if outMsg.user and outMsg.user.cb then --如果存在回调函数 outMsg.user.cb(result,outMsg.user.para)--执行回调函数 end if not result then return end end return true end ``` >`outMsg.user`即为消息队列数组中的,消息数组中的,包含了回调函数的,数组(反正挺绕的) >具体就像下面这样用: ```lua insertMsg("/d/test/123","done",{cb=testcb}) local function testcb() log.info("test.testcb","test message sent") end ``` >这样,该条消息发送后就会执行指定的回调函数 # 完成基本的mqtt线程 经过上述的更改,最终,`mqttTest.lua`已经实现了连接服务器并自动处理错误的功能,并且预留了消息接收以及向发送队列添加数据的接口,文件的所有代码如下: mqttTest.lua ```lua require"misc" require"mqtt" --测试用的服务器信息,按需求自己改 local mqttip,mqttport,mqttuser,mqttpassword,mqttheartbeat = "x.x.x.x","xxxx","user","password",600 --数据发送的消息队列 local msgQuene = {} local function insertMsg(topic,payload,qos,user) table.insert(msgQuene,{t=topic,p=payload,q=qos,user=user}) end function waitForSend() return #msgQuene > 0 end function mqttOutMsgProc(mqttClient) while #msgQuene>0 do --数组大于零? local outMsg = table.remove(msgQuene,1)--取出并删除一个元素 local result = mqttClient:publish(outMsg.t,outMsg.p,outMsg.q)--推送对应的mqtt消息 if outMsg.user and outMsg.user.cb then --如果存在回调函数 outMsg.user.cb(result,outMsg.user.para)--执行回调函数 end if not result then return end end return true end function mqttInMsgProc(mqttClient) local result,data while true do result,data = mqttClient:receive(2000) --接收到数据 if result then log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload)) --TODO:根据需求自行处理data.payload --如果msgQuene中有等待发送的数据,则立即退出本循环 if waitForSend() then return true end else break end end return result or data=="timeout" end --启动mqtt客户端任务 sys.taskInit( function() while true do local retryConnectCnt = 0 --失败次数统计 --是否获取到了分配的ip(是否连上网) if socket.isReady() then local imei = misc.getImei() --新建一个mqtt对象 local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword) --尝试连接指定服务器 if mqttClient:connect(mqttip,mqttport,"tcp") then --连接成功 log.info("mqttTest.mqttClient","connect success") retryConnectCnt = 0 --失败次数清零 --订阅主题 if mqttClient:subscribe({["/s/test/"..imei]=0}) then --循环处理接收和发送的数据 while true do if not mqttInMsgProc(mqttClient) then log.error("mqttTest.mqttInMsgProc error") break end if not mqttOutMsgProc(mqttClient) then log.error("mqttTest.mqttOutMsgProc error") break end end end else log.info("mqttTest.mqttClient","connect fail")--连接失败 retryConnectCnt = retryConnectCnt+1 --失败次数加一 end --断开MQTT连接 mqttClient:disconnect() if retryConnectCnt>=5 then link.shut() retryConnectCnt=0 end sys.wait(5000) else --没连上网 --等待网络环境准备就绪,超时时间是5分钟 sys.waitUntil("IP_READY_IND",300000) --等完了还没连上? if not socket.isReady() then --进入飞行模式,20秒之后,退出飞行模式 net.switchFly(true) sys.wait(20000) net.switchFly(false) end end end end) ``` # 实现协议需求 ## 设备联网后向`/d/test/设备的imei值`的topic发送payload为当前设备ICCID的字符串 我们只需要在连接成功处添加代码即可,在`if mqttClient:subscribe({["/s/test/"..imei]=0}) then`所在代码下一行添加: ```lua insertMsg("/d/test/"..misc.getImei(),sim.getIccid()) ``` ## 设备收到topic为`/s/test/设备的imei值`,payload为`ok`的数据后,再次向`/d/test/设备的imei值`的topic发送payload为`done`四个字节的字符串 我们只需要在接收处理消息处添加代码即可,在`--TODO:根据需求自行处理data.payload`所在代码下一行添加: ```lua if data.topic == "/s/test/"..misc.getImei() and data.payload == "ok" then insertMsg("/d/test/"..misc.getImei(),"done") end ``` # 完整代码 经过上面的删删改改,功能以及基本实现了,整个文件的代码如下: mqttTest.lua ```lua require"misc" require"mqtt" --测试用的服务器信息,按需求自己改 local mqttip,mqttport,mqttuser,mqttpassword,mqttheartbeat = "x.x.x.x","xxxx","user","password",600 --数据发送的消息队列 local msgQuene = {} local function insertMsg(topic,payload,qos,user) table.insert(msgQuene,{t=topic,p=payload,q=qos,user=user}) end function waitForSend() return #msgQuene > 0 end function mqttOutMsgProc(mqttClient) while #msgQuene>0 do --数组大于零? local outMsg = table.remove(msgQuene,1)--取出并删除一个元素 local result = mqttClient:publish(outMsg.t,outMsg.p,outMsg.q)--推送对应的mqtt消息 if outMsg.user and outMsg.user.cb then --如果存在回调函数 outMsg.user.cb(result,outMsg.user.para)--执行回调函数 end if not result then return end end return true end function mqttInMsgProc(mqttClient) local result,data while true do result,data = mqttClient:receive(2000) --接收到数据 if result then log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload)) --TODO:根据需求自行处理data.payload if data.topic == "/s/test/"..misc.getImei() and data.payload == "ok" then insertMsg("/d/test/"..misc.getImei(),"done") end --如果msgQuene中有等待发送的数据,则立即退出本循环 if waitForSend() then return true end else break end end return result or data=="timeout" end --启动mqtt客户端任务 sys.taskInit( function() while true do local retryConnectCnt = 0 --失败次数统计 --是否获取到了分配的ip(是否连上网) if socket.isReady() then local imei = misc.getImei() --新建一个mqtt对象 local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword) --尝试连接指定服务器 if mqttClient:connect(mqttip,mqttport,"tcp") then --连接成功 log.info("mqttTest.mqttClient","connect success") retryConnectCnt = 0 --失败次数清零 --订阅主题 if mqttClient:subscribe({["/s/test/"..imei]=0}) then insertMsg("/d/test/"..misc.getImei(),sim.getIccid()) --循环处理接收和发送的数据 while true do if not mqttInMsgProc(mqttClient) then log.error("mqttTest.mqttInMsgProc error") break end if not mqttOutMsgProc(mqttClient) then log.error("mqttTest.mqttOutMsgProc error") break end end end else log.info("mqttTest.mqttClient","connect fail")--连接失败 retryConnectCnt = retryConnectCnt+1 --失败次数加一 end --断开MQTT连接 mqttClient:disconnect() if retryConnectCnt>=5 then link.shut() retryConnectCnt=0 end sys.wait(5000) else --没连上网 --等待网络环境准备就绪,超时时间是5分钟 sys.waitUntil("IP_READY_IND",300000) --等完了还没连上? if not socket.isReady() then --进入飞行模式,20秒之后,退出飞行模式 net.switchFly(true) sys.wait(20000) net.switchFly(false) end end end end) ``` # 验证功能 我没条件测试这玩意儿。。。谁有空帮我测试下的?
发表于 2018-08-27 18:24
阅读 ( 11441 )
分类:
默认分类
9 推荐
打赏
收藏
你可能感兴趣的文章
11、RDA8910CSDK二次开发:新鲜出炉的MQTT库
8002 浏览
10、合宙Air模块Luat开发:JSON字符串的生成与解析
2430 浏览
9、合宙Air模块Luat开发:认识NVS数据管理模块
2104 浏览
8、合宙Air模块Luat开发:基于官方库的二次封装,使串口更加易用
2675 浏览
7、合宙Air模块Luat开发:定时器的使用方法
2266 浏览
X、合宙Air模块Luat开发:全网首发,通过iic直接驱动OLED,720Sl开始有显时代
2869 浏览
相关问题
Air202模块连接阿里云MQTT,如何用Lua计算password?
0 回答
怎么在MQTT SSL连接时的添加自己ca证书验证
1 回答
air202,用mqtt协议连接onenet平台(lua)。
1 回答
Air720H 怎样通过 MQTT服务发送图片?
1 回答
air 202的mqtt使用时,模块上报usr为MQTT HEART NO ACK,该怎么处理
1 回答
Air208编程
1 回答
2 条评论
请先
登录
后评论
晨旭
菜鸟
20 篇文章
作家榜
»
技术销售Delectate
43 文章
陈夏
26 文章
国梁
24 文章
miuser
21 文章
晨旭
20 文章
朱天华
19 文章
金艺
19 文章
杨奉武
18 文章
×
发送私信
发给:
内容:
×
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!