Use Rx to combine and manage multiple streams of events in LINQ fashion.
Rx’s fundamental abstraction, IObservable<T>, represents a sequence of items, and its operators are defined as extension methods for this interface.
Because Rx implements the standard LINQ operators, you can write queries against a live source. For example, you could write a query that provides data only for stocks that are changing price more frequently than some minimum rate.
1. Rx defines a standard way for sources to report errors.
2. Rx delivers items in a well-defined order.
3. Rx provides a clear way to signal when there are no more items.
4. Rx makes an event source a first-class entity, because it’s just an object. This means you can pass an event source as an argument, store it in a field, or offer it in a property.
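The points above can be sketched in a few lines of C#. This is a minimal illustration, assuming the System.Reactive NuGet package is referenced; the names QueryDemo, DescribeEvens, and Run are hypothetical and exist only for this example. It passes an observable to a method as an ordinary argument, queries it with LINQ syntax, and supplies separate callbacks for items, errors, and completion.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class QueryDemo
{
    // Hypothetical helper: an observable is just an object, so it can be
    // accepted as an argument and returned as a result.
    public static IObservable<string> DescribeEvens(IObservable<int> source) =>
        from n in source
        where n % 2 == 0
        select $"even: {n}";

    public static List<string> Run()
    {
        var log = new List<string>();

        // Four items followed by a failure. The failure reaches the
        // subscriber through the error callback, not as a thrown exception.
        IObservable<int> numbers = Observable.Range(1, 4)
            .Concat(Observable.Throw<int>(new InvalidOperationException("feed lost")));

        DescribeEvens(numbers).Subscribe(
            item => log.Add(item),                // next item, in order
            ex => log.Add($"error: {ex.Message}"), // source reported an error
            () => log.Add("completed"));           // no more items

        return log;
    }

    public static void Main()
    {
        foreach (string line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Because this source ends with an error, the completion callback is never invoked; a source finishes with at most one of the two notifications.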
IObserver<T>
The fundamental abstraction in Rx, IObservable<T>, is implemented by event sources. Instead of using the event keyword, it models events as a sequence of items. An IObservable<T> provides items to subscribers as and when it’s ready to.
A subscription stays active whether or not the object returned by Subscribe remains reachable. You can simply ignore it if you don’t need the ability to unsubscribe early, and it won’t matter if the garbage collector frees the object, because none of the IDisposable implementations that Rx supplies to represent subscriptions have finalizers. (You don’t normally implement these yourself, and I’m doing so here only to illustrate how it works, but if you did decide to write your own, take the same approach: do not implement a finalizer on a class that represents a subscription.)
Publishing and Subscribing with Delegates
If you use the System.Reactive NuGet package, you do not need to implement either IObservable<T> or IObserver<T> directly. The library provides several implementations. Some of these are adapters, bridging between other representations of asynchronously generated sequences. Some wrap existing observable streams. But the helpers aren’t just for adapting existing things. They can also help if you want to write code that originates new items or that acts as the final destination for items. The simplest of these helpers provide delegate-based APIs for creating and consuming observable streams.
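As a rough sketch of the delegate-based helpers, the following uses Observable.Create to originate items and the delegate-accepting Subscribe extension method to consume them, so neither interface is implemented by hand. It assumes the System.Reactive NuGet package; DelegateDemo and Run are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class DelegateDemo
{
    public static List<string> Run()
    {
        var received = new List<string>();

        // The delegate passed to Observable.Create runs each time
        // something subscribes to the resulting observable.
        IObservable<string> greetings = Observable.Create<string>(observer =>
        {
            observer.OnNext("hello");
            observer.OnNext("world");
            observer.OnCompleted();
            return Disposable.Empty; // nothing to clean up on unsubscribe
        });

        // Delegates stand in for an IObserver<T> implementation.
        IDisposable subscription = greetings.Subscribe(
            item => received.Add(item),
            () => received.Add("completed"));

        // Disposing is optional here because the source has already finished.
        subscription.Dispose();
        return received;
    }

    public static void Main()
    {
        foreach (string line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```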
Rx provides a Publish extension method for any IObservable<T>, defined by the Observable class in the System.Reactive.Linq namespace. This method is designed to wrap a source whose subscription method (i.e., the delegate you pass to Observable.Create) supports being run only once, but to which you want to attach multiple subscribers; it handles the multicast logic for you. Strictly speaking, a source that supports only a single subscription is degenerate, but as long as you hide it behind Publish, it doesn’t matter, and you can use this as a way to implement a hot source.
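A minimal sketch of this pattern follows, again assuming the System.Reactive NuGet package. PublishDemo, Run, and the runs counter are hypothetical and exist only to show how often the subscription delegate executes. Two subscribers attach to the published wrapper, and the underlying delegate runs only when Connect is called.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PublishDemo
{
    // Returns how many times the underlying subscription delegate ran.
    public static int Run()
    {
        int runs = 0;

        // Stand-in for a source whose subscription logic should run only once.
        IObservable<int> singleUse = Observable.Create<int>(observer =>
        {
            runs++;
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        // Publish wraps the source; subscribers attach to the wrapper.
        IConnectableObservable<int> shared = singleUse.Publish();
        shared.Subscribe(x => Console.WriteLine($"A: {x}"));
        shared.Subscribe(x => Console.WriteLine($"B: {x}"));

        // Connect subscribes to the underlying source a single time and
        // forwards each item to every attached subscriber.
        shared.Connect();

        return runs;
    }

    public static void Main()
    {
        Console.WriteLine($"subscription delegate ran {Run()} time(s)");
    }
}
```

Subscribers that attach after Connect has been called see only the items produced from that point on, which is the behavior expected of a hot source.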