Reactive Extensions for .NET, or Rx

 




Use Rx for: to combine and manage multiple streams of events in LINQ fashion 

Rx’s fundamental abstraction, IObservable<T>, represents a sequence of items, and its operators are defined as extension methods for this interface. 

 Rx implements standard LINQ operators, you can write queries against a live source (For example, you could write a query that provides data only for stocks that are changing price more frequently than some minimum rate.)


1. defines a standard way for sources to report errors

2. it is able to deliver items in a well-defined order

3. provides a clear way to signal when there are no more items

4. Rx makes an event source a first-class entity, because it’s just an object. This means you can pass an event source as an argument, store it in a field, or offer it in a property


Subscribers are required to supply an object that implements IObserver<T>


The fundamental abstraction in Rx, IObservable<T>, is implemented by event sources. Instead of using the event keyword, it models events as a sequence of items. An IObservable<T> provides items to subscribers as and when it’s ready to.

We can subscribe to a source by passing an implementation of IObserver<T> to the Subscribe method. The source will invoke OnNext when it wants to report events, and it can call OnCompleted to indicate that there will be no further activity. If the source wants to report an error, it can call OnError. Both OnCompleted and OnError indicate the end of the stream—an observable should not call any further methods on the observer after that.

When you call Subscribe on an observable, it returns an object that implements IDisposable, which provides a way to unsubscribe. If you call Dispose, the observable will not deliver any more notifications to your observer.

Rx makes a distinction between hot and cold observable sources. A hot observable produces each value as and when something of interest happens, and if no subscribers are attached at that moment, that value will be lost. A hot observable typically represents something live, such as mouse input, keypresses, or data reported by a sensor, which is why the values it produces are independent of how many subscribers, if any, are attached. Hot sources typically have broadcast-like behavior—they send each item to all of their subscribers. These can be the more complex kind of source to implement, so I’ll discuss cold sources first.

Whereas hot sources report items as and when they want to, cold observables work differently. They start pushing values when an observer subscribes, and they provide values to each subscriber separately, rather than broadcasting. This means that a subscriber won’t miss anything by being too late, because the source starts providing items when you subscribe.

Subscribers are not obliged to ensure that the object returned by Subscribe remains reachable. You can simply ignore it if you don’t need the ability to unsubscribe early, and it won’t matter if the garbage collector frees the object, because none of the IDisposable implementations that Rx supplies to represent subscriptions have finalizers. (And although you don’t normally implement these yourself—I’m doing so here only to illustrate how it works—if you did decide to write your own, take the same approach: do not implement a finalizer on a class that represents a subscription.)

Publishing and Subscribing with Delegates

If you use the System.Reactive NuGet package, you do not need to implement either IObservable<T> or IObserver<T> directly. The library provides several implementations. Some of these are adapters, bridging between other representations of asynchronously generated sequences. Some wrap existing observable streams. But the helpers aren’t just for adapting existing things. They can also help if you want to write code that originates new items or that acts as the final destination for items. The simplest of these helpers provide delegate-based APIs for creating and consuming observable streams.

Rx provides a Publish extension method for any IObservable<T>, defined by the Observable class in the System.Reactive.Linq namespace. This method is designed to wrap a source whose subscription method (i.e., the delegate you pass to Observable.Create) supports being run only once, but to which you want to attach multiple subscribers—it handles the multicast logic for you. Strictly speaking, a source that supports only a single subscription is degenerate, but as long as you hide it behind Publish, it doesn’t matter, and you can use this as a way to implement a hot source.