Use Rx for: to combine and manage multiple streams of events in LINQ fashion
Rx’s fundamental abstraction, IObservable<T>
, represents a sequence of items, and its operators are defined as extension methods for this interface.
Rx implements standard LINQ operators, you can write queries against a live source (For example, you could write a query that provides data only for stocks that are changing price more frequently than some minimum rate.)
1. defines a standard way for sources to report errors
2. it is able to deliver items in a well-defined order
3. provides a clear way to signal when there are no more items
4. Rx makes an event source a first-class entity, because it’s just an object. This means you can pass an event source as an argument, store it in a field, or offer it in a property
IObserver<T>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public IDisposable Subscribe(IObserver<string> observer)
{
StreamReader sr = null;
string line = null;
bool failed = false;
try
{
while (true)
{
try
{
if (sr == null)
{
sr = new StreamReader(_path);
}
if (sr.EndOfStream)
{
break;
}
line = sr.ReadLine();
}
catch (IOException x)
{
observer.OnError(x);
failed = true;
break;
}
observer.OnNext(line);
}
}
finally
{
if (sr != null)
{
sr.Dispose();
}
}
if (!failed)
{
observer.OnCompleted();
}
return NullDisposable.Instance;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// IObservable<T> for monitoring keypresses HOT Observable
public class KeyWatcher : IObservable<char>
{
private readonly List<Subscription> _subscriptions = new List<Subscription>();
public IDisposable Subscribe(IObserver<char> observer)
{
var sub = new Subscription(this, observer);
_subscriptions.Add(sub);
return sub;
}
public void Run()
{
while (true)
{
// Passing true here stops the console from showing the character
char c = Console.ReadKey(true).KeyChar;
// Iterate over snapshot to handle the case where the observer
// unsubscribes from inside its OnNext method.
foreach (Subscription sub in _subscriptions.ToArray())
{
sub.Observer.OnNext(c);
}
}
}
private void RemoveSubscription(Subscription sub)
{
_subscriptions.Remove(sub);
}
private class Subscription : IDisposable
{
private KeyWatcher _parent;
public Subscription(KeyWatcher parent, IObserver<char> observer)
{
_parent = parent;
Observer = observer;
}
public IObserver<char> Observer { get; }
public void Dispose()
{
if (_parent != null)
{
_parent.RemoveSubscription(this);
_parent = null;
}
}
}
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
public interface IObserver<in T>
{
void OnCompleted();
void OnError(Exception error);
void OnNext(T value);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class MySubscriber<T> : IObserver<T>
{
public void OnNext(T value) => Console.WriteLine("Received: " + value);
public void OnCompleted() => Console.WriteLine("Complete");
public void OnError(Exception ex) => Console.WriteLine("Error: " + ex);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class SimpleColdSource : IObservable<string>
{
public IDisposable Subscribe(IObserver<string> observer)
{
observer.OnNext("Hello,");
observer.OnNext("world!");
observer.OnCompleted();
return NullDisposable.Instance;
}
private class NullDisposable : IDisposable
{
public readonly static NullDisposable Instance = new NullDisposable();
public void Dispose() { }
}
}
//Attaching an observer to an observable
var source = new SimpleColdSource();
var sub = new MySubscriber<string>();
source.Subscribe(sub);
//OUTPUT
Received: Hello,
Received: world!
Complete
//cold observable representing a file’s contents
public class FilePusher : IObservable<string>
{
private readonly string _path;
public FilePusher(string path)
{
_path = path;
}
public IDisposable Subscribe(IObserver<string> observer)
{
using (var sr = new StreamReader(_path))
{
while (!sr.EndOfStream)
{
observer.OnNext(sr.ReadLine());
}
}
observer.OnCompleted();
return NullDisposable.Instance;
}
private class NullDisposable : IDisposable
{
public static NullDisposable Instance = new NullDisposable();
public void Dispose() { }
}
}
// this delivers all the items to the observer on the caller’s thread inside the call to Subscribe
The fundamental abstraction in Rx, IObservable<T>, is implemented by event sources. Instead of using the event keyword, it models events as a sequence of items. An IObservable<T> provides items to subscribers as and when it’s ready to.object
returned by Subscribe
remains reachable. You can simply ignore it if you don’t need the ability to unsubscribe early, and it won’t matter if the garbage collector frees the object, because none of the IDisposable
implementations that Rx supplies to represent subscriptions have finalizers. (And although you don’t normally implement these yourself—I’m doing so here only to illustrate how it works—if you did decide to write your own, take the same approach: do not implement a finalizer on a class that represents a subscription.)Publishing and Subscribing with Delegates
If you use the System.Reactive
NuGet package, you do not need to implement either IObservable<T>
or IObserver<T>
directly. The library provides several implementations. Some of these are adapters, bridging between other representations of asynchronously generated sequences. Some wrap existing observable streams. But the helpers aren’t just for adapting existing things. They can also help if you want to write code that originates new items or that acts as the final destination for items. The simplest of these helpers provide delegate-based APIs for creating and consuming observable streams.
IObservable<char> singularHotSource = Observable.Create( | |
(Func<IObserver<char>, IDisposable>) (obs => | |
{ | |
while (true) | |
{ | |
obs.OnNext(Console.ReadKey(true).KeyChar); | |
} | |
})); | |
IConnectableObservable<char> keySource = singularHotSource.Publish(); | |
keySource.Subscribe(new MySubscriber<char>()); | |
keySource.Subscribe(new MySubscriber<char>()); | |
//Subscribing without implementing IObserver<T> | |
var source = new KeyWatcher(); | |
source.Subscribe(value => Console.WriteLine("Received: " + value)); | |
source.Run() |
//create a source that produces random numbers until the sum total of all the numbers produced exceeds 10,000. | |
IObservable<int> src = Observable.Generate( | |
(Current: 0, Total: 0, Random: new Random()), | |
state => state.Total <= 10000, | |
state => | |
{ | |
int value = state.Random.Next(1000); | |
return (value, state.Total + value, state.Random); | |
}, | |
state => state.Current); | |
//produces items in a similar way but passes an extra function as the final argument that tells Rx to delay the delivery of each item by a random amount. | |
IObservable<int> src = Observable.Generate( | |
(Current: 0, Total: 0, Random: new Random()), | |
state => state.Total < 10000, | |
state => | |
{ | |
int value = state.Random.Next(1000); | |
return (value, state.Total + value, state.Random); | |
}, | |
state => state.Current, | |
state => TimeSpan.FromMilliseconds(state.Random.Next(1000))); | |
public static IObservable<string> GetFilePusher(string path) | |
{ | |
return Observable.Create<string>(observer => | |
{ | |
using (var sr = new StreamReader(path)) | |
{ | |
while (!sr.EndOfStream) | |
{ | |
observer.OnNext(sr.ReadLine()); | |
} | |
} | |
observer.OnCompleted(); | |
return () => { }; | |
}); | |
} | |
//async version | |
public static IObservable<string> GetFilePusher(string path) | |
{ | |
return Observable.Create<string>(async (observer, cancel) => | |
{ | |
using (var sr = new StreamReader(path)) | |
{ | |
while (!sr.EndOfStream && !cancel.IsCancellationRequested) | |
{ | |
observer.OnNext(await sr.ReadLineAsync()); | |
} | |
} | |
observer.OnCompleted(); | |
return () => { }; | |
}); | |
} |
string path = Environment.GetFolderPath(Environment.SpecialFolder.MyDocuments); | |
var w = new FileSystemWatcher(path); | |
IObservable<EventPattern<FileSystemEventArgs>> changes = | |
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>( | |
h => w.Changed += h, h => w.Changed -= h); | |
w.IncludeSubdirectories = true; | |
w.EnableRaisingEvents = true; | |
IObservable<IGroupedObservable<string, string>> folders = | |
from change in changes | |
group Path.GetFileName(change.EventArgs.FullPath) | |
by Path.GetDirectoryName(change.EventArgs.FullPath); | |
folders.Subscribe(f => | |
{ | |
Console.WriteLine("New folder ({0})", f.Key); | |
f.Subscribe(file => | |
Console.WriteLine("File changed in folder {0}, {1}", f.Key, file)); | |
}); |
// It begins by producing an observable source representing MouseMove events from a UI element. | |
//I’ll talk about this technique in more detail in “Adaptation”, but for now it’s enough to know that Rx can wrap any .NET event | |
//as an observable source. Each event produces an item that provides two properties containing the values normally | |
//passed to event handlers as arguments (i.e., the sender and the event arguments). | |
//watching for changes in the filesystem | |
IObservable<EventPattern<MouseEventArgs>> mouseMoves = | |
Observable.FromEventPattern<MouseEventArgs>( | |
background, nameof(background.MouseMove)); | |
IObservable<Point> dragPositions = | |
from move in mouseMoves | |
where Mouse.Captured == background | |
select move.EventArgs.GetPosition(background); | |
dragPositions.Subscribe(point => { line.Points.Add(point); }); |
Rx provides a Publish
extension method for any IObservable<T>
, defined by the Observable
class in the System.Reactive.Linq
namespace. This method is designed to wrap a source whose subscription method (i.e., the delegate you pass to Observable.Create
) supports being run only once, but to which you want to attach multiple subscribers—it handles the multicast logic for you. Strictly speaking, a source that supports only a single subscription is degenerate, but as long as you hide it behind Publish
, it doesn’t matter, and you can use this as a way to implement a hot source.