Reactive Extensions and ADS
The Reactive Extensions (Rx) is a .NET library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.
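The "Observables + LINQ + Schedulers" formula can be illustrated without any ADS connection. The following is a minimal sketch, assuming the System.Reactive NuGet package and C# 9 top-level statements; the variable names are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Observable + Scheduler: a timer-driven sequence 0, 1, 2, ...
// whose ticks are produced on thread-pool threads.
IObservable<long> ticks =
    Observable.Interval(TimeSpan.FromMilliseconds(50), TaskPoolScheduler.Default);

// LINQ: filter and project the stream like any other query.
IObservable<long> evenSquares = ticks
    .Where(n => n % 2 == 0)  // keep 0, 2, 4, ...
    .Select(n => n * n)      // square each value
    .Take(3);                // complete after three results

// Block until the sequence completes and collect its values.
IList<long> results = evenSquares.ToList().Wait();
Console.WriteLine(string.Join(", ", results)); // prints "0, 4, 16"
```

Swapping the scheduler argument changes where the timer runs without touching the query itself, which is what "parameterizing the concurrency" refers to.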
In ADS terms, reading and writing data or symbol values can be expressed as reactive data streams, and ADS Notifications are a natural fit for reactive code as well. This simplifies data binding to reactive frameworks (e.g. reactive UI) and supports richer data manipulation through synchronous and asynchronous observers. Because Rx operators and Schedulers take over much of the thread coordination, multithreaded and parallelized code paths that use multiple CPU cores can be written with far less manual synchronization.
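The difference between a synchronous and an asynchronous observer can be sketched with plain Rx. The example below is an illustration under assumptions: Observable.Range stands in for an ADS value stream so that it runs without a PLC, and it assumes the System.Reactive package and C# 9 top-level statements.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Stand-in for an ADS value stream (assumption: no PLC is required here).
IObservable<int> values = Observable.Range(1, 3);

int producerThread = Thread.CurrentThread.ManagedThreadId;
int syncThread = -1;
int asyncThread = -1;

// Synchronous observer: the handler runs on the thread that pushes the value.
values.Subscribe(v => syncThread = Thread.CurrentThread.ManagedThreadId);

// Asynchronous observer: ObserveOn hands each value over to another thread
// before the handler runs, so the producer is not blocked by the handler.
using var done = new ManualResetEventSlim();
values.ObserveOn(NewThreadScheduler.Default)
      .Subscribe(
          v => asyncThread = Thread.CurrentThread.ManagedThreadId,
          () => done.Set());
done.Wait(); // wait until the asynchronous observer has completed

Console.WriteLine($"Synchronous handler ran on producer thread: {syncThread == producerThread}");
Console.WriteLine($"Asynchronous handler ran on another thread: {asyncThread != producerThread}");
```

With a real ADS stream, the same ObserveOn call could move value handling off the thread that delivers the notifications, for example onto a UI scheduler.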
More information about the .NET Reactive Extensions is available on the Reactive Extensions project site.
The TwinCAT ADS Reactive Extensions are available as a supplementary NuGet package: Beckhoff.TwinCAT.Ads.Reactive
Examples
Observe for Notifications
// To test the observer, run a project on the local PLC system (port 851).
// Namespaces used: System, System.Reactive, System.Reactive.Linq, TwinCAT.Ads, TwinCAT.Ads.Reactive
using (AdsClient client = new AdsClient())
{
    // Connect to target
    client.Connect(new AmsAddress(AmsNetId.Local, 851));

    // Reactive notification handler
    var valueObserver = Observer.Create<ushort>(val =>
    {
        Console.WriteLine($"Value: {val}");
    });

    // Turn ADS Notifications into a sequence of value objects (taking 20 values)
    // and subscribe to them.
    IDisposable subscription = client
        .WhenNotification<ushort>("TwinCAT_SystemInfoVarList._TaskInfo[1].CycleCount", NotificationSettings.Default)
        .Take(20)
        .Subscribe(valueObserver);

    Console.ReadKey();      // Wait for key press
    subscription.Dispose(); // Dispose the subscription
}
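Because the notification stream is an ordinary IObservable, standard Rx operators can be chained onto it. The sketch below derives the difference between consecutive counter readings. The helper name Deltas and the simulated input are hypothetical, chosen so that the code runs without a PLC; it assumes the System.Reactive package and C# 9 top-level statements.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper: turns counter readings into the differences between
// consecutive readings (unchecked cast keeps ushort wrap-around correct).
static IObservable<ushort> Deltas(IObservable<ushort> counts) =>
    counts.Buffer(2, 1)                    // sliding pairs [previous, current]
          .Where(pair => pair.Count == 2)  // drop the incomplete final window
          .Select(pair => unchecked((ushort)(pair[1] - pair[0])));

// Simulated readings standing in for the notification stream.
IObservable<ushort> simulated = new ushort[] { 10, 12, 15, 65535, 3 }.ToObservable();

IList<ushort> deltas = Deltas(simulated).ToList().Wait();
Console.WriteLine(string.Join(", ", deltas)); // prints "2, 3, 65520, 4"
```

With a live connection, the observable returned by client.WhenNotification<ushort>(...) from the example above could be passed to Deltas in place of the simulated sequence.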
Observe for Symbol Notifications
// To test the observer, run a project on the local PLC system (port 851).
// Namespaces used: System, System.Reactive, System.Reactive.Linq, TwinCAT, TwinCAT.Ads,
// TwinCAT.Ads.Reactive, TwinCAT.Ads.TypeSystem, TwinCAT.TypeSystem
using (AdsClient client = new AdsClient())
{
    // Connect to target
    client.Connect(new AmsAddress(AmsNetId.Local, 851));

    // Create symbol information
    var symbolLoader = SymbolLoaderFactory.Create(client, SymbolLoaderSettings.Default);
    IValueSymbol cycleCount = (IValueSymbol)symbolLoader.Symbols["TwinCAT_SystemInfoVarList._TaskInfo[1].CycleCount"];

    // Reactive notification handler
    var valueObserver = Observer.Create<object>(val =>
    {
        Console.WriteLine($"Instance: {cycleCount.InstancePath}, Value: {val}");
    });

    // Optional: change the NotificationSettings on the symbol
    // (on change, cycle time 500 ms, maximum delay 5000 ms)
    cycleCount.NotificationSettings = new NotificationSettings(AdsTransMode.OnChange, 500, 5000);

    // Turn ADS Notifications into a sequence of value objects (taking 20 values)
    // and subscribe to them.
    IDisposable subscription = cycleCount.WhenValueChanged().Take(20).Subscribe(valueObserver);

    Console.ReadKey();      // Wait for key press
    subscription.Dispose(); // Dispose the subscription
}
Observe for dynamic Symbol Notifications
// To test the observer, run a project on the local PLC system (port 851).
// Namespaces used: System, System.Reactive, System.Reactive.Linq, TwinCAT, TwinCAT.Ads,
// TwinCAT.Ads.Reactive, TwinCAT.Ads.TypeSystem, TwinCAT.TypeSystem
using (AdsClient client = new AdsClient())
{
    // Connect to target
    client.Connect(new AmsAddress(AmsNetId.Local, 851));

    // Create symbol information
    var symbolLoader = (IDynamicSymbolLoader)SymbolLoaderFactory.Create(client, SymbolLoaderSettings.DefaultDynamic);
    dynamic symbols = symbolLoader.SymbolsDynamic;
    dynamic cycleCount = symbols.TwinCAT_SystemInfoVarList._TaskInfo[1].CycleCount;

    // Reactive notification handler
    var valueObserver = Observer.Create<object>(val =>
    {
        // Value objects can be dynamically (on the fly) created objects here (e.g. structs)
        Console.WriteLine($"Instance: {cycleCount.InstancePath}, Value: {val}");
    });

    // Optional: change the NotificationSettings on the symbol
    // (on change, cycle time 500 ms, maximum delay 5000 ms)
    cycleCount.NotificationSettings = new NotificationSettings(AdsTransMode.OnChange, 500, 5000);

    // Turn ADS Notifications into a sequence of value objects (taking 20 values)
    // and subscribe to them.
    // Extension methods cannot be resolved on 'dynamic' objects, so the cast to
    // IValueSymbol is needed for the compiler to bind 'WhenValueChanged'.
    IDisposable subscription = ((IValueSymbol)cycleCount).WhenValueChanged().Take(20).Subscribe(valueObserver);

    Console.ReadKey();      // Wait for key press
    subscription.Dispose(); // Dispose the subscription
}
Polling observer
// To test the observer, run a project on the local PLC system (port 851).
// Namespaces used: System, System.Reactive, System.Reactive.Linq, TwinCAT, TwinCAT.Ads,
// TwinCAT.Ads.Reactive, TwinCAT.Ads.TypeSystem, TwinCAT.TypeSystem
using (AdsClient client = new AdsClient())
{
    // Connect to target
    client.Connect(new AmsAddress(AmsNetId.Local, 851));

    // Create symbol information
    var symbolLoader = SymbolLoaderFactory.Create(client, SymbolLoaderSettings.Default);
    IValueSymbol cycleCount = (IValueSymbol)symbolLoader.Symbols["TwinCAT_SystemInfoVarList._TaskInfo[1].CycleCount"];

    // Reactive handler for the polled values
    var valueObserver = Observer.Create<object>(val =>
    {
        Console.WriteLine($"Instance: {cycleCount.InstancePath}, Value: {val}");
    });

    // Poll the value every 500 ms and take 20 values
    IDisposable subscription = cycleCount.PollValues(TimeSpan.FromMilliseconds(500)).Take(20).Subscribe(valueObserver);

    Console.ReadKey();      // Wait for key press
    subscription.Dispose(); // Dispose the subscription
}
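Unlike an on-change notification, polling delivers a value at every interval even if it has not changed. One way to filter out the repeats is the standard Rx operator DistinctUntilChanged. The following is a minimal sketch in which a fixed sequence stands in for the polled stream (an assumption made so that it runs without a PLC); it assumes the System.Reactive package and C# 9 top-level statements.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Simulated polling results: the value is read six times but changes only twice.
IObservable<object> polled = new object[] { 1, 1, 2, 2, 2, 3 }.ToObservable();

// Forward a value only when it differs from the previous one.
IList<object> changes = polled.DistinctUntilChanged().ToList().Wait();
Console.WriteLine(string.Join(", ", changes)); // prints "1, 2, 3"
```

With a live connection, the same operator could be appended to the observable returned by cycleCount.PollValues(...) before subscribing.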
Writing values with observable subject
// Namespaces used: System, System.Reactive.Linq, TwinCAT, TwinCAT.Ads,
// TwinCAT.Ads.Reactive, TwinCAT.Ads.TypeSystem, TwinCAT.TypeSystem
using (AdsClient client = new AdsClient())
{
    // Connect to target
    client.Connect(new AmsAddress(AmsNetId.Local, 851));

    // Create symbol information (symbol 'i : INT' in the PLC global variable list 'GVL')
    var symbolLoader = SymbolLoaderFactory.Create(client, SymbolLoaderSettings.Default);
    IValueSymbol gvlIntSymbol = (IValueSymbol)symbolLoader.Symbols["GVL.i"];

    // Produces boxed short values 0, 1, 2, 3, ... at one-second intervals
    IObservable<object> timerObservable = Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(i => (object)(short)i);

    // Take 10 values (0..9) and write them to GVL.i
    IDisposable subscription = gvlIntSymbol.WriteValues(timerObservable.Take(10));

    Console.ReadKey();      // Wait for key press
    subscription.Dispose(); // Dispose the subscription (stops writing if still active)
}
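The example above feeds WriteValues from a timer. Values can also be pushed on demand through a Subject, which is both an observer and an observable. The sketch below shows that pattern in isolation: a list stands in for the PLC symbol (an assumption made so that it runs without a PLC), and the names setpoints and written are hypothetical. It assumes the System.Reactive package and C# 9 top-level statements.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// A Subject accepts values via OnNext and forwards them to its subscribers.
var setpoints = new Subject<object>();

// Stand-in for gvlIntSymbol.WriteValues(setpoints): record what would be written.
var written = new List<object>();
IDisposable writer = setpoints.Subscribe(written.Add);

setpoints.OnNext((short)10);
setpoints.OnNext((short)20);

writer.Dispose();            // stop forwarding values
setpoints.OnNext((short)30); // ignored, the subscription is gone

Console.WriteLine(string.Join(", ", written)); // prints "10, 20"
```

With a live connection, passing the subject to gvlIntSymbol.WriteValues(...) in place of the timer observable would let application code decide when each value is written.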