MQTT Global Shared Subscriptions In IBM Message Gateway (Amlen)

Tony Hickman
6 min readOct 28, 2022

--

Before I start and in case you are not aware IBM open-sourced and donated “IBM Watson IoT Platform — Message Gateway” (previously called IBM MessageSight) to Eclipse in 2021 and it now sits under the Amlen project. Lots of great material on Amlen can be found here and it should be noted that IBM Message Gateway is now out of support. The work I cover here was performed on IBM Message Gateway but is completely applicable to Amlen as well.

As part of a project I’ve been working I needed to investigate “Global Shared Subscriptions” or GSS for short. GSS allows multiple subscribers to consume messages from a common topic but in such a way that only one subscriber receives any given published message. On top of this I need to be able to do this with multiple instances of Message Gateway running in side an OpenShift cluster. To support this my Message Gateway deployment looks as follows:

Message Gateway Deployment

Here you can see that within my namespace I have two instances of Message Gateway running. Each instance has an aligned service and eash service is presented out as an HTTPS route to allow my NodeRED instances to access it. On top of this I have an OpenLDAP instance which is used to control access to to topics within my Message Gateway environment. Finally my Message Gateway pods are running within a “Cluster”.

Within Message Gateway I have a Hub (Verdi) defined…

Message Hub

On top of this I have a connection policy defined (VerdiDemo) to restrict access to a user call “backend”.

Connection Policy

Next I have a subscription policy and this is where the “GSS” kicks in. Before I describe what I have a bit of backgroud re how things work. For a subscriber to use a “GSS” they must subscribe using a topic string that starts with either $SharedSubscription or $share . The first option $SharedSubscription is a IBM Message Gateway specific approach and so is less portable. Based on this I used the $share method. With either method the initial part of the topic string that follows $SharedSubscription or $share indicates the subname for the “GSS” and this must be matched by the Subscription Policy. In my test case the topic I wanted to share was nft/push so to set up the “GSS” I would need to subscribe to a topic of the format $share/<gss sub name>/nft/push in my case I decided to set the <gss sub name> to gss-push. So with that explained here is what I defined in my subscription policy.

Subscription Policy

Here you can see that I’ve defined my subscription policy wider than just the sub topic and have set it to gss-push/nft this allows me to have greater control over the topics and so can have different users allowed to access gss-push/dev and gss-push/prod from backend who is allowed to access gss-push/nft. The * wild card is very important as this allows the GSS to match with a wider range of topic i.e. anything that starts $share/gss-push/nft and in my case I am subscribing to $share/gss-push/nft/push and publishing to ntf/push . The final part of my configuration is a Topic policy. This is not strictly necessary but I have one so I can test out locking down access to topics. As this is not needed for my work here I set the topic to *

Topic Policy

So with my Message Gateway instances configured I needed to look at creating my test harnesses in NodeRED. What I wanted to achieve was the following.

Testing Approach

Here you can see that I have a single NodeRED pod running and publishing to both Message Gateway pods. I then have 2 sets of 3 NodeRED pods that are subscribing to the GSS $share/gss-push/nft/push with each set connecting to a specific instance of Message Gateway. Finally I have a NodeRED instance running directly on my Mac to act as a logger so I can track what messages are going where. In terms of the flows that I created…

This is the general subscriber flow.

Subscriber Flow

For each instance of this flow the Create Payload function node is updated to reflect where the MQTT message came from, which pod processed it and which message number it is.

Create Payload Function

Once the payload has been created the payload is published on a normal topic named mqtt_test/log. The publisher flow is:

Publisher Flow

In the publisher flow I use a flow variable to hold the message number and this is incremented for each message and is sent as part of the message payload. I have an Inject node set up to initialise this when the flow starts and then I have four additional Inject nodes to trigger sending messages. Each time a message if triggered I generate a random number and then use this in conjunction with a Switch node to route the traffic to different Message Gateway instances (each MQTT Publish node connects to a different Route). The final step before publishing the message is to add the selected Message Gateway instance number into the payload.

The final part of the jigsaw is the “Logger” NodeRED instance. I run this directly on my Mac as I use this flow to externalise the data to my filesystem so I can analyse it. The NodeRED flow is:

Logger Flow

This flow simple subscribes to the mqtt_test/log topic which the subscriber flows are publishing to. For each received message the flow converts it to a CSV format and appends it to a csv file on my Mac’s disk.

With everything in place I could fire up the publisher and check everything was following. As expected in my csv log file (which I tail ‘d) I could see messages being processed once and only once across all subscriber instances. If I killed subscriber pods then I could see they no longer appeared as a processor in the log file but the complete set of messages was still processed.

So in conclusion I successfully set up a “Global-shared subscription” and tested publishing and subscribing to it with more than one Message Gateway instance and more than one subscriber on each Message Gateway instance. Topic policies on ‘*’ are not ideal and should be avoided but in my test case I was keeping things simple. Also I’ve not dug into the various QoS (Quality of Service) options of messages or durability of subsscriptions etc… If you have persistent messages buffering for disconnected apps I would want to investigate HA pairs.

--

--

Tony Hickman
Tony Hickman

Written by Tony Hickman

I‘ve worked for IBM all of my career and am an avid technologist who is keen to get his hands dirty. My role affords me this opportunity and I share what I can

No responses yet