Monday, December 12, 2011

Real World Example : Simple Publish/Subscribe Pattern with Reactive Extensions

Shown below is a requirement.

Let me explain a little about what I want to achieve. I have an entity called “Frequent” which implements INotifyPropertyChanged interface. So I can subscribe to the change notification and then upon firing the event, I simply increment a counter. Now, my requirement is that the PropertyChanged can fire way too quickly as simulated in the for-loop. So in UI applications, it doesn’t make sense for me to fire property changed events that frequently as in a real world intensive WPF application, this can become your biggest bottleneck (among several others things).

So my requirement is that, within 1 second, I would like to receive only 1 change notification per property, if it has changed indeed. The simple implementation for this would be to do something like pushing the properties that have changed into a queue and process the queue once a second and then raise change notification event. This can be done very easily but then it can be a little tedious job for such a requirement to restrict an event from firing more than X times a second.

So I was looking at the Reactive Extensions (Rx Framework) and it occurred to me that it should support Observer pattern out of the box. So I started playing with it but hit a road block immediately. There are numerous resources that shows how to generate observables from Timers, Time Intervals, Enumerables, etc but what I want to is to take advantage of extension methods supported by Rx Framework on IObserver such as Throttle(), Buffer(), Window(), etc on a simple Pub-Sub system. May be I did not look hard enough but I could not find a simple example. So I thought it would be helpful for rest of the people like me if I made a blog post.

In my implementation, the Frequent object is by itself a Publisher and a Subscriber. In Rx terms – IObservable and IObserver. Instead of implementing these interfaces, I want to have some observable of strings where I will publish a property that has changed and the subscriber on the observable would receive it. So for that purpose of simple message passing between Observers and Observables, you can use a Subject<T>. There are other variants like ReplaySubject<T>, BehaviorSubject<T>, AsyncSubject<T>, etc. but that is out of scope of this post. I publish on Subject<T> and then the subscribers who subscribed earlier to it would be notified of these messages.

Shown below is a the Frequent class implementation.

Take a minute or two to go through the simple class, it is self explanatory. All I am doing is that when a property changes, instead of firing it immediately, I simply notify my throttler about the property that has changed. The throttler’s responsibility is to determine when to notify the subscribers about this change. Shown below is the implementation.

You can read my comments that I wrote out of frustrations. All I do in this simple façade is to wrap an Observable –> Subject<string> and buffer the messages received for 1/2 second and then get unique messages from the buffer and publish it to the subscribers. This is all done in the constructor. The other methods are simply my take on making things simple for the consumer of this class.

Rx is complex and powerful but in my opinion it has a very steep learning curve. But it surely did save a good few days of implementation for my team. Again, I am very new to Rx, so if you have some better ways to do it, then please let me know.

No comments: