IotMqttSampleUsingQueue

Sample for MQTT communication via a message queue

This sample illustrates communication with an MQTT broker. Messages are sent (published) and received. Receiving messages involves two steps. First, the client specifies which topics it wants to receive ("subscribe"). Then, received messages are collected in a message queue, from which they can be read and evaluated.

Project structure

1. Create a TwinCAT project with a PLC and add Tc3_IotBase as library reference.
2. Create a program block and declare an instance of FB_IotMqttClient and, if required, two auxiliary variables to control the program sequence.
PROGRAM PrgMqttCom
VAR
    fbMqttClient    : FB_IotMqttClient;
    bSetParameter   : BOOL := TRUE;
    bConnect        : BOOL := TRUE;
END_VAR
3. Declare two variables (for topic and payload) for the MQTT message to be sent. In the sample a message is sent every second, so a counter and a timer are declared as well.
    (* published message *)
    sTopicPub   : STRING(255) := 'MyTopic';
    sPayloadPub : STRING(255);
    i : UDINT;
    fbTimer : TON := (PT:=T#1S);
4. To receive messages, declare a variable containing the topic to subscribe to, plus two further variables that hold the topic and payload of the last received message.
The received messages are collected in a queue and then evaluated one by one. To this end, declare an instance of FB_IotMqttMessageQueue and an instance of FB_IotMqttMessage.
    (* received message *)
    bSubscribed    : BOOL;
    sTopicSub      : STRING(255) := 'MyTopic';
    {attribute 'TcEncoding':='UTF-8'}
    sTopicRcv      : STRING(255);
    {attribute 'TcEncoding':='UTF-8'}
    sPayloadRcv    : STRING(255);
    fbMessageQueue : FB_IotMqttMessageQueue;
    fbMessage      : FB_IotMqttMessage;
5. In the program part, the MQTT client must be called cyclically to ensure that a connection to the broker is established and maintained and that messages are received. Set the parameters of the desired connection and initiate the connection with the transfer parameter bConnect := TRUE.
In the sample the parameters are assigned once in the program code before the client call. Since this is usually only required once, the parameters can also be specified in the declaration part when the MQTT client is instantiated. Not all parameters have to be assigned.
In the sample the broker runs locally ('localhost'). Alternatively, the IP address or host name of a remote broker can be specified.
IF bSetParameter THEN
    bSetParameter               := FALSE;
    fbMqttClient.sHostName      := 'localhost';
    fbMqttClient.nHostPort      := 1883;
//  fbMqttClient.sClientId      := 'MyTcMqttClient'; 
    fbMqttClient.sTopicPrefix   := ''; 
//  fbMqttClient.nKeepAlive     := 60; 
//  fbMqttClient.sUserName      := ;
//  fbMqttClient.sUserPassword  := ; 
//  fbMqttClient.stWill         := ; 
//  fbMqttClient.stTLS          := ;
    fbMqttClient.ipMessageQueue := fbMessageQueue;
END_IF

fbMqttClient.Execute(bConnect);
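The note about specifying parameters in the declaration part can be sketched as follows. This is a minimal sketch, assuming that sHostName and nHostPort are input variables of FB_IotMqttClient that accept initial values at instantiation; check this against the library version in use. The message queue reference is still assigned once in the program code, as in the sample.

```iecst
(* declaration part *)
PROGRAM PrgMqttCom
VAR
    fbMessageQueue : FB_IotMqttMessageQueue;
    (* assumption: input variables can be given initial values at instantiation *)
    fbMqttClient   : FB_IotMqttClient := (sHostName := 'localhost', nHostPort := 1883);
    bSetParameter  : BOOL := TRUE;
    bConnect       : BOOL := TRUE;
END_VAR

(* implementation part *)
IF bSetParameter THEN
    bSetParameter               := FALSE;
    fbMqttClient.ipMessageQueue := fbMessageQueue; // assigned in code, as in the sample
END_IF

fbMqttClient.Execute(bConnect);
```

With this variant the one-time assignment block only contains the parameters that are not set in the declaration.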
6. As soon as the connection to the broker is established, the client should subscribe to a particular topic. A message should be sent every second.
In the sample sTopicPub = sTopicSub applies, so that a loop-back occurs. In other applications the topics usually differ.
IF fbMqttClient.bConnected THEN
    IF NOT bSubscribed THEN
        bSubscribed := fbMqttClient.Subscribe(sTopic:=sTopicSub, eQoS:=TcIotMqttQos.AtMostOnceDelivery);
    END_IF
    fbTimer(IN:=TRUE);
    IF fbTimer.Q THEN // publish new payload every second
        fbTimer(IN:=FALSE);
        i := i + 1;
        sPayloadPub := CONCAT('MyMessage', TO_STRING(i));
        fbMqttClient.Publish(sTopic       := sTopicPub,
                             pPayload     := ADR(sPayloadPub),
                             nPayloadSize := LEN2(ADR(sPayloadPub)) + 1, // +1: include the null terminator in the payload
                             eQoS         := TcIotMqttQos.AtMostOnceDelivery,
                             bRetain      := FALSE,
                             bQueue       := FALSE);
    END_IF
END_IF
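The sample sets bSubscribed once and never clears it. If the connection to the broker is lost and re-established, the broker may no longer hold the subscription (for instance when no persistent session is used), so an application may want to subscribe again. The following is a minimal sketch under that assumption, using only variables already declared in the sample. Whether re-subscribing is actually needed depends on the session settings of the client and the broker.

```iecst
fbMqttClient.Execute(bConnect);

IF NOT fbMqttClient.bConnected THEN
    // connection lost or not yet established:
    // clear the flag so that Subscribe() is called again after the next connect
    bSubscribed := FALSE;
END_IF
```

The fragment is intended to be placed directly before the IF fbMqttClient.bConnected block shown in step 6.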
7. The cyclic call of the MQTT client ensures that the messages are received. The client receives all messages with topics to which it has previously subscribed with the broker and places them in the message queue. Once messages are available, call the method Dequeue() to gain access to the message properties such as topic or payload via the message object fbMessage.
IF fbMessageQueue.nQueuedMessages > 0 THEN
    IF fbMessageQueue.Dequeue(fbMessage:=fbMessage) THEN
        fbMessage.GetTopic(pTopic:=ADR(sTopicRcv), nTopicSize:=SIZEOF(sTopicRcv) );
        fbMessage.GetPayload(pPayload:=ADR(sPayloadRcv), nPayloadSize:=SIZEOF(sPayloadRcv), bSetNullTermination:=FALSE);
    END_IF
END_IF

If message evaluation is implemented as described above, one received message is evaluated per cycle. If several messages were accumulated in the message queue, the evaluation is distributed over several cycles.
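If one message per cycle is too slow for an application, several messages can be read within one cycle. The following is a sketch, not part of the original sample: nProcessed and cMaxMessagesPerCycle are hypothetical names, and the limit of 10 is an arbitrary value chosen to keep the cycle time bounded. It assumes that nQueuedMessages reflects the removal of a message immediately after Dequeue(). The EXIT statement ends the loop if that assumption does not hold.

```iecst
(* declaration part *)
VAR
    nProcessed : UDINT; (* hypothetical: messages handled in the current cycle *)
END_VAR
VAR CONSTANT
    cMaxMessagesPerCycle : UDINT := 10; (* hypothetical upper limit per cycle *)
END_VAR

(* implementation part *)
nProcessed := 0;
WHILE (fbMessageQueue.nQueuedMessages > 0) AND (nProcessed < cMaxMessagesPerCycle) DO
    IF NOT fbMessageQueue.Dequeue(fbMessage:=fbMessage) THEN
        EXIT; // nothing could be dequeued, try again in the next cycle
    END_IF
    fbMessage.GetTopic(pTopic:=ADR(sTopicRcv), nTopicSize:=SIZEOF(sTopicRcv));
    fbMessage.GetPayload(pPayload:=ADR(sPayloadRcv), nPayloadSize:=SIZEOF(sPayloadRcv), bSetNullTermination:=FALSE);
    // evaluate sTopicRcv and sPayloadRcv here, before the next iteration overwrites them
    nProcessed := nProcessed + 1;
END_WHILE
```

Because sTopicRcv and sPayloadRcv are reused in every iteration, the evaluation has to take place inside the loop.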

The sample can be modified for applications in which subscriptions to several topics exist. In this case MQTT messages with different topics are received. Message evaluation can be expanded as follows:

VAR
    (* received payload for each subscribed topic *)
    sPayloadRcv1 : STRING(255);
    sPayloadRcv2 : STRING(255);
END_VAR
VAR CONSTANT
    (* subscriptions *)
    sTopicSub1 : STRING(255) := 'my first topic';
    sTopicSub2 : STRING(255) := 'my second topic'; 
END_VAR
----------------------------------------------------------------

IF fbMessageQueue.nQueuedMessages > 0 THEN
    IF fbMessageQueue.Dequeue(fbMessage:=fbMessage) THEN
        IF fbMessage.CompareTopic(sTopic:=sTopicSub1) THEN
            fbMessage.GetPayload(pPayload:=ADR(sPayloadRcv1), nPayloadSize:=SIZEOF(sPayloadRcv1), bSetNullTermination:=FALSE);
        ELSIF fbMessage.CompareTopic(sTopic:=sTopicSub2) THEN
            fbMessage.GetPayload(pPayload:=ADR(sPayloadRcv2), nPayloadSize:=SIZEOF(sPayloadRcv2), bSetNullTermination:=FALSE);
        END_IF
    END_IF
END_IF
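The expanded evaluation presupposes that the client has subscribed to both topics. This can be sketched in the same way as the single subscription in step 6. The flags bSubscribed1 and bSubscribed2 are hypothetical names introduced for this illustration; the Subscribe() call is used exactly as in the sample.

```iecst
(* declaration part *)
VAR
    bSubscribed1 : BOOL; (* hypothetical: subscription to sTopicSub1 done *)
    bSubscribed2 : BOOL; (* hypothetical: subscription to sTopicSub2 done *)
END_VAR

(* implementation part *)
IF fbMqttClient.bConnected THEN
    IF NOT bSubscribed1 THEN
        bSubscribed1 := fbMqttClient.Subscribe(sTopic:=sTopicSub1, eQoS:=TcIotMqttQos.AtMostOnceDelivery);
    END_IF
    IF NOT bSubscribed2 THEN
        bSubscribed2 := fbMqttClient.Subscribe(sTopic:=sTopicSub2, eQoS:=TcIotMqttQos.AtMostOnceDelivery);
    END_IF
END_IF
```

Keeping one flag per topic lets each subscription be repeated independently until it succeeds.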

Requirements

Development environment: TwinCAT v3.1.4022.0
Target platform: IPC or CX (x86, x64, ARM)
PLC libraries to include: Tc3_IotBase, Tc2_Utilities (>= v3.3.19.0)