MQTT Global Shared Subscriptions In IBM Message Gateway (Amlen)
Before I start, and in case you are not aware, IBM open-sourced and donated "IBM Watson IoT Platform — Message Gateway" (previously called IBM MessageSight) to Eclipse in 2021, and it now sits under the Amlen project. Lots of great material on Amlen can be found here, and it should be noted that IBM Message Gateway is now out of support. The work I cover here was performed on IBM Message Gateway but is completely applicable to Amlen as well.
As part of a project I’ve been working on, I needed to investigate “Global Shared Subscriptions”, or GSS for short. GSS allows multiple subscribers to consume messages from a common topic, but in such a way that only one subscriber receives any given published message. On top of this I needed to be able to do this with multiple instances of Message Gateway running inside an OpenShift cluster. To support this my Message Gateway deployment looks as follows:
Here you can see that within my namespace I have two instances of Message Gateway running. Each instance has an aligned service and each service is presented out as an HTTPS route to allow my NodeRED instances to access it. On top of this I have an OpenLDAP instance which is used to control access to topics within my Message Gateway environment. Finally my Message Gateway pods are running within a “Cluster”.
Within Message Gateway I have a Hub (Verdi) defined…
On top of this I have a connection policy defined (VerdiDemo) to restrict access to a user called “backend”.
Next I have a subscription policy, and this is where the “GSS” kicks in. Before I describe what I have, a bit of background on how things work. For a subscriber to use a “GSS” they must subscribe using a topic string that starts with either $SharedSubscription or $share. The first option, $SharedSubscription, is an IBM Message Gateway specific approach and so is less portable. Based on this I used the $share method. With either method the initial part of the topic string that follows $SharedSubscription or $share indicates the subname for the “GSS”, and this must be matched by the Subscription Policy. In my test case the topic I wanted to share was nft/push, so to set up the “GSS” I would need to subscribe to a topic of the format $share/<gss sub name>/nft/push; in my case I decided to set the <gss sub name> to gss-push. So with that explained, here is what I defined in my subscription policy.
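To make the topic format concrete, here is a minimal sketch of a standalone subscriber using the MQTT.js client (this is not one of my NodeRED flows; the host name, port and credentials are placeholders for whatever your own endpoint and LDAP user are):

```javascript
// Minimal shared-subscription subscriber sketch using MQTT.js.
// Host, port/scheme and credentials are placeholders; use whatever matches
// how your Message Gateway endpoint is exposed (e.g. wss:// via an OpenShift Route).
const mqtt = require('mqtt');

const client = mqtt.connect('mqtts://message-gateway.example.com:8883', {
  username: 'backend',
  password: 'changeme'
});

client.on('connect', () => {
  // Every client subscribing with the same share name ("gss-push") joins the
  // same global shared subscription on the nft/push topic.
  client.subscribe('$share/gss-push/nft/push');
});

client.on('message', (topic, message) => {
  // Any given published message is delivered to only one member of the group.
  console.log(`received on ${topic}: ${message.toString()}`);
});
```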
Here you can see that I’ve defined my subscription policy wider than just the sub topic and have set it to gss-push/nft. This allows me to have greater control over the topics, so I can have different users allowed to access gss-push/dev and gss-push/prod from backend, who is allowed to access gss-push/nft. The * wild card is very important as it allows the GSS to match a wider range of topics, i.e. anything that starts with $share/gss-push/nft; in my case I am subscribing to $share/gss-push/nft/push and publishing to nft/push. The final part of my configuration is a Topic policy. This is not strictly necessary, but I have one so I can test out locking down access to topics. As this is not needed for my work here I set the topic to *.
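For reference, this kind of policy can also be created through the REST admin interface rather than the Web UI. The sketch below is only an approximation: the policy name, admin endpoint, credentials and exact field values are my own illustrations, so check them against the Amlen configuration documentation before using anything like this.

```
# Hypothetical admin endpoint, credentials and policy name; field names are
# my reading of the Amlen docs -- verify before use.
curl -k -u adminuser:adminpassword \
     -X POST https://mg-admin.example.com:9089/ima/v1/configuration/ \
     -d '{
       "SubscriptionPolicy": {
         "VerdiGssPush": {
           "Subscription": "gss-push/nft*",
           "ActionList": "Receive,Control",
           "Protocol": "MQTT",
           "UserID": "backend"
         }
       }
     }'
```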
So with my Message Gateway instances configured I needed to look at creating my test harnesses in NodeRED. What I wanted to achieve was the following.
Here you can see that I have a single NodeRED pod running and publishing to both Message Gateway pods. I then have 2 sets of 3 NodeRED pods that are subscribing to the GSS $share/gss-push/nft/push, with each set connecting to a specific instance of Message Gateway. Finally I have a NodeRED instance running directly on my Mac to act as a logger so I can track what messages are going where. In terms of the flows that I created…
This is the general subscriber flow.
For each instance of this flow the Create Payload function node is updated to reflect where the MQTT message came from, which pod processed it and which message number it is. Once the payload has been created, the payload is published on a normal topic named mqtt_test/log.
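As an illustration, a Create Payload function node along these lines would do the job; the field names and hard-coded values here are mine, not the ones from my real flow:

```javascript
// Sketch of the "Create Payload" function node; field names are illustrative.
// The MQTT In node is assumed to deliver the publisher's payload as a JSON string.
const incoming = JSON.parse(msg.payload);

msg.payload = JSON.stringify({
    msgNumber: incoming.msgNumber,       // sequence number sent by the publisher
    publisherGateway: incoming.gateway,  // which Message Gateway it was published to
    subscriberGateway: 1,                // hard-coded per subscriber flow instance
    pod: 'nodered-sub-1'                 // hard-coded (or taken from an env var) per pod
});

msg.topic = 'mqtt_test/log';
return msg;
```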
The publisher flow is:
In the publisher flow I use a flow variable to hold the message number; this is incremented for each message and is sent as part of the message payload. I have an Inject node set up to initialise this when the flow starts, and then I have four additional Inject nodes to trigger sending messages. Each time a message is triggered I generate a random number and then use this in conjunction with a Switch node to route the traffic to different Message Gateway instances (each MQTT Publish node connects to a different Route). The final step before publishing the message is to add the selected Message Gateway instance number into the payload.
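Pieced together, the publisher logic amounts to something like the following function node. This is a sketch only: in my actual flow the routing is done by a Switch node feeding two separate MQTT Publish nodes, and the counter is initialised by a dedicated Inject node.

```javascript
// Sketch of the publisher logic; in the real flow the random routing is done
// by a Switch node and the counter is seeded by an Inject node at start-up.
let msgNumber = (flow.get('msgNumber') || 0) + 1;  // flow variable holding the count
flow.set('msgNumber', msgNumber);

const gateway = Math.random() < 0.5 ? 1 : 2;       // pick a Message Gateway instance

msg.gateway = gateway;                             // used by the Switch node for routing
msg.topic = 'nft/push';
msg.payload = JSON.stringify({ msgNumber: msgNumber, gateway: gateway });
return msg;
```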
The final part of the jigsaw is the “Logger” NodeRED instance. I run this directly on my Mac as I use this flow to externalise the data to my filesystem so I can analyse it. The NodeRED flow is:
This flow simply subscribes to the mqtt_test/log topic which the subscriber flows are publishing to. For each received message the flow converts it to CSV format and appends it to a CSV file on my Mac’s disk.
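In my flow the conversion and writing are handled by the standard CSV and File nodes, but for illustration the equivalent function-node logic is roughly this (field names follow the earlier payload sketch):

```javascript
// Sketch only: the real flow uses the built-in CSV node plus a File node set
// to append each payload with a newline. Field names match the payload sketch above.
const p = JSON.parse(msg.payload);
msg.payload = [p.msgNumber, p.publisherGateway, p.subscriberGateway, p.pod].join(',');
return msg;
```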
With everything in place I could fire up the publisher and check everything was flowing. As expected, in my CSV log file (which I tail’d) I could see messages being processed once and only once across all subscriber instances. If I killed subscriber pods then I could see they no longer appeared as a processor in the log file, but the complete set of messages was still processed.
So in conclusion I successfully set up a “Global Shared Subscription” and tested publishing and subscribing to it with more than one Message Gateway instance and more than one subscriber on each Message Gateway instance. Topic policies on ‘*’ are not ideal and should be avoided, but in my test case I was keeping things simple. Also, I’ve not dug into the various QoS (Quality of Service) options for messages or the durability of subscriptions etc… If you need persistent messages buffered for disconnected apps, I would also want to investigate HA pairs.