Batching Messages With MQTT
Its been a while and I needed another MQTT “technology fix” to get my brain whirling so I revisited a challenge I looked at several years back — how to “shoulder tap” an MQTT device that had gone silent. For example I may have an application on a web page which a human is interacting with and this is driving MQTT Messages. If the message flow stops when I would expect it to continue I may want to “shoulder tap” the human via another channel (SMS, Email etc) to try and “engage” them to complete the activity.
So scoping the challenge a bit more I created the following requirement:
“Detect and highlighted the loss of device communication and after a the device has been quite for 45–60mins try and restart communications”
Digging into this at the technical level I could see…
- I would have multiple devices flagged as being “inactive”
- Devices could go inactive at anytime but I don’t want to action them unless they have been inactive for between 45–60mins
- Once I’ve actioned a device I don’t want to action it again unless it goes quite for another 45–60mins
I then looked at what I could do with MQTT…
- I can only have one message per topic when using Quality of Service (QoS) 0
- If a client is connected and subscribed to a topic any messages sent are immediately received
- Retained messages will always be picked up unless the they are cleared from a topic
- Quality of Service (QoS) level 2 makes sure that only one copy of a message is received but requires a durable subscription tied to a client id (which is done in by setting the “clean session” flag to false).
- Where a durable subscription is in place, when the client with the associated client id reconnects held messages will flow.
With all this information I decided that I would need to have a way to split out the devices to separate topics. In addition these topics would need to carry information about the time the message was added. In terms of handling the messages I had two options:
- Use “retained” messages and QoS 0 for the publication of the “inactive” device and in the message processor clear the message once it had been received.
- Use QoS 2 and use distinct client id’s to allow messages for a given time period to be retrieved.
First things first, I needed to define my topic structure. As indicated above this needed to support multiple devices being marked as “inactive” as well as an indication of “when” the were detected as being “inactive”. I also needed to allow the application that would “sweep” for messages to be able to use wildcards to access all messages for all devices that had been marked “inactive” for a 45–60mins. Based on this I decided on the following topic structure.
/reengage/<client id>/<time period>
<time period> is a number representing 15mins period of hour when the message was publish and
<client id> is the client id used by the device to connect to the MQTT broker. The reason I put
<client id> first was for two reasons:
- So that I could use security features on my broker (I am using IBM Message Gateway) to lock down the ability to publish to the topic.
- To support a device removing an inactive message should it become active of its own accord
So this would lead me to a topic tree which would have a branch for each device which had detected “inactivity” and then under this there would be a branch for the “time period” that the “inactivity” was detected.
In both solution options above the device client will be publishing messages to the topic above but in the case of the 1st option it will be using QoS 0 and the “message retained” flag set to “true”. Where as in the 2nd options it will use QoS 2 and the “message retained” flag set to “false”. When publishing the message the device will use the current time to figure out what minute within the hour it is in and then map that to a “time period”. For my use case I simple mapped each 15mins block to “time periods” 0–3 with 0 for 1st 15mins and 3 for the last 15mins. On top of this processing I would need to look at implementing a way to clear the “inactive” message if/when the device starts sending messages. In the case of QoS 0 this would simply be a case of subscribing to the topic
/reengage/<devices client id>/#. This would pick up any “inactive” messages and I could use the topic string in the
callback to publish a
null message. The QoS 2 option, however , is more complicated. In this case messages are held in the durable subscription so I can’t over write them. I also can’t create a durable subscription to the
/reengage topic as that would mean my device would get the message and NOT the “sweeper”. The only approach I could think of is to have a 2nd topic
/reactivated where I would publish the client id of any device that “reactivated” its self but this would require the device to maintain a level of state information. The best option I could come up with was to use topic with QoS 0 and “retained” messages to maintain this state for the device. In this case only the device would access the topic and it would have a topic string of
/reengage/<client id>/state. All in all the QoS 2 option is looking less attractive.
To allow me to test my approach I created the following NodeRED flow to publish messages to the
/reactivate/<client id>/<time period> topic.
The inject node is set to repeat every minute so I can build up a “batch” of messages for each “time period”. The “inactive” message I decided to sent was the following JSON Object.
I need to dynamically set the target topic string and this is done in the “Set Topic” node.
When testing the two different QoS approaches I simply change the MQTT publish node to reflect the necessary “QoS” and “Retain” settings.
The focus now shifts to the “sweeper application” and again there are some key differences in the way I need to handle the processing. In both cases the “sweeper application” connects regularly, retrieves the messages for the devices that need to be “shoulder tapped” and then disconnects. The approaches to performing this are quite different though.
Lets first look at the QoS 0 approach. In this case I am are relying on the MQTT brokers ability to “retain” the last published message on a topic. This means that when a client connects and subscribes to the topic they get the last message that was published. This is true even if its the same client which has unsubscribed, disconnected then reconnected and re-subscribed; it still gets a copy of the message. To stop this happening we need to flush out the “retained” message and this is done by publishing a
null message to the topic. When I did this in my “sweeper” NodeRED flow this is a publish to the topic so the subscription is triggered. This meant that I needed to do some processing check on received messages to make sure I actually had one (It may be that other MQTT client implementations handle
null messages and don’t trigger the
callback but I suspect their behaviour is pretty standard). Given I want to process all messages in the “time period” which would cover devices that have been “inactive” for 45–60mins my “sweeper” program needs to kick in every 15mins. When it kicks in it figures out the current “time period” and determines the pervious “period” to sweep. To do the “sweep” the application executes the following steps:
- Connect to MQTT Broker
- Subscribe to topic
- For each received valid message take action to “shoulder tap”, publish
nullmessage to topic
- Unsubscribe from topic
- Disconnect from MQTT Broker
The NodeRED Flow I created for the QoS 0 “sweeper” is.
As I need to adjust the topic string based on the “time period” I am using “Dynamic subscription” in my MQTT subscriber node. The inject nodes allow me to control the cycle of connect, subscribe, unsubscribe and disconnect so I can simulate the “sweeping” process. The “Set Topic” is similar to the device one above but figures out the 45–60min “time period” to look at based on the current time.
This sets the topic to “wildcard” the
<client id>element so all messages published by all devices for a given “time period” are retrieved. My flow stores the retrieved messages so I can check I am getting everything and clears out the retained message on each topic I receive a message on. You can also see I have a switch node to ignore any “empty” messages which are ones where I have cleared the retained message.
Now lets focus on the QoS 2 approach. In this case we are relying on the MQTT brokers ability publish message to a subscriber of topic once and only once. This means that a client connects to the broker with “clean session” set to
false and subscribes to a topic. This creates a “durable” subscription for that client on that topic (NB: the topic can contain wildcards) and this remains in place until the client unsubscribes from the topic. For this to work the durable subscription needs to be in place before any messages are published otherwise the broker will just discard them (as no one listening). To do this I just need to connect to the broker with a defined client id (this is key as durable subscriptions are tied to the client id) and then subscribe to the following topics
This creates a durable subscription for each “time period” for all client ids. Once these are set I just disconnect the client as unsubscribing would remove the durable subscription. With everything set up the sweeping process is very similar as the NodeRED flow is:
The bottom of the flow is all about setting up the durable subscriptions. The inject nodes are set up to fire at flow start with connect happening 1st then a delay and all the subscribes firing at once then finally a bit later disconnect fires. If I look on my Message Gateway Web interface I can see the durable subscriptions.
Once all the subscriptions are in place the steps to “sweep” are slightly different.
- Connect to MQTT Broker with client id used to subscribe to the time period to “sweep”
- For each received valid message take action to “shoulder tap”
- Disconnect from MQTT Broker
I don’t need to take any steps to remove the messages from the topics as the QoS 2 setting ensures that messages are only delivered once and then removed.
I had a LOT of fun digging around this challenge and have learn a lot more about MQTT and in particular QoS 2. Based on my experience I would say that for my use case the best option is to use the QoS 0 and retained message route. This adds some complexity around clearing the retained messages but simplifies the “clearing” of a message should a device reactivate its self before the “sweeper” kicks in.