IotMqttv5Sample

Beispiel zur MQTTv5-Kommunikation mithilfe einer Message-Queue

In diesem Beispiel wird die Kommunikation zu einem MQTT Broker unter Verwendung von MQTTv5 dargestellt. Als Voraussetzung muss der verwendete Message Broker MQTTv5 unterstützen. Es werden Nachrichten verschickt („Publish“) und empfangen („Subscribe“). Dies erfolgt in zwei Schritten. Zuerst wird anhand des Topics entschieden, welche Nachrichten empfangen werden sollen. Anschließend werden die empfangenen Nachrichten in einer Message Queue eingesammelt, um von hier aus gelesen und ausgewertet werden zu können. Das Sample verschickt und empfängt zu Demonstrationszwecken Nachrichten auf demselben Topic.

IotMqttv5Sample 1:

Handhabung der MessageQueue

Im Gegensatz zum FB_IotMqttClient Baustein für MQTTv3, müssen Sie im FB_IotMqtt5Client Baustein die Message Queue nicht mehr separat als Objekt instanziieren und mit dem Baustein verknüpfen. Anstelle dessen können Sie direkt auf den entsprechenden Ausgang des Bausteins zugreifen um mit der Message Queue zu arbeiten.

Projektstruktur

1. Erstellen Sie ein TwinCAT-Projekt mit einer SPS und fügen Sie die Tc3_IotBase und Tc3_JsonXml als Bibliotheksreferenzen an.
2. Legen Sie einen Programmbaustein an und deklarieren Sie eine Instanz von FB_IotMqtt5ClientBase sowie zwei Hilfsvariablen, um den Programmablauf bei Bedarf steuern zu können.
PROGRAM PrgMqttCom
VAR
    fbMqttClient    : FB_IotMqtt5Client;
    bSetParameter   : BOOL := TRUE;
    bConnect        : BOOL := TRUE;
END_VAR
3. Deklarieren Sie für die zu verschickende MQTT-Nachricht zwei Variablen für Topic und Payload. Im Beispiel soll jede Sekunde eine Nachricht verschickt werden. Für die Nachricht werden drei imaginäre Sensorwerte verwendet, welche über die noch zu erstellende Funktion F_CreateMessage in ein JSON Dokument verpackt werden sollen.
    (* published message *)
    sTopicPub : STRING(255) := 'MyTopic';
    sPayloadPub : STRING(255);
    fbTimer : TON := (PT:=T#1S);
    rSensor1 : REAL;
    nSensor2 : DINT;
    bSensor3 : BOOL;
4. Deklarieren Sie für den Nachrichtenempfang eine Variable, die das Topic enthält, auf das sich der Client subscriben soll und zwei weitere Variablen, die Topic und Payload der zuletzt empfangenen Nachricht anzeigen.
Die empfangenen Nachrichten sollen in einer Warteschlange (Queue) gesammelt werden, um eine nach der anderen ausgewertet werden zu können. Hierfür deklarieren Sie eine Instanz von FB_IotMqtt5MessageQueue sowie eine Instanz von FB_IotMqtt5Message.
    (* received message *)
    bSubscribed    : BOOL;
    sTopicSub      : STRING(255) := 'MyTopic';
    {attribute 'TcEncoding':='UTF-8'}
    sTopicRcv      : STRING(255);
    {attribute 'TcEncoding':='UTF-8'}
    sPayloadRcv    : STRING(255);
    fbMessage      : FB_IotMqtt5Message;
5. Im Programmteil muss der MQTT Client zyklisch getriggert werden, um den Verbindungsaufbau zum Broker, den Verbindungserhalt und den Nachrichtenempfang zu gewährleisten. Setzen Sie die Parameter der gewünschten Verbindung und initialisieren Sie den Verbindungsaufbau mit dem Übergabeparameter bConnect := TRUE.
Im Beispiel werden die Parameter einmalig vor dem Client-Aufruf im Programmcode zugewiesen. Weil dies meist nur einmalig nötig ist, können die Parameter auch bereits im Deklarationsteil bei der Instanziierung des MQTT Client angegeben werden. Nicht alle Parameter müssen zugewiesen werden.
Im Beispiel ist der Broker lokal. Sie können auch die IP-Adresse oder den Namen angeben.
IF bSetParameter THEN
    bSetParameter := FALSE;
    fbMqttClient.sHostName := 'localhost';
    fbMqttClient.nHostPort := 1883;
END_IF

fbMqttClient.Execute(bConnect);
6. Sobald die Verbindung zum Broker aufgebaut wird, soll sich der Client auf ein bestimmtes Topic anmelden. Ebenso soll jede Sekunde eine Nachricht versandt werden.
Im Beispiel ist sTopicPub = sTopicSub, sodass ein Loopback entsteht. In anderen Applikationen unterscheiden sich die Topics typischerweise.
IF fbMqttClient.bConnected THEN
    IF NOT bSubscribed THEN
        bSubscribed := fbMqttClient.Subscribe(sTopic:=sTopicSub, eQoS:=TcIotMqttQos.AtMostOnceDelivery);
    END_IF
    fbTimer(IN:=TRUE);
    IF fbTimer.Q THEN // publish new payload every second
        fbTimer(IN:=FALSE);
        rSensor1 := rSensor1 + 0.1;
        nSensor2 := nSensor2 + 1;
        bSensor3 := NOT bSensor3;
        sPayloadPub := F_CreateMessage(rSensor1, nSensor2, bSensor3);
        fbMqttClient.Publish(sTopic:= sTopicPub, pPayload:= ADR(sPayloadPub), nPayloadSize:= LEN2(ADR(sPayloadPub))+1, eQoS:= TcIotMqttQos.AtMostOnceDelivery, bRetain:= FALSE, bQueue:= FALSE );
    END_IF
END_IF
7. Der zyklische Aufruf vom MQTT Client sorgt für den Empfang der Nachrichten. Der Client empfängt alle Nachrichten, auf deren Topic er sich zuvor beim Broker angemeldet hat und legt diese in der Message Queue ab. Sobald Nachrichten verfügbar sind, rufen Sie die Methode Dequeue() auf und erhalten über das Nachrichtenobjekt fbMessage Zugriff auf die Nachrichteneigenschaften wie Topic und Payload.
IF fbMqttClient.fbMessageQueue.nQueuedMessages > 0 THEN
    IF fbMqttClient.fbMessageQueue.Dequeue(fbMessage:=fbMessage) THEN
        fbMessage.GetTopic(pTopic:=ADR(sTopicRcv), nTopicSize:=SIZEOF(sTopicRcv));
        fbMessage.GetPayload(pPayload:=ADR(sPayloadRcv), nPayloadSize:=SIZEOF(sPayloadRcv), bSetNullTermination:=FALSE);
    END_IF
END_IF
8. Bei obiger Implementierung der Nachrichtenauswertung wird pro Zyklus eine empfangene Nachricht ausgewertet. Falls mehrere Nachrichten in der Message-Queue gesammelt wurden, verteilt sich deren Auswertung entsprechend auf mehrere Zyklen.
9. Zum Schluss fügen Sie eine neue Funktion namens F_CreateMessage hinzu. Die Funktion soll den Rückgabewert STRING(255) besitzen und ist mit der folgenden Funktions-Signatur zu versehen:
FUNCTION F_CreateMessage : STRING(255)
VAR_INPUT
  Sensor1 : REAL;
  Sensor2 : DINT;
  Sensor3 : BOOL;
END_VAR
VAR
  dtTimestamp : DATE_AND_TIME;
  timeAsFileTime : T_FILETIME64;
  fbJson : FB_JsonSaxWriter;
END_VAR
10. Die Funktion erzeugt zunächst einen Zeitstempel basierend auf der Funktion F_GetSystemTime und verpackt anschließend die übergebenen Sensorwerte und den Zeitstempel in ein JSON Dokument.
timeAsFileTime := F_GetSystemTime();
dtTimestamp := FILETIME64_TO_DT( timeAsFileTime );
fbJson.StartObject();
fbJson.AddKey('Timestamp');
fbJson.AddDateTime(dtTimestamp);
fbJson.AddKey('Values');
fbJson.StartObject();
fbJson.AddKey('Sensor1');
fbJson.AddReal(Sensor1);
fbJson.AddKey('Sensor2');
fbJson.AddDint(Sensor2);
fbJson.AddKey('Sensor3');
fbJson.AddBool(Sensor3);
fbJson.EndObject();
fbJson.EndObject();
F_CreateMessage := fbJson.GetDocument();
fbJson.ResetDocument();

Weitere Schritte

Das Beispiel kann abgewandelt werden für Applikationen, in denen sich auf mehrere Topics angemeldet wird. Dann werden MQTT-Nachrichten mit verschiedenen Topics empfangen. Die Auswertung der Nachrichten könnten Sie wie folgt für diesen Anwendungsfall erweitern:

VAR
    (* received payload for each subscribed topic *)
    sPayloadRcv1 : STRING(255);
    sPayloadRcv2 : STRING(255);
END_VAR
VAR CONSTANT
    (* subscriptions *)
    sTopicSub1 : STRING(255) := 'my first topic';
    sTopicSub2 : STRING(255) := 'my second topic'; 
END_VAR
----------------------------------------------------------------

IF fbMqttClient.fbMessageQueue.nQueuedMessages > 0 THEN
    IF fbMqttClient.fbMessageQueue.Dequeue(fbMessage:=fbMessage) THEN
        IF fbMessage.CompareTopic(sTopic:=sTopicSub1) THEN
            fbMessage.GetPayload(pPayload:=ADR(sPayloadRcv1), nPayloadSize:=SIZEOF(sPayloadRcv1), bSetNullTermination:=FALSE);
        ELSIF fbMessage.CompareTopic(sTopic:=sTopicSub2) THEN
            fbMessage.GetPayload(pPayload:=ADR(sPayloadRcv2), nPayloadSize:=SIZEOF(sPayloadRcv2), bSetNullTermination:=FALSE);
        END_IF
    END_IF
END_IF

Voraussetzungen

Entwicklungsumgebung

Zielplattform

Einzubindende SPS-Bibliotheken

TwinCAT v3.1.4026.0

IPC oder CX (x86, x64, ARM)

Tc3_IotBase (>= v3.4.2.0),
Tc2_Utilities (>= v3.4.4.0)