Increasingly, users have come to expect that applications they use continue to respond while the application processes information in the background. To achieve the best responsiveness, you need to build applications to be as asynchronous as possible. In the past, creating asynchronous operations consisted of passing callback delegates or lambda expressions to indicate what action to take when an action completes. These can lead to an unmanageable mess of spaghetti code.

To remedy this situation, the Microsoft Cloud Programmability Group created a set of libraries known as the Reactive Extensions (Rx). Rx provides a LINQ-like syntax to declaratively compose increasingly complex asynchronous operations over “observable” collections. These collections can be lists of known objects, values received through events, sensor readings, or responses from Web service requests. In this article, I will demonstrate some of the capabilities of the Rx library for the Windows Phone 7 by building a dice-playing game featuring making asynchronous service requests and detecting “Shake” gestures through the Accelerometer service.

Rx provides a LINQ-like syntax to declaratively compose increasingly complex asynchronous operations over “observable” collections.

IEnumerable vs. IObservable

When working with groups of data, you need to perform the same operations repeatedly. LINQ allows you to simplify this by offering familiar methods to accomplish the most frequent operations. Whether you filter the set through Where, project new values with Select, combine sets with Join and Group, or aggregate values with Count, Sum, etc., LINQ provides a declarative syntax to abstract the implementation details. Because nearly all sets of values in .NET derive from the IEnumerable interface, the LINQ methods extend IEnumerable to do their magic.

Although the IEnumerable and IEnumerator interfaces are simple, they can be problematic in highly responsive operations. Every time any portion of the LINQ query tries to enumerate, the step blocks the rest of the query pipeline until either the previous operation completes or the iteration reaches the end of the set. This blocking call becomes particularly problematic when the values appear gradually over time or at some potential point in the future. Some examples of these potential data sets include UI events, error logs, sensor readings, and stock tickers.

In trying to turn the synchronous IEnumerable implementations into non-blocking asynchronous operations, the Microsoft Cloud Programmability Group looked at a number of patterns. Frequently developers use delegates and event call-backs following the observer pattern where the operation is split into two parts: one that does the initial work and the second that reacts once the first part completes.

In nature, the observer pattern goes into action when a tree falls in a forest. The tree is observable. When the tree falls, you are the observer who sees the tree fall. If you are not in the forest, the tree would still fall even though you weren't present to see it.

In the programming world, events work in the same fashion. The event is the observable side and the event handler acts as the observer. When the program's execution raises the event, the event calls back to any event handlers that may be listening.

When considering the IEnumerable and IObservable interfaces (Figure 1), the Cloud Programmability team discovered that they were equivalent roughly the same way that ????=?? is equivalent to ??=????. In the enumerable pattern, you call MoveNext to see if there are any more values and you call Current to get the resulting value. In the observable pattern, the observable pushes the current value to the observable as a parameter of the observer's OnNext implementation.

Figure 1: Comparing the IEnumerable and IObservable interfaces.
Figure 1: Comparing the IEnumerable and IObservable interfaces.

Because of the similarity between the interfaces, the Cloud Programmability team embarked on creating observable versions of the LINQ query operators. Rather than delving on the technical details further here, let's make this a bit more fun by working through some sample uses of Rx by creating a dice-rolling game.

Setting up the Game

For this game, let's let the users roll any number of dice and “freeze” the dice that they don't want to re-roll. To add to the geek factor, let's allow the user to decide the number of sides, because everyone needs 20 sided dice for games, right?

To create the game on the Windows Phone platform, start by creating a new “Windows Phone Databound Application”.

You'll re-use the MainViewModel in the ViewModels folder to track the dice collection as an ObservableCollection of Die instances. The MainViewModel also allows you to track the number of sides that will be on the dice collection.

Rename the ItemViewModel to Die to represent an instance of each die in the set of dice. The Die class contains three properties: DotCount to track the number of dots appearing on the die, Index to keep track of the current Die in the list, and a Frozen Boolean to indicate whether the user wants to re-roll the die or not. Listing 1 shows the implementation of the classes.

Listing 1: Game Classes

public class MainViewModel : INotifyPropertyChanged
{
    public MainViewModel()
    {
        this.Items = new ObservableCollection<Die<();
        SideCount = 6;
        DiceCount = 5;
    }
 
    public ObservableCollection<Die< Items
        { get; private set; }

    public int DiceCount
    {
        get {return Items.Count;}
        set
        {
            while (Items.Count != value)
            {
                if (Items.Count < value) 
                    Items.RemoveAt(0);
                Else
                {
                    Die newDie = new Die();
                    if (Items.Any())
                        newDie.Index = Items.Max(dice =< dice.Index) + 1;
                    Else
                        newDie.Index = 1;
 
                    Randomizer.RollDice(newDie, SideCount);
                    Items.Add(newDie);
                }
                NotifyPropertyChanged("DiceCount");
            }
        }
    }
 
    private int _sideCount = 6;
    public int SideCount
    {
        get { return _sideCount;}
        set
        {
            if (_sideCount != value)
            {
                _sideCount = value;
                NotifyPropertyChanged("SideCount");
            }
        }
    }
    public event PropertyChangedEventHandler PropertyChanged;
    private void NotifyPropertyChanged(String propertyName)
    {
        PropertyChangedEventHandler handler = PropertyChanged;
        if (null != handler)
            handler(this,
              new PropertyChangedEventArgs(propertyName));
    }
}

public class Die : INotifyPropertyChanged
{
    private int _Index;
    public int Index
    { 
        get { return _Index; }
        set {
            if (value != _Index)
            {
                _Index = value;
                NotifyPropertyChanged("Index");
            }
        }
    }
 
    private int _DotCount;
    public int DotCount
    {
        get { return _DotCount; }
        set
        {
            if (value != _DotCount)
            {
                _DotCount = value;
                NotifyPropertyChanged("DotCount");
            }
        }
    }
 
    private bool _Frozen;
    public bool Frozen
    {
        get { return _Frozen; }
        set
        {
            if (value != _Frozen)
            {
                _Frozen = value;
                NotifyPropertyChanged("Frozen");
            }
        }
    }
 
    public event PropertyChangedEventHandler PropertyChanged;
    private void NotifyPropertyChanged(String propertyName)
    {
        PropertyChangedEventHandler handler = PropertyChanged;
        if (null != handler)
        {
            handler(this, new PropertyChangedEventArgs(propertyName));
        }
     }

    private static Random rand = new Random();
    public static void RollDice(Die instance, int sideCount)
    {
        int newValue = (int)
        (rand.NextDouble() * sideCount) + 1;
        instance.DotCount = newValue;
    }
}

For the UI, you add textboxes to bind to the MainViewModel's DiceCount and SideCount properties. You bind the MainListBox to the Items collection changing the DataTemplate to a horizontal stack panel consisting of a checkbox bound to Frozen and a TextBlock bound to the DotCount. Figure 2 shows the rudimentary UI using the sample data that you modified based on the class changes above. The code implementation appears in the code sample downloads for this article at www.codemag.com.

Figure 2: Here is the resulting Game UI.
Figure 2: Here is the resulting Game UI.

For the functionality of the game, you could add a simple button handler for the “Roll” button that iterates over the unfrozen dice list (filtered by LINQ, of course) and rolls each dice using some randomizing.

void RollButton_Click(object sender,
                      RoutedEventArgs e)
{
var unfrozenDice =
       from dice in App.ViewModel.Items
where !dice.Frozen
select dice;

foreach (var dice in unfrozenDice)
  Die.RollDice(dice,
       App.ViewModel.SideCount);
}

At this point, you've set up the core of the sample application. Now, you need to get started making this application asynchronous and adding more complex processing.

Turning Enumerables into Observables

The core of Rx relies on the IObservable interface and a series of extension methods on the interface. For Windows Phone, these libraries ship in the System.Observable and Microsoft.Phone.Reactive namespaces. To begin working with Rx, add a reference to these libraries as shown in Figure 3. While you're adding references, go ahead and add one for Microsoft.Devices.Sensors as you'll use the Accelerometer sensor in it in a bit. For Windows Phone 8, these references are already included as part of the “.NET for Windows Phone” default references.

Figure 3: Add the Reactive references.
Figure 3: Add the Reactive references.

With the references in place, you can start to migrate the code to start using Rx. To begin, you need to import the reactive namespace into the MainPage's code behind. Add a using clause for Microsoft.Phone.Reactive at the top of the file.

using Microsoft.Phone.Reactive; 

Now you can start using Rx. To begin, change the source of the data from IEnumerable into IObservable. Rx provides numerous ways to create Observables. In the case of known lists of values, you can use the ToObservable extension method.

var unfrozenDice = 
   from dice in App.ViewModel.Items
      .ToObservable()

ToObservable enables the values in the list to be pushed through the query pipeline rather than relying on an iteration process to pull them through the LINQ iterators. This push action creates a non-blocking asynchronous pipeline.

You also need to replace the foreach implementation by subscribing to the new IObservable with an IObserver. Although you could create separate classes for each IObserver and implement the OnNext, OnCompleted, and OnError methods, you'll rely on an easier alternative–passing a lambda expression into the Subscribe method with the implementation code. This next snippet shows how you can use ToObservable and Subscribe.

var unfrozenDice = 
   from dice in App.ViewModel.Items
                        .ToObservable()
   where !dice.Frozen
   select dice;

   unfrozenDice.Subscribe( dice =>
        Die.RollDice(dice,
        App.ViewModel.SideCount));

If you run the code at this point, you may notice that the value in each of your dice changes in a random order rather than always from the top down. Because you have now made the request asynchronous, you can't rely on the order of the results to be the same as the starting list. Each result appears when it completes the evaluation.

Creating Observables from Events

Rx provides a number of ways to create observable collections. So far, you've created them from a list of known values that start working as soon as you subscribe to them, called cold observables. Hot observables, on the other hand, happen whether you subscribe to them or not.

An event represents one example of a hot observable. Events will fire regardless of whether anyone is listening to them. The query pipeline will only be used once you start subscribing to them.

A list of known values that start working as soon as you subscribe to them are called Cold observables. Hot observables, on the other hand, happen whether you subscribe to them or not.

Let's modify the code a bit to change the RollButton click handler into another source of observable values. In this case, you'll use the Observable.FromEvent method to generate the observable.

var buttonObservable =
   Observable.FromEvent<RoutedEventArgs>
      (RollButton, "Click");

You can now add the buttonObservable object as an additional data source and use SelectMany (or “from ... from”) to stream the button click observables into the dice observable.

from buttonClick in buttonObservable
from dice in App.ViewModel.Items.ToObservable()
where !dice.Frozen
select dice;

Rather than wiring this up on each button click, move all of your code to the OnNavigatedTo and remove the ButtonClick Handler because you only need to attach the observable when you start the page.

Cleaning Up Resources

When working with observables, you need to be careful not only when you set up a request, but, perhaps more importantly, when you tear down a long-running processes. With the Windows Phone 7 page, you have now setup a query pipeline and subscription in the OnNavigatedTo method. But how do you free up these resources when your user navigates away from the page or app?

When you subscribe to the IObservable, the subscription returns an IDisposable object. When you dispose of this object, the rest of the pipeline cleans up after itself. Thus, you'll add a form-level variable of type IDisposable called _diceObserver. In the OnNaviatingFrom method, you'll dispose of it. Then, you assign the _diceObserver to the result of the query.Subscribe, as shown in Listing 2.

Listing 2: Set-up and Tear-down

private IDisposable _diceObserver;

protected override void OnNavigatedTo
      (System.Windows.Navigation.NavigationEventArgs e)
{
   SetupObservable();
}
protected override void OnNavigatingFrom
      (System.Windows.Navigation.NavigatingCancelEventArgs e)
{
   if (_diceObserver != null)
   {
       _diceObserver.Dispose();
       _diceObserver = null;
   }
}

private void SetupObservable()
{
   var buttonObservable =
      Observable.FromEvent<RoutedEventArgs>
         (RollButton, "Click");

   var query = 
      from buttonClick in buttonObservable
      from dice in App.ViewModel.Items.ToObservable()
      where !dice.Frozen
      select dice;

   _diceObserver = query.Subscribe(dice =>
       Randomizer.RollDice(dice, App.ViewModel.SideCount));
}

Detecting Accelerometer Gestures

The ease with which you can compose relatively complex operations proves to be one of reasons that the Reactive Extensions shipped originally with the Silverlight Toolkit and subsequently natively on the Windows phone. For the dice-rolling game, you might want to take advantage of new user interface opportunities presented in the phone itself. For example, why would you be satisfied pushing a button on the screen, when a more natural action might be to physically shake the dice container– the phone itself?

Using the Accelerometer sensor provided in the Microsoft.Devices.Sensors library that you imported earlier, you can create an observable based on the ReadingChanged event. The EventArgs for this event provide an indication of how much the user moves the device along the X, Y, and Z axes. In order to differentiate between an actual shake and other more subtle movements, you need to test to see if the change in these arguments exceeds some tolerance level using the formula:

X * X + Y * Y > 1.44

If you simply filter the event by this formula, you can detect if someone has aggressively moved the phone. However, this doesn't necessarily indicate that they have shaken it. To detect a shake (rather than a drop, for instance), you need to determine if two (or more) of these movements occur within quick succession. If you were to do this manually, you would need to maintain a list of detected motions along with the times that each of these occurred and then compare the times to make sure they happen within a small enough time period.

Conveniently, Rx provides a number of time-related methods, including Timestamp, which can attach the time that each observable occurs to the observable itself. Using Timestamp requires you to maintain the state of the last observed value along with the current one to compare the two values.

Once again, Rx comes to rescue you from complex state management with a TimeInterval method. TimeInterval exposes the amount of time between consecutive observables and the last observable itself. With these methods, you can implement the shake detection, as shown in Listing 3.

Listing 3: Detecting a Shake Gesture

const double MinimumOffset = 1.44;
const double MinTime = 200;
public static IObservable<
    IEvent<nAccelerometerReadingEventArgs>>
    ShakeObs(Accelerometer accel)
{
   var readingChangedObservable = Observable
      .FromEvent<AccelerometerReadingEventArgs>
          (accel, "ReadingChanged");

   var query =  
      from interval in 
          (from start in readingChangedObservable
           where (start.EventArgs.X * start.EventArgs.X + 
                  start.EventArgs.Y * start.EventArgs.Y) >
                  MinimumOffset
           select start).TimeInterval()
       where interval.Interval.TotalMilliseconds < MinTime
       select interval.Value;

    return query;
}

Returning to the main routine, you now need a way to combine the button click observables with the new shake observer. You've already seen how you can pipe multiple observables together using “from ... from” or, if you prefer, Lambda syntax using SelectMany.

Rx provides a variety of options to combine Observables, including SelectMany, Merge, Zip, Amb, Join, TakeUntil, and Join.

In this case, you want to re-roll the dice when the user clicks the Roll button or if they shake the phone. As a result, you'll use the Merge command to combine both observable collections into a new observable value whenever either occurs. you do need to take care to force both observables to return the same type in order to use Merge. Because both events generate differing event arguments, you need to cast them down to the base EventArgs class in the merge as shown in Listing 4.

Listing 4: Merging Observables

var accel = new Accelerometer();
var rollDetected = 
   (from buttonClick in buttonObservable
    select (EventArgs)buttonClick.EventArgs)
   .Merge(from shaken in GestureObservables.ShakeObs(accel)
          select (EventArgs)shaken.EventArgs);

   var query = 
      (from evt in rollDetected
       from dice in App.ViewModel.Items.ToObservable().
       where !dice.Frozen
       select dice)
      .ObserveOnDispatcher();

Managing Threads

Looking at the end of Listing 4, you'll notice yet another subtle enhancement of the query: ObserveOnDispatcher. When you consumed the accelerometer in the ShakeObserver, I neglected to mention that the Accelerometer device actually works on a background thread. When you merged this with the button observable, the results returned merged on the background thread as well. Because .NET only allows accessing the UI from the main thread, you need to somehow return to the dispatcher thread when you display the new roll values.

Rx uses Schedulers to perform units of work and allows you to specify when and where you want the work to be performed. You can then pass any IScheduler implementation, including a new thread from the ThreadPool, into one of the following methods that takes one of these Schedulers.

  • Start initiates observing on that schedule.
  • SubscribeOn causes subsequent work to happen on the specified Scheduler when you subscribe
  • ObserveOn brings execution back to the specified Scheduler.

In addition, Rx provides specialized implementations for observing and subscribing to transition back to the dispatcher. In this case, you want to the Subscriber to observe on the dispatcher thread, so you use ObserveOnDispatcher.

Performing Service Requests

Quite often, when working with Silverlight or the phone, you need to use services to get information from outside sources, or to hide the intellectual property of the implementations. For example, if you wanted to load the dice and not let prying users know (not that I condone cheating) you would roll the dice behind a service tier.

The Windows Phone platform performs all service requests asynchronously. As a result, they are a natural fit for Rx when composing complex interactions. Although you could use the Observable.FromEvent to listen for the ServiceCompleted event, you would lose track of which dice you were rolling as each service request returns. Instead, you'll create the observable service request using the Observable.FromAsyncPattern.

var svcObserver =
  Observable.FromAsyncPattern<int, int>
    (proxy.BeginRollDice, proxy.EndRollDice);

In FromAsyncPattern, you indicate the type(s) that you pass in as parameters to the method and the type that will be returned as the last type in the generic definition. You then pass pointers to the BeginRollDice invoker and EndRollDice callback methods from the service.

When you generate a proxy from Web Services, Visual Studio doesn't expose the begin/end method pairs in Silverlight natively. So, how do you access these private methods? Visual Studio generates an interface alternative that includes these methods. If you cast the proxy to the interface, you can then use the begin/end pair in the FromAsyncPattern implementation.

Listing 5 demonstrates how you can declare the service proxy and the svcObserver using FromAsyncPattern. In addition, you modify the query by calling the Invoke method passing in the number of sides for the dice. Invoke returns an IObservable<int> with the results of the service request that you can use later as another data source.

Listing 5: Invoke Services with FromAsyncPattern

var buttonObservable = Observable.FromEvent<RoutedEventArgs>
   (RollButton, "Click");
var proxy = new RandomizerService.RandomizerClient() 
   as Dice.RandomizerService.IRandomizer;
var svcObserver = 
   Observable.FromAsyncPattern<int, int>
      (proxy.BeginRollDice, proxy.EndRollDice);

var rollDetected =
   (from buttonClick in buttonObservable
    select (EventArgs)buttonClick.EventArgs)
   .Merge(from shaken in ShakeObserver.GetObserver(accel)
          select (EventArgs)shaken.EventArgs);

var query =
   (from evt in rollDetected
    from dice in App.ViewModel.Items.ToObservable()
    where !dice.Frozen
    from result in svcObserver
       .Invoke(App.ViewModel.SideCount)
    select new { dice, svcResult = result })
   .ObserveOnDispatcher();

_diceObserver = query.Subscribe(val => 
   val.dice.DotCount = val.svcResult);

Finishing off the changes, you now need to assign the returned value from the service to the actual rolled dot count. You'll perform this in the lambda expression that you pass into the Subscribe method. Remember: These results may not return in the same order that you requested them. Rx takes care of synchronizing the objects that made the request and the corresponding return values.

Throttling Requests

When working with external resources, you want to prevent issuing unnecessary requests. Rx offers several options to limit the requests. For instance, if you have sent a request but not received a response and want to send another request without processing the results of that request, you can enhance the query using the TakeUntil method as follows:

from result in svcObserver
   .Invoke(App.ViewModel.SideCount)
   .TakeUntil(rollDetected)

Additionally, if the user aggressively pushes the roll button too many times, or shakes the phone excessively, you can Throttle the request for a specified TimeSpan. Once that time span passes, you'll call OnNext as long as the user didn't request another roll. To throttle for � second, add one line to the rollDetected definition:

(from buttonClick in buttonObservable
 select (EventArgs)buttonClick.EventArgs)
.Merge(
 from shaken in ShakeObserver.GetObserver(accel)
 select (EventArgs)shaken.EventArgs)
.Throttle(TimeSpan.FromMilliseconds(500));

Conclusion

If you need simple event handlers and service requests, the techniques mentioned here may be overkill. However as your application needs to compose increasingly complex asynchronous operations, consider taking advantage of the power offered by the Reactive Extensions that ship with the Windows Phone.

If you want to use Rx with .NET, Silverlight, XNA, or JavaScript, run over to the Rx development center on MSDN. From the dev center, you can access the downloadable bits along with instructional videos, documents, and blog posts. Also, you can visit the Rx Forums to ask your questions or make suggestions to the team. I also recommend watching the various videos on Channel 9 (http://channel9.msdn.com/Tags/rx/) where the team discusses how and why Rx works.

I've only touched the surface of the capabilities of Rx. I hope that you have fun exploring new ways of joining the prescribed Rx methods to create increasingly powerful cures for what ails your phone applications.