Reactive Extensions for .NET, or Rx

 




Use Rx to combine and manage multiple streams of events in LINQ fashion.

Rx’s fundamental abstraction, IObservable<T>, represents a sequence of items, and its operators are defined as extension methods for this interface. 

Because Rx implements the standard LINQ operators, you can write queries against a live source. For example, you could write a query that provides data only for stocks that are changing price more frequently than some minimum rate.


1. Rx defines a standard way for sources to report errors.

2. It delivers items in a well-defined order.

3. It provides a clear way to signal when there are no more items.

4. It makes an event source a first-class entity, because the source is just an object. This means you can pass an event source as an argument, store it in a field, or offer it in a property.
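The points above can be sketched in a few lines. This is a minimal illustration, assuming the System.Reactive NuGet package is referenced. The names `FirstClassSourceDemo`, `EvenSquares`, and `Collect` are hypothetical and not part of Rx. It passes an observable to a method as an ordinary argument, queries it with LINQ syntax, and records the item, error, and completion notifications.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class FirstClassSourceDemo
{
    // An observable is an ordinary object, so it can be passed in as an
    // argument, and a new (queried) observable can be returned.
    public static IObservable<int> EvenSquares(IObservable<int> source)
    {
        return from x in source
               where x % 2 == 0
               select x * x;
    }

    // Hypothetical helper: subscribes and records everything the source reports.
    public static List<string> Collect(IObservable<int> source)
    {
        var log = new List<string>();
        source.Subscribe(
            x => log.Add("next " + x),             // items, in order
            ex => log.Add("error " + ex.Message),  // standard error channel
            () => log.Add("completed"));           // explicit end of sequence
        return log;
    }

    public static void Main()
    {
        // Observable.Range(1, 6) produces 1 through 6 and then completes.
        IObservable<int> numbers = Observable.Range(1, 6);
        foreach (string entry in Collect(EvenSquares(numbers)))
        {
            Console.WriteLine(entry);
        }
    }
}
```

With this input, the recorded entries should be the squares of 2, 4, and 6 followed by a completion entry.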


Subscribers are required to supply an object that implements IObserver<T>.

// Subscribe for a cold file source, with error handling: an IOException is
// reported through OnError instead of escaping to the caller
public IDisposable Subscribe(IObserver<string> observer)
{
    StreamReader sr = null;
    string line = null;
    bool failed = false;
    try
    {
        while (true)
        {
            try
            {
                if (sr == null)
                {
                    sr = new StreamReader(_path);
                }
                if (sr.EndOfStream)
                {
                    break;
                }
                line = sr.ReadLine();
            }
            catch (IOException x)
            {
                observer.OnError(x);
                failed = true;
                break;
            }
            observer.OnNext(line);
        }
    }
    finally
    {
        if (sr != null)
        {
            sr.Dispose();
        }
    }
    // OnError already ended the stream, so only signal completion on success
    if (!failed)
    {
        observer.OnCompleted();
    }
    return NullDisposable.Instance;
}
// IObservable<T> for monitoring keypresses (a hot observable)
public class KeyWatcher : IObservable<char>
{
    private readonly List<Subscription> _subscriptions = new List<Subscription>();

    public IDisposable Subscribe(IObserver<char> observer)
    {
        var sub = new Subscription(this, observer);
        _subscriptions.Add(sub);
        return sub;
    }

    public void Run()
    {
        while (true)
        {
            // Passing true here stops the console from showing the character
            char c = Console.ReadKey(true).KeyChar;
            // Iterate over snapshot to handle the case where the observer
            // unsubscribes from inside its OnNext method.
            foreach (Subscription sub in _subscriptions.ToArray())
            {
                sub.Observer.OnNext(c);
            }
        }
    }

    private void RemoveSubscription(Subscription sub)
    {
        _subscriptions.Remove(sub);
    }

    private class Subscription : IDisposable
    {
        private KeyWatcher _parent;

        public Subscription(KeyWatcher parent, IObserver<char> observer)
        {
            _parent = parent;
            Observer = observer;
        }

        public IObserver<char> Observer { get; }

        public void Dispose()
        {
            if (_parent != null)
            {
                _parent.RemoveSubscription(this);
                _parent = null;
            }
        }
    }
}
public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
public interface IObserver<in T>
{
    void OnCompleted();
    void OnError(Exception error);
    void OnNext(T value);
}
class MySubscriber<T> : IObserver<T>
{
    public void OnNext(T value) => Console.WriteLine("Received: " + value);
    public void OnCompleted() => Console.WriteLine("Complete");
    public void OnError(Exception ex) => Console.WriteLine("Error: " + ex);
}
public class SimpleColdSource : IObservable<string>
{
    public IDisposable Subscribe(IObserver<string> observer)
    {
        observer.OnNext("Hello,");
        observer.OnNext("world!");
        observer.OnCompleted();
        return NullDisposable.Instance;
    }

    private class NullDisposable : IDisposable
    {
        public readonly static NullDisposable Instance = new NullDisposable();
        public void Dispose() { }
    }
}
// Attaching an observer to an observable
var source = new SimpleColdSource();
var sub = new MySubscriber<string>();
source.Subscribe(sub);
// OUTPUT
// Received: Hello,
// Received: world!
// Complete
// Cold observable representing a file’s contents
public class FilePusher : IObservable<string>
{
    private readonly string _path;

    public FilePusher(string path)
    {
        _path = path;
    }

    public IDisposable Subscribe(IObserver<string> observer)
    {
        using (var sr = new StreamReader(_path))
        {
            while (!sr.EndOfStream)
            {
                observer.OnNext(sr.ReadLine());
            }
        }
        observer.OnCompleted();
        return NullDisposable.Instance;
    }

    private class NullDisposable : IDisposable
    {
        public static readonly NullDisposable Instance = new NullDisposable();
        public void Dispose() { }
    }
}
// This delivers all the items to the observer on the caller’s thread, inside the call to Subscribe.

The fundamental abstraction in Rx, IObservable<T>, is implemented by event sources. Instead of using the event keyword, it models events as a sequence of items. An IObservable<T> provides items to subscribers as and when it’s ready to.

We can subscribe to a source by passing an implementation of IObserver<T> to the Subscribe method. The source will invoke OnNext when it wants to report events, and it can call OnCompleted to indicate that there will be no further activity. If the source wants to report an error, it can call OnError. Both OnCompleted and OnError indicate the end of the stream—an observable should not call any further methods on the observer after that.

When you call Subscribe on an observable, it returns an object that implements IDisposable, which provides a way to unsubscribe. If you call Dispose, the observable will not deliver any more notifications to your observer.
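As a minimal sketch of unsubscribing, assuming the System.Reactive package is referenced: `Subject<int>` from System.Reactive.Subjects stands in for a live source, and `UnsubscribeDemo` is a hypothetical name. After the subscription is disposed, later items are no longer delivered to the observer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class UnsubscribeDemo
{
    public static List<int> Run()
    {
        var received = new List<int>();
        var source = new Subject<int>();   // stand-in for a live source

        // Subscribe returns the IDisposable that represents this subscription.
        IDisposable subscription = source.Subscribe(x => received.Add(x));
        source.OnNext(1);
        source.OnNext(2);

        subscription.Dispose();            // unsubscribe
        source.OnNext(3);                  // not delivered to the observer

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));   // prints "1, 2"
    }
}
```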

Rx makes a distinction between hot and cold observable sources. A hot observable produces each value as and when something of interest happens, and if no subscribers are attached at that moment, that value will be lost. A hot observable typically represents something live, such as mouse input, keypresses, or data reported by a sensor, which is why the values it produces are independent of how many subscribers, if any, are attached. Hot sources typically have broadcast-like behavior—they send each item to all of their subscribers. These can be the more complex kind of source to implement, so I’ll discuss cold sources first.

Whereas hot sources report items as and when they want to, cold observables work differently. They start pushing values when an observer subscribes, and they provide values to each subscriber separately, rather than broadcasting. This means that a subscriber won’t miss anything by being too late, because the source starts providing items when you subscribe.
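The difference can be seen side by side in a small sketch. It assumes the System.Reactive package is referenced. `Subject<string>` plays the hot source, a source built with `Observable.Create` plays the cold one, and `HotVersusColdDemo` is a hypothetical name used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HotVersusColdDemo
{
    public static List<string> RunHot()
    {
        var log = new List<string>();
        var hot = new Subject<string>();                 // stand-in for a live source

        hot.OnNext("a");                                 // nobody is subscribed: lost
        hot.Subscribe(x => log.Add("early got " + x));
        hot.OnNext("b");                                 // only the early subscriber sees this
        hot.Subscribe(x => log.Add("late got " + x));
        hot.OnNext("c");                                 // broadcast to both subscribers
        return log;
    }

    public static List<string> RunCold()
    {
        var log = new List<string>();
        // The delegate runs once per subscriber, so each one gets every item.
        IObservable<string> cold = Observable.Create<string>(observer =>
        {
            observer.OnNext("x");
            observer.OnNext("y");
            observer.OnCompleted();
            return Disposable.Empty;
        });

        cold.Subscribe(x => log.Add("first got " + x));
        cold.Subscribe(x => log.Add("second got " + x));
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join("; ", RunHot()));
        Console.WriteLine(string.Join("; ", RunCold()));
    }
}
```

In the hot case the late subscriber misses "a" and "b". In the cold case both subscribers receive the full sequence, one after the other.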

Subscribers are not obliged to ensure that the object returned by Subscribe remains reachable. You can simply ignore it if you don’t need the ability to unsubscribe early, and it won’t matter if the garbage collector frees the object, because none of the IDisposable implementations that Rx supplies to represent subscriptions have finalizers. (And although you don’t normally implement these yourself—I’m doing so here only to illustrate how it works—if you did decide to write your own, take the same approach: do not implement a finalizer on a class that represents a subscription.)

Publishing and Subscribing with Delegates

If you use the System.Reactive NuGet package, you do not need to implement either IObservable<T> or IObserver<T> directly. The library provides several implementations. Some of these are adapters, bridging between other representations of asynchronously generated sequences. Some wrap existing observable streams. But the helpers aren’t just for adapting existing things. They can also help if you want to write code that originates new items or that acts as the final destination for items. The simplest of these helpers provide delegate-based APIs for creating and consuming observable streams.

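// A source that supports only a single subscription, made hot with Publish
IObservable<char> singularHotSource = Observable.Create(
    (Func<IObserver<char>, IDisposable>) (obs =>
    {
        while (true)
        {
            obs.OnNext(Console.ReadKey(true).KeyChar);
        }
    }));
IConnectableObservable<char> keySource = singularHotSource.Publish();
keySource.Subscribe(new MySubscriber<char>());
keySource.Subscribe(new MySubscriber<char>());
// Nothing is delivered until Connect subscribes to the underlying source.
// Because the delegate above loops forever, this call does not return.
keySource.Connect();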
// Subscribing without implementing IObserver<T>
var source = new KeyWatcher();
source.Subscribe(value => Console.WriteLine("Received: " + value));
source.Run();
// Create a source that produces random numbers until the sum total of all the numbers produced exceeds 10,000.
IObservable<int> src = Observable.Generate(
    (Current: 0, Total: 0, Random: new Random()),
    state => state.Total <= 10000,
    state =>
    {
        int value = state.Random.Next(1000);
        return (value, state.Total + value, state.Random);
    },
    state => state.Current);
// Produces items in a similar way, but passes an extra function as the final argument that tells Rx to delay the delivery of each item by a random amount.
IObservable<int> src = Observable.Generate(
    (Current: 0, Total: 0, Random: new Random()),
    state => state.Total <= 10000,
    state =>
    {
        int value = state.Random.Next(1000);
        return (value, state.Total + value, state.Random);
    },
    state => state.Current,
    state => TimeSpan.FromMilliseconds(state.Random.Next(1000)));
// Delegate-based version of the cold file source
public static IObservable<string> GetFilePusher(string path)
{
    return Observable.Create<string>(observer =>
    {
        using (var sr = new StreamReader(path))
        {
            while (!sr.EndOfStream)
            {
                observer.OnNext(sr.ReadLine());
            }
        }
        observer.OnCompleted();
        return () => { };
    });
}
// Async version
public static IObservable<string> GetFilePusher(string path)
{
    return Observable.Create<string>(async (observer, cancel) =>
    {
        using (var sr = new StreamReader(path))
        {
            while (!sr.EndOfStream && !cancel.IsCancellationRequested)
            {
                observer.OnNext(await sr.ReadLineAsync());
            }
        }
        observer.OnCompleted();
        return () => { };
    });
}
// Watching for changes in the filesystem
string path = Environment.GetFolderPath(Environment.SpecialFolder.MyDocuments);
var w = new FileSystemWatcher(path);
IObservable<EventPattern<FileSystemEventArgs>> changes =
    Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
        h => w.Changed += h, h => w.Changed -= h);
w.IncludeSubdirectories = true;
w.EnableRaisingEvents = true;
IObservable<IGroupedObservable<string, string>> folders =
    from change in changes
    group Path.GetFileName(change.EventArgs.FullPath)
    by Path.GetDirectoryName(change.EventArgs.FullPath);
folders.Subscribe(f =>
{
    Console.WriteLine("New folder ({0})", f.Key);
    f.Subscribe(file =>
        Console.WriteLine("File changed in folder {0}, {1}", f.Key, file));
});
// The next example begins by producing an observable source representing MouseMove events from a UI element.
// I’ll talk about this technique in more detail in “Adaptation”, but for now it’s enough to know that Rx can wrap any .NET event
// as an observable source. Each event produces an item that provides two properties containing the values normally
// passed to event handlers as arguments (i.e., the sender and the event arguments).
IObservable<EventPattern<MouseEventArgs>> mouseMoves =
    Observable.FromEventPattern<MouseEventArgs>(
        background, nameof(background.MouseMove));
IObservable<Point> dragPositions =
    from move in mouseMoves
    where Mouse.Captured == background
    select move.EventArgs.GetPosition(background);
dragPositions.Subscribe(point => { line.Points.Add(point); });

Rx provides a Publish extension method for any IObservable<T>, defined by the Observable class in the System.Reactive.Linq namespace. This method is designed to wrap a source whose subscription method (i.e., the delegate you pass to Observable.Create) supports being run only once, but to which you want to attach multiple subscribers—it handles the multicast logic for you. Strictly speaking, a source that supports only a single subscription is degenerate, but as long as you hide it behind Publish, it doesn’t matter, and you can use this as a way to implement a hot source.
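A minimal sketch of that multicast behavior, assuming the System.Reactive package is referenced. `PublishDemo` is a hypothetical name, and the finite source below is a stand-in for one whose subscription logic should run only once. The log shows the underlying delegate running a single time, after `Connect`, with each item going to both subscribers.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PublishDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Stand-in for a source whose subscription delegate should run only once.
        IObservable<int> source = Observable.Create<int>(observer =>
        {
            log.Add("source subscribed");
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        IConnectableObservable<int> shared = source.Publish();
        shared.Subscribe(x => log.Add("A got " + x));
        shared.Subscribe(x => log.Add("B got " + x));

        log.Add("connecting");
        shared.Connect();   // subscribes to the underlying source exactly once
        return log;
    }

    public static void Main()
    {
        foreach (string entry in Run())
        {
            Console.WriteLine(entry);
        }
    }
}
```

Subscribing to the published source does not touch the underlying source. Only `Connect` does, which is why "source subscribed" appears once and after "connecting".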