Get the Topic Name in a Talend Job From a cMQTT Message

When a Talend route subscribes to an MQTT broker, the message body can be passed to a Talend Job for processing. But you may also need to know which topic the message was published on.

The Apache Camel cMQTT component stores the topic name in an additional message header named CamelMQTTSubscribeTopic. Inside Talend we can therefore use that header to build a schema containing both the message body and the topic, ready for later processing (Apache Camel MQTT reference).

How to proceed. A tRouteInput component should be added to your Talend Job so that it can receive messages from a route. The link between the route and the job is configured inside the route.

Talend job with a tRouteInput to get MQTT messages

The tRouteInput component comes with a single schema field pre-populated with the expression ${in.body}, which represents the body of the input message.

We can then modify that schema by adding a second column of String type which references ${in.header.CamelMQTTSubscribeTopic}. That field will contain the topic name and can be used to filter the flow or simply to record the data source.
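As an illustration of filtering on the topic column, here is a minimal sketch of a helper that checks a topic name against an MQTT-style filter with the + and # wildcards. The class name TopicFilter and its method are hypothetical, not part of Talend or Camel, and the matching is simplified (it ignores the special handling of topics starting with $).

```java
// Hypothetical helper, not part of Talend or Camel.
final class TopicFilter {

    // Returns true when `topic` matches the MQTT-style `filter`.
    // '+' matches exactly one level; '#' matches all remaining levels.
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard: the rest always matches
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        // No '#' was seen, so both must have the same number of levels.
        return f.length == t.length;
    }
}
```

Assuming the new schema column is called topic, a call such as TopicFilter.matches("sensors/+/temperature", row.topic) could be placed in a custom routine or a tJavaRow to decide whether a row should continue through the flow.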

Modified Talend tRouteInput component to extract the topic name from a route message

That’s all. We now have a complete set of data to process.

Why not add the client name to the body

You could be tempted to add, where possible, a client reference to the message body, especially when it is a JSON document, to which a property is easy to add. That opens the door to spoofing, since there is no way to validate the field: anyone can publish a message claiming that source.

Instead, brokers can be configured with authentication and publishing rules which restrict each publisher to specific topics, so the topic itself guarantees the message source.
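With such rules in place, the source can be read back from the topic name itself. The sketch below assumes a purely hypothetical topic layout, devices/<clientId>/<measurement>, in which the broker only lets each client publish under its own second level; the class TopicSource and the layout are illustrative assumptions, not something defined by Talend, Camel or MQTT.

```java
// Hypothetical helper; the topic layout devices/<clientId>/... is an assumption.
final class TopicSource {

    // Returns the client id encoded in the topic, or null when the topic
    // does not follow the assumed layout.
    static String clientIdOf(String topic) {
        if (topic == null) {
            return null;
        }
        String[] levels = topic.split("/", -1);
        if (levels.length < 3 || !levels[0].equals("devices") || levels[1].isEmpty()) {
            return null;
        }
        return levels[1];
    }
}
```

The value could then be stored next to the message body as the trusted data source, instead of relying on a field supplied by the publisher.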
