IotMqttSampleUsingQueue

Beispiel zur MQTT-Kommunikation mithilfe einer Message-Queue

In diesem Beispiel wird die Kommunikation zu einem MQTT Broker dargestellt. Es werden Nachrichten verschickt („Publish“) und empfangen. Dies erfolgt in zwei Schritten. Zuerst wird entschieden, welche Nachrichten generell empfangen werden sollen („Subscribe“). Anschließend werden empfangene Nachrichten in einer Message Queue eingesammelt, um von hier ausgelesen und ausgewertet werden zu können.

Projektstruktur

1. Erstellen Sie ein TwinCAT-Projekt mit einer SPS und fügen Sie die Tc3_IotBase als Bibliotheksreferenz an.
2. Legen Sie einen Programmbaustein an und deklarieren Sie eine Instanz von FB_IotMqttClient sowie zwei Hilfsvariablen, um den Programmablauf bei Bedarf steuern zu können.
PROGRAM PrgMqttCom
VAR
    fbMqttClient    : FB_IotMqttClient;
    bSetParameter   : BOOL := TRUE;
    bConnect        : BOOL := TRUE;
END_VAR
3. Deklarieren Sie für die zu verschickende MQTT-Nachricht zwei Variablen für Topic und Payload. Im Beispiel soll jede Sekunde eine Nachricht verschickt werden.
    (* published message *)
    sTopicPub   : STRING(255) := 'MyTopic';
    sPayloadPub : STRING(255);
    i : UDINT;
    fbTimer : TON := (PT:=T#1S);
4. Deklarieren Sie für den Nachrichtenempfang eine Variable, die das zu empfangene Topic enthält und zwei weitere Variablen, die Topic und Payload der zuletzt empfangenen Nachricht anzeigen.
Die empfangenen Nachrichten sollen in einer Warteschlange (Queue) gesammelt werden, um eine nach der anderen ausgewertet werden zu können. Hierfür deklarieren Sie eine Instanz von FB_IotMqttMessageQueue sowie eine Instanz von FB_IotMqttMessage.
    (* received message *)
    bSubscribed    : BOOL;
    sTopicSub      : STRING(255) := 'MyTopic';
    {attribute 'TcEncoding':='UTF-8'}
    sTopicRcv      : STRING(255);
    {attribute 'TcEncoding':='UTF-8'}
    sPayloadRcv    : STRING(255);
    fbMessageQueue : FB_IotMqttMessageQueue;
    fbMessage      : FB_IotMqttMessage;
5. Im Programmteil muss der MQTT Client zyklisch getriggert werden, um den Verbindungsaufbau zum Broker, den Verbindungserhalt und den Nachrichtenempfang zu gewährleisten. Setzen Sie die Parameter der gewünschten Verbindung und initialisieren Sie den Verbindungsaufbau mit dem Übergabeparameter bConnect := TRUE.
Im Beispiel werden die Parameter einmalig vor dem Client-Aufruf im Programmcode zugewiesen. Weil dies meist nur einmalig nötig ist, können die Parameter auch bereits im Deklarationsteil bei der Instanziierung des MQTT Client angegeben werden. Nicht alle Parameter müssen zugewiesen werden.
Im Beispiel ist der Broker lokal. Sie können auch die IP-Adresse oder den Namen angeben.
IF bSetParameter THEN
    bSetParameter               := FALSE;
    fbMqttClient.sHostName      := 'localhost';
    fbMqttClient.nHostPort      := 1883;
//  fbMqttClient.sClientId      := 'MyTcMqttClient'; 
    fbMqttClient.sTopicPrefix   := ''; 
//  fbMqttClient.nKeepAlive     := 60; 
//  fbMqttClient.sUserName      := ;
//  fbMqttClient.sUserPassword  := ; 
//  fbMqttClient.stWill         := ; 
//  fbMqttClient.stTLS          := ;
    fbMqttClient.ipMessageQueue := fbMessageQueue;
END_IF

fbMqttClient.Execute(bConnect);
6. Sobald die Verbindung zum Broker aufgebaut wird, soll sich der Client auf ein bestimmtes Topic anmelden. Ebenso soll jede Sekunde eine Nachricht versandt werden.
Im Beispiel ist sTopicPub = sTopicSub, sodass ein Loopback entsteht. In anderen Applikationen unterscheiden sich die Topics typischerweise.
IF fbMqttClient.bConnected THEN
    IF NOT bSubscribed THEN
        bSubscribed := fbMqttClient.Subscribe(sTopic:=sTopicSub, eQoS:=TcIotMqttQos.AtMostOnceDelivery);
    END_IF
    fbTimer(IN:=TRUE);
    IF fbTimer.Q THEN // publish new payload every second
        fbTimer(IN:=FALSE);
        i := i + 1;
        sPayloadPub := CONCAT('MyMessage', TO_STRING(i));
        fbMqttClient.Publish(    sTopic:= sTopicPub, 
                                pPayload:= ADR(sPayloadPub), nPayloadSize:= LEN2(ADR(sPayloadPub))+1, 
                                eQoS:= TcIotMqttQos.AtMostOnceDelivery, bRetain:= FALSE, bQueue:= FALSE );
    END_IF
END_IF
7. Der zyklische Aufruf vom MQTT Client sorgt für den Empfang der Nachrichten. Der Client empfängt alle Nachrichten, auf deren Topic er sich zuvor beim Broker angemeldet hat und legt diese in der Message Queue ab. Sobald Nachrichten verfügbar sind, rufen Sie die Methode Dequeue() auf und erhalten über das Nachrichtenobjekt fbMessage Zugriff auf die Nachrichteneigenschaften wie Topic und Payload.
IF fbMessageQueue.nQueuedMessages > 0 THEN
    IF fbMessageQueue.Dequeue(fbMessage:=fbMessage) THEN
        fbMessage.GetTopic(pTopic:=ADR(sTopicRcv), nTopicSize:=SIZEOF(sTopicRcv) );
        fbMessage.GetPayload(pPayload:=ADR(sPayloadRcv), nPayloadSize:=SIZEOF(sPayloadRcv), bSetNullTermination:=FALSE);
    END_IF
END_IF

Bei obiger Implementierung der Nachrichtenauswertung wird pro Zyklus eine empfangene Nachricht ausgewertet. Falls mehrere Nachrichten in der Message-Queue gesammelt wurden, verteilt sich deren Auswertung entsprechend auf mehrere Zyklen.

Das Beispiel kann abgewandelt werden für Applikationen, in denen sich auf mehrere Topics anmeldet wird. Dann werden MQTT-Nachrichten mit verschiedenen Topics empfangen. Die Auswertung der Nachrichten könnten Sie wie folgt für diesen Anwendungsfall erweitern:

VAR
    (* received payload for each subscribed topic *)
    sPayloadRcv1 : STRING(255);
    sPayloadRcv2 : STRING(255);
END_VAR
VAR CONSTANT
    (* subscriptions *)
    sTopicSub1 : STRING(255) := 'my first topic';
    sTopicSub2 : STRING(255) := 'my second topic'; 
END_VAR
----------------------------------------------------------------

IF fbMessageQueue.nQueuedMessages > 0 THEN
    IF fbMessageQueue.Dequeue(fbMessage:=fbMessage) THEN
        IF fbMessage.CompareTopic(sTopic:=sTopicSub1) THEN
            fbMessage.GetPayload(pPayload:=ADR(sPayloadRcv1), nPayloadSize:=SIZEOF(sPayloadRcv1), bSetNullTermination:=FALSE);
        ELSIF fbMessage.CompareTopic(sTopic:=sTopicSub2) THEN
            fbMessage.GetPayload(pPayload:=ADR(sPayloadRcv2), nPayloadSize:=SIZEOF(sPayloadRcv2), bSetNullTermination:=FALSE);
        END_IF
    END_IF
END_IF

Voraussetzungen

Entwicklungsumgebung

Zielplattform

Einzubindende SPS-Bibliotheken

TwinCAT v3.1.4022.0

IPC oder CX (x86, x64, ARM)

Tc3_IotBase,
Tc2_Utilities (>= v3.3.19.0)