Reactive Extensions and ADS

The Reactive Extensions (Rx) is a .NET library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.
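The three building blocks can be seen together in a minimal sketch that needs no PLC connection. It assumes only the System.Reactive NuGet package; the class and method names (RxSketch, SquaresOfEvens) are hypothetical and chosen purely for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class RxSketch
{
    // Hypothetical helper: Observable source + LINQ operators + explicit Scheduler.
    public static IList<int> SquaresOfEvens(IScheduler scheduler)
    {
        return Observable.Range(1, 10, scheduler) // Observable: emits 1..10 on 'scheduler'
            .Where(n => n % 2 == 0)               // LINQ: keep the even numbers
            .Select(n => n * n)                   // LINQ: project each number to its square
            .ToList()                             // collect the whole sequence into one list
            .Wait();                              // block until the sequence has completed
    }

    public static void Main()
    {
        // Prints: 4, 16, 36, 64, 100
        Console.WriteLine(string.Join(", ", SquaresOfEvens(Scheduler.Default)));
    }
}
```

Passing a different scheduler (for example Scheduler.Immediate) changes where the values are produced without touching the query itself, which is what "parameterizing the concurrency" amounts to in practice.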

In ADS terms, reading and writing data or symbol values can be expressed as reactive data streams, and ADS Notifications are a natural fit for reactive code as well. This simplifies data binding to reactive frameworks (e.g. reactive UI) and supports richer data manipulation via synchronous and asynchronous observers. Multithreaded and parallelized code paths that use multiple CPU cores can be written with far less of the usual burden of deadlock and synchronization issues.
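As a rough illustration of such data manipulation, the following sketch post-processes a value stream with standard Rx operators. It uses an in-memory sequence as a stand-in for an ADS notification stream, so it runs without a PLC. The helper name WindowAverages and the choice of uint values are assumptions made for this example only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class StreamProcessingSketch
{
    // Hypothetical helper: averages every 'windowSize' consecutive values
    // and delivers the results to observers on the given scheduler.
    public static IObservable<double> WindowAverages(
        IObservable<uint> values, int windowSize, IScheduler scheduler)
    {
        return values
            .Buffer(windowSize)                             // group values into fixed-size batches
            .Where(batch => batch.Count == windowSize)      // drop a trailing partial batch
            .Select(batch => batch.Average(v => (double)v)) // reduce each batch to its mean
            .ObserveOn(scheduler);                          // observers are called on 'scheduler'
    }

    public static void Main()
    {
        // Stand-in for a notification stream: the values 1..6.
        IObservable<uint> source = new uint[] { 1, 2, 3, 4, 5, 6 }.ToObservable();

        // Observe on the task pool, then block until the sequence has completed.
        IList<double> averages = WindowAverages(source, 3, TaskPoolScheduler.Default)
            .ToList()
            .Wait();

        // Two averages are produced: 2 (from 1,2,3) and 5 (from 4,5,6).
        Console.WriteLine(string.Join(", ", averages));
    }
}
```

The same operator chain could in principle be applied to any IObservable of values, including the streams produced by the ADS extension methods shown in the examples below; only the source changes.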

More information about the .NET Reactive Extensions is available on the Reactive Extensions project site.

The TwinCAT ADS Reactive extensions are available as a supplementary NuGet package: Beckhoff.TwinCAT.Ads.Reactive.

Example

Observe for Notifications

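// To test the observer, run a project on the local PLC system (port 851).
// Namespaces the snippets on this page are assumed to need
// (verify them against the package version you use):
using System;
using System.Reactive;        // Observer
using System.Reactive.Linq;   // Take, Select, Observable.Interval
using TwinCAT;                // SymbolLoaderSettings
using TwinCAT.Ads;            // AdsClient, AmsAddress, AmsNetId, NotificationSettings
using TwinCAT.Ads.Reactive;   // WhenNotification, WhenValueChanged, PollValues, WriteValues
using TwinCAT.Ads.TypeSystem; // SymbolLoaderFactory
using TwinCAT.TypeSystem;     // IValueSymbol, IDynamicSymbolLoader
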
using (AdsClient client = new AdsClient())
{
    // Connect to target
    client.Connect(new AmsAddress(AmsNetId.Local, 851));

    // Reactive Notification Handler
    var valueObserver = Observer.Create<ushort>(val =>
    {
        Console.WriteLine(string.Format("Value: {0}", val.ToString()));
    }
    );

    // Turn ADS Notifications into a sequence of value objects (taking 20 values)
    // and subscribe to it.
    // _TaskInfo is indexed here as in the symbol-based examples on this page.
    IDisposable subscription = client
        .WhenNotification<ushort>("TwinCAT_SystemInfoVarList._TaskInfo[1].CycleCount", NotificationSettings.Default)
        .Take(20)
        .Subscribe(valueObserver);

    Console.ReadKey(); // Wait for Key press
    subscription.Dispose(); // Dispose the Subscription
}

Observe for Symbol Notifications

// To test the observer, run a project on the local PLC system (port 851).
using (AdsClient client = new AdsClient())
{
    // Connect to target
    client.Connect(new AmsAddress(AmsNetId.Local, 851));

    // Create Symbol information
    var symbolLoader = SymbolLoaderFactory.Create(client, SymbolLoaderSettings.Default);
    IValueSymbol cycleCount = (IValueSymbol)symbolLoader.Symbols["TwinCAT_SystemInfoVarList._TaskInfo[1].CycleCount"];

    // Reactive Notification Handler
    var valueObserver = Observer.Create<object>(val =>
    {
        Console.WriteLine(string.Format("Instance: {0}, Value: {1}", cycleCount.InstancePath, val.ToString()));
    }
    );

    cycleCount.NotificationSettings = new NotificationSettings(AdsTransMode.OnChange, 500, 5000); // optional: Change NotificationSettings on Symbol

    // Turn ADS Notifications into a sequence of value objects (taking 20 values)
    // and subscribe to it.
    IDisposable subscription = cycleCount.WhenValueChanged().Take(20).Subscribe(valueObserver);

    Console.ReadKey(); // Wait for Key press
    subscription.Dispose(); // Dispose the Subscription
}

Observe for dynamic Symbol Notifications

// To test the observer, run a project on the local PLC system (port 851).
using (AdsClient client = new AdsClient())
{
    // Connect to target
    client.Connect(new AmsAddress(AmsNetId.Local, 851));

    // Create Symbol information
    var symbolLoader = (IDynamicSymbolLoader)SymbolLoaderFactory.Create(client, SymbolLoaderSettings.DefaultDynamic);
    dynamic symbols = symbolLoader.SymbolsDynamic;
    dynamic cycleCount = symbols.TwinCAT_SystemInfoVarList._TaskInfo[1].CycleCount;

    // Reactive Notification Handler
    var valueObserver = Observer.Create<object>(val =>
    {
        // Value objects can be dynamically (on the fly) created objects here (e.g. structs)
        Console.WriteLine(string.Format("Instance: {0}, Value: {1}", cycleCount.InstancePath, val.ToString()));
    }
    );

    cycleCount.NotificationSettings = new NotificationSettings(AdsTransMode.OnChange, 500, 5000); // optional: Change NotificationSettings on Symbol

    // Turn ADS Notifications into a sequence of value objects (taking 20 values)
    // and subscribe to it.
    // The explicit cast to IValueSymbol is required because extension methods such as
    // 'WhenValueChanged' cannot be resolved on a 'dynamic' object at run time.
    IDisposable subscription = ((IValueSymbol)cycleCount).WhenValueChanged().Take(20).Subscribe(valueObserver);

    Console.ReadKey(); // Wait for Key press
    subscription.Dispose(); // Dispose the Subscription
}

Polling observer

// To test the observer, run a project on the local PLC system (port 851).
using (AdsClient client = new AdsClient())
{
    // Connect to target
    client.Connect(new AmsAddress(AmsNetId.Local, 851));

    // Create Symbol information
    var symbolLoader = SymbolLoaderFactory.Create(client, SymbolLoaderSettings.Default);
    // _TaskInfo is indexed here as in the symbol notification examples above.
    IValueSymbol cycleCount = (IValueSymbol)symbolLoader.Symbols["TwinCAT_SystemInfoVarList._TaskInfo[1].CycleCount"];

    // Reactive Notification Handler
    var valueObserver = Observer.Create<object>(val =>
    {
        Console.WriteLine(string.Format("Instance: {0}, Value: {1}", cycleCount.InstancePath, val.ToString()));
    }
    );

    // Poll the value every 500 ms and take the first 20 values
    IDisposable subscription = cycleCount.PollValues(TimeSpan.FromMilliseconds(500)).Take(20).Subscribe(valueObserver);

    Console.ReadKey(); // Wait for Key press
    subscription.Dispose(); // Dispose the Subscription
}

Writing values with observable subject

using (AdsClient client = new AdsClient())
{
    // Connect to target
    client.Connect(new AmsAddress(AmsNetId.Local, 851));

    // Create symbol information (symbol 'i : INT' in the PLC global variable list)
    var symbolLoader = SymbolLoaderFactory.Create(client, SymbolLoaderSettings.Default);
    IValueSymbol gvlIntSymbol = (IValueSymbol)symbolLoader.Symbols["GVL.i"];

    // Produces object (short) values 0, 1, 2, 3 ... at one-second intervals
    IObservable<object> timerObservable = Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(i => (object)(short)i);

    // Take 10 Values (0..9) and write them to GVL.i
    IDisposable dispose = gvlIntSymbol.WriteValues(timerObservable.Take(10));

    Console.ReadKey(); // Wait for Key press
    dispose.Dispose(); // Dispose the Subscription
}