IotMqttv5Sample

Sample for MQTTv5 communication via a message queue

In this sample, the communication to an MQTT broker using MQTTv5 is shown. As a prerequisite, the message broker used must support MQTTv5. Messages are sent ("Publish") and received ("Subscribe"). This is done in two steps. First, the topic is used to decide which messages are to be received. Then, the received messages are collected in a message queue, from where they can be read and evaluated. The sample sends and receives messages on the same topic for demonstration purposes.

IotMqttv5Sample 1:

Handling the MessageQueue

In contrast to the FB_IotMqttClient function block for MQTTv3, in the FB_IotMqtt5Client function block you no longer have to instantiate the message queue separately as an object and link it to the function block. Instead, you can directly access the corresponding output of the function block to work with the message queue.

Project structure

1. Create a TwinCAT project with a PLC and attach the Tc3_IotBase and Tc3_JsonXml as library references.
2. Create a program block and declare an instance of FB_IotMqtt5ClientBase and two auxiliary variables to control the program sequence, if required.
PROGRAM PrgMqttCom
VAR
    fbMqttClient    : FB_IotMqtt5Client;
    bSetParameter   : BOOL := TRUE;
    bConnect        : BOOL := TRUE;
END_VAR
3. Declare two variables (for topic and payload) for the MQTT message to be sent. In the sample a message is to be sent every second. Three imaginary sensor values are used for the message, which are to be packaged into a JSON document via the F_CreateMessage function, which is yet to be created.
    (* published message *)
    sTopicPub : STRING(255) := 'MyTopic';
    sPayloadPub : STRING(255);
    fbTimer : TON := (PT:=T#1S);
    rSensor1 : REAL;
    nSensor2 : DINT;
    bSensor3 : BOOL;
4. For message reception, declare a variable containing the topic to which the client should subscribe and two other variables indicating the topic and payload of the last received message.
The received messages should be collected in a queue to be evaluated one after the other. For this you declare an instance of FB_IotMqtt5MessageQueue and an instance of FB_IotMqtt5Message.
    (* received message *)
    bSubscribed    : BOOL;
    sTopicSub      : STRING(255) := 'MyTopic';
    {attribute 'TcEncoding':='UTF-8'}
    sTopicRcv      : STRING(255);
    {attribute 'TcEncoding':='UTF-8'}
    sPayloadRcv    : STRING(255);
    fbMessage      : FB_IotMqtt5Message;
5. In the program part, the MQTT client must be triggered cyclically, in order to ensure that a connection to the broker is established and maintained and the message is received. Set the parameters of the desired connection and initialize the connection with the transfer parameter bConnect := TRUE.
In the sample the parameters are assigned once in the program code before the client call. Since this is usually only required once, the parameters can already be specified in the declaration part during instantiation of the MQTT client. Not all parameters have to be assigned.
In the sample the broker is local. The IP address or the name can also be specified.
IF bSetParameter THEN
    bSetParameter := FALSE;
    fbMqttClient.sHostName := 'localhost';
    fbMqttClient.nHostPort := 1883;
END_IF

fbMqttClient.Execute(bConnect);
6. As soon as the connection to the broker is established, the client should subscribe to a particular topic. Likewise, a message is to be sent every second.
In the sample sTopicPub = sTopicSub, so that a loopback is created. In other applications the topics usually differ.
IF fbMqttClient.bConnected THEN
    IF NOT bSubscribed THEN
        bSubscribed := fbMqttClient.Subscribe(sTopic:=sTopicSub, eQoS:=TcIotMqttQos.AtMostOnceDelivery);
    END_IF
    fbTimer(IN:=TRUE);
    IF fbTimer.Q THEN // publish new payload every second
        fbTimer(IN:=FALSE);
        rSensor1 := rSensor1 + 0.1;
        nSensor2 := nSensor2 + 1;
        bSensor3 := NOT bSensor3;
        sPayloadPub := F_CreateMessage(rSensor1, nSensor2, bSensor3);
        fbMqttClient.Publish(sTopic:= sTopicPub, pPayload:= ADR(sPayloadPub), nPayloadSize:= LEN2(ADR(sPayloadPub))+1, eQoS:= TcIotMqttQos.AtMostOnceDelivery, bRetain:= FALSE, bQueue:= FALSE );
    END_IF
END_IF
7. The cyclic call of the MQTT client ensures that the messages are received. The client receives all messages with topics to which it has previously subscribed with the broker and places them in the message queue. Once messages are available, call the method Dequeue() to gain access to the message properties such as topic or payload via the message object fbMessage.
IF fbMqttClient.fbMessageQueue.nQueuedMessages > 0 THEN
    IF fbMqttClient.fbMessageQueue.Dequeue(fbMessage:=fbMessage) THEN
        fbMessage.GetTopic(pTopic:=ADR(sTopicRcv), nTopicSize:=SIZEOF(sTopicRcv));
        fbMessage.GetPayload(pPayload:=ADR(sPayloadRcv), nPayloadSize:=SIZEOF(sPayloadRcv), bSetNullTermination:=FALSE);
    END_IF
END_IF
8. If message evaluation is implemented as described above, one received message is evaluated per cycle. If several messages were accumulated in the message queue, the evaluation is distributed over several cycles.
9. Finally, add a new function called F_CreateMessage. The function shall have the return value STRING(255) and shall be provided with the following function signature:
FUNCTION F_CreateMessage : STRING(255)
VAR_INPUT
  Sensor1 : REAL;
  Sensor2 : DINT;
  Sensor3 : BOOL;
END_VAR
VAR
  dtTimestamp : DATE_AND_TIME;
  timeAsFileTime : T_FILETIME64;
  fbJson : FB_JsonSaxWriter;
END_VAR
10. The function first generates a timestamp based on the F_GetSystemTime function and then packages the passed sensor values and timestamp into a JSON document.
timeAsFileTime := F_GetSystemTime();
dtTimestamp := FILETIME64_TO_DT( timeAsFileTime );
fbJson.StartObject();
fbJson.AddKey('Timestamp');
fbJson.AddDateTime(dtTimestamp);
fbJson.AddKey('Values');
fbJson.StartObject();
fbJson.AddKey('Sensor1');
fbJson.AddReal(Sensor1);
fbJson.AddKey('Sensor2');
fbJson.AddDint(Sensor2);
fbJson.AddKey('Sensor3');
fbJson.AddBool(Sensor3);
fbJson.EndObject();
fbJson.EndObject();
F_CreateMessage := fbJson.GetDocument();
fbJson.ResetDocument();

Further steps

The sample can be modified for applications in which subscriptions to several topics exist. In this case MQTT messages with different topics are received. Message evaluation can be expanded as follows:

VAR
    (* received payload for each subscribed topic *)
    sPayloadRcv1 : STRING(255);
    sPayloadRcv2 : STRING(255);
END_VAR
VAR CONSTANT
    (* subscriptions *)
    sTopicSub1 : STRING(255) := 'my first topic';
    sTopicSub2 : STRING(255) := 'my second topic'; 
END_VAR
----------------------------------------------------------------

IF fbMqttClient.fbMessageQueue.nQueuedMessages > 0 THEN
    IF fbMqttClient.fbMessageQueue.Dequeue(fbMessage:=fbMessage) THEN
        IF fbMessage.CompareTopic(sTopic:=sTopicSub1) THEN
            fbMessage.GetPayload(pPayload:=ADR(sPayloadRcv1), nPayloadSize:=SIZEOF(sPayloadRcv1), bSetNullTermination:=FALSE);
        ELSIF fbMessage.CompareTopic(sTopic:=sTopicSub2) THEN
            fbMessage.GetPayload(pPayload:=ADR(sPayloadRcv2), nPayloadSize:=SIZEOF(sPayloadRcv2), bSetNullTermination:=FALSE);
        END_IF
    END_IF
END_IF

Requirements

Development environment

Target platform

PLC libraries to include

TwinCAT v3.1.4026.0

IPC or CX (x86, x64, ARM)

Tc3_IotBase (>= v3.4.2.0),
Tc2_Utilities (>= v3.4.4.0)