Tag Archives: reactive extensions

Batching Data by Time or Count Using Reactive Extensions (Rx)

Motivation

The time-series database InfluxDB provides an HTTP API to write data. Data points (measurements) are inserted via the line protocol, which allows batching of data by writing multiple points in one HTTP request.
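As a rough illustration of what such a batch looks like, here is a minimal sketch that joins already formatted points into one request body, one point per line. The function name make_batch_body and the sample points are hypothetical and not part of any InfluxDB client library:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Joins already formatted line protocol entries into one request body,
// one point per line, separated by a newline character.
// The function name is hypothetical and not part of any InfluxDB client.
std::string make_batch_body(const std::vector<std::string>& lines) {
    std::string body;
    for (std::size_t i = 0; i < lines.size(); ++i) {
        // precondition: every entry is a single line
        assert(lines[i].find('\n') == std::string::npos);
        if (i > 0) body += '\n';
        body += lines[i];
    }
    return body;
}
```

Sending the returned string in a single HTTP request is what turns several measurements into one batch.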

While experimenting with a simple InfluxDB C++ client, I wanted to create an asynchronous fire-and-forget API, so that the data points can be sent over HTTP without blocking the instrumented C++ code. Several “readymade” options to implement concurrency in this scenario are available.

A simple PAIR of ZeroMQ sockets would do the job, but I’d have to implement batching separately. Thus, I turned my attention to a higher-level abstraction: Rx.

Rx Window Operator

Quickly looking through the cross-language Reactive Extensions site, I found the right operator: Window.

Luckily, this operator has been implemented in RxCpp, so I proceeded with the experiment.

Batching Design

Batching using Rx Window Operator

Rx Window Operator (CC BY 3.0 reactivex.io)1

The window operator takes an observable sequence of data and splits it into windows (batches) of observables. To batch requests, each window is aggregated into a single value once its last element has arrived (via other aggregating Rx operators).
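The splitting rule can be sketched without any Rx library by working on values that carry their arrival time. This is only an illustration under stated assumptions: timed_value and window_by_time_or_count are hypothetical names, the first window is assumed to open at time 0, and the time span is assumed to restart when a window is closed by count. It is not how RxCpp implements the operator:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct timed_value {
    long long at_ms;  // arrival time in milliseconds since the start
    int value;
};

// Splits a time-ordered sequence into windows. A window is closed when it
// holds max_count values or when its time span of period_ms has elapsed.
std::vector<std::vector<int>> window_by_time_or_count(
    const std::vector<timed_value>& input,
    long long period_ms,
    std::size_t max_count)
{
    assert(period_ms > 0 && max_count > 0);

    std::vector<std::vector<int>> windows(1);  // the first window opens at time 0
    long long window_start = 0;

    for (const timed_value& tv : input) {
        // close every window (possibly an empty one) whose time span has elapsed
        while (tv.at_ms >= window_start + period_ms) {
            windows.emplace_back();
            window_start += period_ms;
        }

        windows.back().push_back(tv.value);

        // close the window by count; assumption: the time span restarts here
        if (windows.back().size() == max_count) {
            windows.emplace_back();
            window_start = tv.at_ms;
        }
    }
    return windows;
}
```

Note that the last window can be empty when the values fit exactly into the preceding windows, so any aggregation of a window has to cope with an empty one.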

A Toy Problem

To validate the approach, the following problem is set:

Given a stream of integers, append the integers into a series of strings, either every second, or every N integers

String appends with integer-to-string conversions in C++ will be done via the {fmt} library.

Batching in One Line of Code

A stream of numbers batched either by time or count:

auto values = rxcpp::observable<>::range(1, 1000'000)
    .window_with_time_or_count(std::chrono::seconds(1), 100'000);

Note that there is an almost one-to-one translation into a C# version:

var values = Observable.Range(1, 1000000)
               .Window(TimeSpan.FromSeconds(1), 100000);

This indicates the power of the Rx abstraction across languages. The Rx website organizes its documentation in a way that makes it easy to translate Rx code from one language to another.

Aggregating the Batches

In order to do something useful with the batched data, the Scan operator is used to gather the data in a string buffer, and after the last value has been received, the string buffer is assembled into a string and processed:

values.subscribe(
    [](rxcpp::observable<int> window) {
        // append the number to the buffer
        window.scan(
            std::make_shared<fmt::MemoryWriter>(),
            [](std::shared_ptr<fmt::MemoryWriter> const& w, int v)
        {
            *w << v;
            return w;
        })

        // what if the window is empty? Provide at least one empty value
        .start_with(std::make_shared<fmt::MemoryWriter>())

        // take the last value
        .last()

        // print something fancy
        .subscribe([](std::shared_ptr<fmt::MemoryWriter> const& w) {
            fmt::print(
                "Len: {} ({}...)\n",
                w->size(),
                w->str().substr(0, 42)
            );
        });            
    }
);

The Tale of Two Bugs

In the initial (non-TDD) spike, the batching seemed to work. However, something caught my attention (the code bites back):

[window 0] Create window
Count in window: 170306
Len: 910731 (123456789101112131415161718192021222324252...)

The window wasn’t capped at 100’000. This could have been either a misunderstanding or a bug, thus I formulated a hesitant issue #277. As it turned out, it indeed was a bug, which was then fixed in no time. However, the first bug had hidden another one: the spike implementation started to crash at the end. When all the windows were capped by count, and not by time, the last window was empty, as all values fit exactly into 10 batches.

The Last operator rightly caused an exception due to an empty sequence. Obviously, there’s no last value in an empty sequence. Rubber Ducking and a hint from Kirk Shoop fixed the issue by utilizing the StartWith operator to guarantee that the sequence is never empty. An empty string buffer is easier to ignore downstream.
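The interplay of the two operators can be mimicked on plain vectors. The following is a minimal sketch with hypothetical helper names (last_of, start_with, should_send); it only mirrors the behavior described here and does not use RxCpp:

```cpp
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Asking for the last element of an empty sequence is an error.
std::string last_of(const std::vector<std::string>& seq) {
    if (seq.empty()) throw std::out_of_range("last_of() called on an empty sequence");
    return seq.back();
}

// Prepends a seed value, so the resulting sequence is never empty.
std::vector<std::string> start_with(std::string seed, std::vector<std::string> seq) {
    seq.insert(seq.begin(), std::move(seed));
    return seq;
}

// An empty batch means "nothing to send" and can be skipped downstream.
bool should_send(const std::string& batch) {
    return !batch.empty();
}
```

With an empty seed, last_of(start_with("", window)) yields an empty string for an empty window instead of throwing, and should_send filters it out.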

Active Object

The active object pattern was applied to implement a fire-and-forget asynchronous API. An Rx Subject bridges between the function call and the “control-inverted” observable:

struct async_api {
    //...
    rxcpp::subjects::subject<line> subj;
    //...

    async_api(...)
    {
        auto incoming_requests = subj
            .get_observable()
            .map([](auto line) {
                return line.to_string();
            });

        incoming_requests
            .window_with_time_or_count(
                window_max_ms,
                window_max_lines,
                // schedule window caps on a new thread
                rxcpp::synchronize_new_thread()
            )
            .subscribe(...)
        ;
    }

    // fire-and-forget
    void insert(line const& line)
    {
        subj
            .get_subscriber()
            .on_next(line);
    }
};

In order not to block the caller (which would be the default behavior), the observable watches the values from each window on a new thread. Here, scheduling on a thread pool (currently missing in RxCpp) would probably be beneficial.

While this implementation might not be an optimal one, the declarative nature of Rx, once the basics are understood, allows one to “make it work and make it right” pretty quickly by composing the right operators.

Code

The runnable code of the example can be found on GitHub: C++ version.

In order to show how similar the high-level code can be between different languages, I’ve “ported” the example to C# 2.

  1. Source: reactivex.io License: (CC BY 3.0)
  2. The C# version appears to run faster on my Windows machine while solving the same toy problem.

Why wait forever for the tests? Fast tests of slow software.

Time is volatile

Imagine writing a cron-like functionality that should produce some side-effect, such as cleanup. The intervals between such actions might be quite long. How does one test that? One can surely reason about the software, but given a certain complexity, tests should be written, proving that certain important scenarios work as intended.

It’s common that software depends on time flow as dictated by the physical time flow, reflected via some clock provider. However, setting the clock a year ahead won’t make the CPU work faster and perform all the computations it should have performed within that year. A clock is also a volatile component that can be manipulated, thus if time is an issue, it’s probably a good idea not to depend on it directly, following the Stable Dependencies Principle and the Dependency Inversion Principle.

Luckily, there is an abstraction for time, at least in Reactive Extensions (Rx), which is the Scheduler.
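To illustrate the idea outside of any Rx library, here is a minimal, single-threaded C++ sketch of a virtual-time scheduler. The class manual_scheduler and its member functions are hypothetical names chosen for this illustration; they are not the API of RxJava, RxGroovy or RxCpp. Time only moves when the test says so:

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <utility>

// A minimal virtual-time scheduler: the clock only moves in advance_by.
class manual_scheduler {
public:
    using action = std::function<void()>;

    long long now() const { return now_ms_; }

    // Registers an action that becomes due at now() + delay_ms.
    void schedule_after(long long delay_ms, action a) {
        assert(delay_ms >= 0);
        pending_.emplace(now_ms_ + delay_ms, std::move(a));
    }

    // Moves the virtual clock forward and runs every action that became due,
    // ordered by due time.
    void advance_by(long long delta_ms) {
        assert(delta_ms >= 0);
        const long long target = now_ms_ + delta_ms;
        while (!pending_.empty() && pending_.begin()->first <= target) {
            auto it = pending_.begin();
            now_ms_ = it->first;  // the action observes its own due time
            action a = std::move(it->second);
            pending_.erase(it);
            a();  // may schedule further actions
        }
        now_ms_ = target;
    }

private:
    long long now_ms_ = 0;
    std::multimap<long long, action> pending_;
};
```

Code that takes such a scheduler as a parameter instead of reading the system clock can be driven through a hundred days in a fraction of a second: schedule an action with a delay of a hundred days, call advance_by with the same amount, and the action has run.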

Slow non-tests

Here’s a slow Groovy non-test, waiting for some output on the console using RxGroovy:

import rx.*
import java.util.concurrent.TimeUnit

def observable = Observable
	.just(1)
	.delay(5, TimeUnit.SECONDS)

observable.subscribe { println 'ah, OK, done! Or not?' }

Observable
	.interval(1,TimeUnit.SECONDS)
	.subscribe { println 'still waiting...' }

println 'starting to wait for the test to complete ...'

observable.toBlocking().last()

Running it produces the following slow-ticking output:

oldnontest 1

Interpreting such tests without color can be somewhat challenging 2.

Fast tests

Now let’s test something ridiculous, such as waiting for a hundred days using Spock. Luckily, RxJava & RxGroovy also implement the test scheduler, thus enabling fast tests using virtual time:

import spock.lang.Specification

import rx.Observable
import rx.schedulers.TestScheduler
import java.util.concurrent.TimeUnit


class DontWaitForever extends Specification {
    def "why wait?"() {
        setup:
            def scheduler = new TestScheduler()

            // system under test: will tick once after a hundred days
            def observable = Observable
                .just(1)
                .delay(100, TimeUnit.DAYS, scheduler)
            def done = false

        when:
            observable.subscribe {
                done = true
            }

            // still in the initial state; an explicit assert is used, since
            // plain expressions in a when: block are not checked by Spock
            assert !done

        and:
            scheduler.advanceTimeBy 100, TimeUnit.DAYS

        then:
            done == true
    }
}

fasttest 3

Just checking: advancing the time by 99 days results in a failure:

just_checking

Delightful, groovy colors!

Source

github.com/d-led/dont_wait_forever_for_the_tests

  1. Captured with the wonderful pragmatic tool LICEcap by the Reaper developers
  2. Here, the ‘still waiting’ subscription is terminated after the first subscription ends. Try exchanging the order of the subscribe calls.
  3. Building using Gradle

Deterministic Testing of Concurrent Behavior in RxCpp

A Retrospective

After getting inspired by The Reactive Manifesto, it is hard not to get excited about Reactive Extensions. Such excitement has led to a series of hello-world articles and some code examples. While Reactive Extensions take over the programming world in C#, Java and JavaScript, it seems the world of C++ is slow to adopt RxCpp.

The new ReactiveX Tutorial link list is a great place to start learning and grokking. This article is an attempt to bring RxCpp closer to C++ developers who might not yet see how a reactive programming model might help writing better, more robust code.

Testing concurrency with RxCpp

A previous article showed how to test ViewModels in C# by parameterizing the ViewModels with a scheduler. In a UI setting, the scheduler usually involves some kind of synchronization with the GUI thread. Testing keystrokes arriving at a certain speed would require some effort to simulate events, probably leading to brittle tests. With the scheduler abstraction, the concurrent behavior of a component is decoupled from physical time, and thus can be tested repeatedly and very fast. This was the C# test:

(new TestScheduler()).With(scheduler =>
{
    var ticker = new BackgroundTicker(scheduler);

    int count = 0;
    ticker.Ticker.Subscribe(_ => count++);
    count.Should().Be(0);

    // full control of the time without waiting for 1 second
    scheduler.AdvanceByMs(1000);
    count.Should().Be(1);
});

Show Me The Code

Without further ado, the C++ version is not very far from the C# version. In a simple test, we can parameterize a sequence of integer values arriving at specified intervals (a ticker) with a coordination (for why it is a coordination and not a scheduler, see the RxCpp developer manual):

auto seq = rxcpp::observable<>::interval(
            std::chrono::milliseconds(1),
            some_scheduler
);

The deterministic test scheduler API is currently available through a worker created on the test scheduler:

auto sc = rxcpp::schedulers::make_test();
auto worker = sc.create_worker();
auto test = rxcpp::identity_same_worker(worker);

The rest should read like English:

int count = 0;

WHEN("one subscribes to an observable sequence on the scheduler") {
  auto seq = rxcpp::observable<>::interval(
              std::chrono::milliseconds(1),
              test // on the test scheduler
             ).filter([](int i) { return i % 2; });

  seq.subscribe([&count](int){
    count++;
  });

  THEN("the sequence is not run at first") {
    worker.sleep(2 /* ms */);

    CHECK(count == 0);

    AND_WHEN("the test scheduler is advanced manually") {

      THEN("the sequence is run as expected") {
        worker.advance_by(8 /* ms */);
        CHECK(count == 5);
      }
    }
  }
}

The full test can be seen @github, and is built on Travis CI

RxCpp 2

RxCpp 2 and API

The last article on RxCpp was based on a now obsolete version of the library. The key contributor to the library, Kirk Shoop, has kindly provided a rewrite based on the newer, 2.0 API of the library: see the pull request, upon which this article is based.

Since the first article, the project has been enriched with somewhat more readable GIVEN/WHEN/THEN-style tests using Catch 1.

Still Ticking: Scheduler and Coordination in RxCpp 2

The previous articles give examples of managing periodic events, such as ticker ticks and measurements in C++. The following example creates an event loop that will be used for coordinated output of various events to the console:

auto scheduler = rxcpp::schedulers::make_same_worker(
    rxcpp::schedulers::make_event_loop().create_worker()
);

auto coordination = rxcpp::identity_one_worker(scheduler);

One such sequence of events is some kind of measurement 2:

auto measure = rxcpp::observable<>::interval(
        // when to start
        scheduler.now() + std::chrono::milliseconds(250),
        // measurement frequency
        std::chrono::milliseconds(250),
        coordination)
    // take Hz values instead of a counter
    .map([&FM](int) { return FM.Hz(); });

auto measure_subscription = measure
    .subscribe([](int val) {
        std::cout << val << std::endl;
    });

Why didn’t it tick?

If this code were the end of the main program, there wouldn’t be any observable ticks, as all the objects would be destroyed before the first scheduled event. To see the code in action, we shall wait for some condition that will change when we’re done. This step is not necessary if there’s a GUI toolkit event loop that keeps objects alive, but it has to be simulated for a console example.

To demonstrate the subscription change and to wait for some time, we’ll wait for an atomic counter, initialized to two, to reach zero:

std::atomic<long> pending(2);

...

// after all subscriptions defined
while (pending) {
    sleep(1000); // wait for ticker and measure to finish
}

Tick and Stop

The other ticker will have another period, will only tick 10 times, and then decrement the pending counter:

auto ticker = rxcpp::observable<>::interval(
    scheduler.now() + std::chrono::milliseconds(500),
    std::chrono::milliseconds(500),
    coordination);

ticker
    .take(10)
    .subscribe([](int val) {
        std::cout << "tick " << val << std::endl;
    },[&](){
        --pending; // take completed the ticker
    });

Now we can schedule the termination of the measurement subscription (which also decrements pending) partway through the 10-tick run, two seconds after the start. This scheduling is done on the same scheduler that is running all the subscriptions:

scheduler.create_worker().schedule(scheduler.now() + std::chrono::seconds(2), 
    [&](const rxcpp::schedulers::schedulable&) {
        std::cout << "Canceling measurement ..." << std::endl;
        measure_subscription.unsubscribe(); // cancel measurement
        --pending; // signal measurement canceled
    });

The result:

63
tick 1
63
61
tick 2
63
61
tick 3
63
62
Canceling measurement ...
tick 4
tick 5
tick 6
tick 7
tick 8
tick 9
tick 10

Thanks, Kirk & other library contributors!

Code @ github

Next: deterministic testing of concurrent behavior

  1. i.e. create.cpp
  2. Observe the convergence of the API towards the C# version.

No Events: ReactiveUI Windows Forms MVVM-Style

Review

designed using http://viperneo.github.io/winforms-modernui/

This is the next post in the series, looking first at Reactive Extensions (Rx) to simplify writing Windows Forms UI logic, then using a viewmodel shared between a WPF GUI implementation and a rewritten WinForms version using ReactiveUI, stopping at a short article on testing the viewmodels.

passedtest

ReactiveUI News

The ReactiveUI API has been quite volatile for the last year, and this series is in need of an update 1. A CodeProject author gardner9032 published a nice teaser article, showing the ReactiveUI mechanism for writing simplified Viewmodel-View bindings 2, which serves as the primary trigger for this post.

There’s plenty of news and updated articles on Paul Betts’ log, providing a good resource for updates on the API. Phil Haack’s blog is also a superb resource for explanations and commentaries on the use of ReactiveUI in real-world applications.

The ReactiveUI project is quite active, as the community seems to have grokked the gist of it, while the list of supported platforms has become more than convincing.

Getting rid of events

The enabling feature of ReactiveUI is writing declarative UI glue code, and if the viewmodels are based on Reactive Extensions, then declarative C# style can be used throughout the project. The previous ReactiveUI Windows Forms examples converted an event sequence into an observable sequence of values. In this example, that will be accomplished conveniently by the ReactiveUI WinForms library. The viewmodels also contained some imperative code. For this article, the viewmodels will not be reused from the previous articles, but written from scratch.

ViewModel

See the code.

The viewmodel’s task is the same: something is ticking in the background, while words are counted in the input text asynchronously, and the calculation is throttled to 0.5 seconds. The viewmodel boilerplate is simplified using ReactiveUI.ReactiveObject.
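The throttling rule itself (forward a value only once the input has been quiet for a while) can be sketched independently of ReactiveUI. The following C++ illustration works on recorded input changes with timestamps; text_event and throttle are hypothetical names, and the last recorded change is assumed to be followed by a long enough pause:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

struct text_event {
    long long at_ms;   // time of the change in milliseconds
    std::string text;  // content of the input box after the change
};

// Returns the texts that survive throttling: a change is forwarded only if
// no further change arrives within quiet_ms after it.
std::vector<std::string> throttle(const std::vector<text_event>& events, long long quiet_ms) {
    assert(quiet_ms >= 0);
    std::vector<std::string> forwarded;
    for (std::size_t i = 0; i < events.size(); ++i) {
        // assumption: the recording ends with a pause of at least quiet_ms
        const bool is_last = (i + 1 == events.size());
        if (is_last || events[i + 1].at_ms - events[i].at_ms >= quiet_ms) {
            forwarded.push_back(events[i].text);
        }
    }
    return forwarded;
}
```

With a quiet period of 500 milliseconds, changes arriving 100 milliseconds apart are swallowed, and only the text before a longer pause reaches the word counter.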

Output (read-only) properties

The ReactiveUI-way of creating output properties is through ObservableAsPropertyHelper.

private readonly ObservableAsPropertyHelper<string> backgroundTicker;
public string BackgroundTicker
{
    get
    {
        return backgroundTicker.Value;
    }
    //...
}

The constructor of the viewmodel receives an IScheduler for scheduling on the correct thread, and an IObservable<string>, which will be a stream of input from the view. Observe the ToProperty helper:

public MyViewModel(IScheduler scheduler, IObservable<string> input)
{
    Observable.Interval(TimeSpan.FromSeconds(1))
        .ObserveOn(scheduler)
        .Select(_ => DateTime.Now.ToLongTimeString())
        .ToProperty(this, x => x.BackgroundTicker, out backgroundTicker);
    //...
}

Word counting logic is implemented in a similar fashion by transforming the incoming stream of strings.
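The transformation applied to each incoming string is small. As a language-neutral illustration, here is a C++ sketch of a whitespace-based word count; count_words is a hypothetical name and the splitting rule (any run of whitespace separates words) is an assumption of this sketch:

```cpp
#include <cstddef>
#include <sstream>
#include <string>

// Counts the words in a text, where words are separated by any whitespace.
std::size_t count_words(const std::string& text) {
    std::istringstream in(text);
    std::string word;
    std::size_t count = 0;
    while (in >> word) {  // operator>> skips leading whitespace
        ++count;
    }
    return count;
}
```

Mapping such a function over the stream of input strings yields the stream of word counts that the output property exposes.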

View

See the code.

To remove yet more boilerplate code, the WinForms Form specialization implements the ReactiveUI.IViewFor interface. This allows for greatly simplified bindings that are checked at compile time and at run time, avoiding strings for property names. The implementation is straightforward, and pays off once the views become more complex than this example:

IViewFor

public MyViewModel VM { get; private set; }

object IViewFor.ViewModel
{
    get { return VM; }
    set { VM = (MyViewModel)value; }
}

MyViewModel IViewFor<MyViewModel>.ViewModel
{
    get { return VM; }
    set { VM = value; }
}

Bindings

None of the controls in the designed WinForm have wired events or bindings set from the designer. The glue code is reduced to instantiating the viewmodel …

VM = new MyViewModel(
    new System.Reactive.Concurrency.ControlScheduler(this),
    this.WhenAnyValue(x => x.inputBox.Text)
);

… and declaring the bindings 3 4.

this.Bind(VM, x => x.BackgroundTicker, x => x.tickerBox.Text);
this.Bind(VM, x => x.WordCount, x => x.wordCountBox.Text);

Source

@github

  1. See ReactiveUI Design Guidelines
  2. see article @CodeProject
  3. The ReactiveUI WinForms implementation seems not to support fully read-only fields using default bindings yet, hence an empty setter in the viewmodel
  4. The scheduler is from Windows Forms helpers

ReactiveUI 6 and ViewModel Testing

Testability and ReactiveUI

ReactiveUI XAML example

In the previous articles about ReactiveUI I’ve claimed without bringing any evidence that writing ViewModels using ReactiveObjects brings about testability. While the aspects of testing Rx and ReactiveUI have been treated at length in the respective authors’ blogs linked herein, this post is intended as a quick glance for the impatient online surfer at the hello-world testing code, which has been written “post-mortem” 1 as a follow-up to the previous articles.

An update to ReactiveUI 6

Paul Betts and contributors have been busy simplifying and extending the library 2. There are some extension methods now that help creating observables from properties, and transforming observables to properties. In the example ViewModel from previous articles, there’s an observable stream of strings that is simply transformed into a property defined as follows:

ObservableAsPropertyHelper<string> _BackgroundTicker;
public string BackgroundTicker
{
    get { return _BackgroundTicker.Value; }
}

In the constructor, the helper is now initialized without using strings:

public WordCounterModel(IObservable<string> someBackgroundTicker)
{
someBackgroundTicker
    .ToProperty(this, x => x.BackgroundTicker, out _BackgroundTicker);
...
}

instead of the error-prone, string-based version:

_BackgroundTicker = new ObservableAsPropertyHelper<string>(
	someBackgroundTicker, _ => raisePropertyChanged("BackgroundTicker")
);

For the actual changes in ReactiveUI, consult Paul Betts’ insightful log.

A simple test

Since the tests have been written after writing the example code, I’ve been searching for the “Generate Unit Test” context menu in Visual Studio 2013. The context menu is not there, but luckily some enthusiasts recreated the functionality partly: → Unit Test Generator.

After the initial set-up and

failedtest

here’s the simple test of the word count property:

[TestMethod]
public void WordCounterModelTest()
{
    var mock = new Mock<IObservable<string>>();
    var vm = new WordCounterModel(mock.Object);

    vm.WordCount.Should().Be(0);

    vm.TextInput = "bla!";
    vm.WordCount.Should().Be(1);

    vm.TextInput = "bla, bla!!";
    vm.WordCount.Should().Be(2);
}

In it, one can observe the use of Moq for mocking a dummy and FluentAssertions for beautifully readable Spec/BDD-style assertions 3.

So, there’s no UI involved, and the UI is dogmatically bound from XAML with almost no code-behind.

Testing time series

The hello world example program simulated a dependency on some timed series of strings, ticking every second. While this is not specific to ReactiveUI, let’s make use of the test scheduler 4. For that, the time series should optionally depend on an injected IScheduler:

public class BackgroundTicker
{
    IScheduler scheduler = Scheduler.Default;

    public BackgroundTicker(IScheduler other_scheduler = null)
    {
        if (other_scheduler != null)
            scheduler = other_scheduler;
    }

    public IObservable<string> Ticker
    {
        get
        {
            return Observable
                .Interval(TimeSpan.FromSeconds(1), scheduler)
                .Select(_ => DateTime.Now.ToLongTimeString());
        }
    }
}

The test instantiates a test scheduler, which is then advanced to make deterministic assertions. The code should speak for itself:

[TestMethod]
public void BackgroundTickerTest()
{
    (new TestScheduler()).With(scheduler =>
    {
        var ticker = new BackgroundTicker(scheduler);

        int count = 0;
        ticker.Ticker.Subscribe(_ => count++);
        count.Should().Be(0);

        scheduler.AdvanceByMs(1000);
        count.Should().Be(1);

        scheduler.AdvanceByMs(2000);
        count.Should().Be(3);
    });
}

Summary

passedtest

Code: https://github.com/d-led/reactiveexamples

Previous article: The WPF + ReactiveUI Refactored Version of the Responsive UI Hello World.

See also: the C++ version.

  1. as in, not within TDD
  2. which now also targets Xamarin and Windows Phone 8 and Windows Store Apps
  3. I originally intended to use SpecFlow, but the specs refused to flow frictionlessly
  4. see Intro to Rx

A C++ Background Ticker, now with Rx.cpp

Finally, Rx.cpp

Some time ago I wrote that I didn’t have enough patience to recreate the background ticker example in C++ using Rx.cpp. Since then the Rx.cpp project seems to have grown out of the spike phase, and even has a native NuGet package. It has also gone multiplatform (Windows, OSX and Linux): observe the green Travis-CI Button.

Update: new blog post, discussing RxCpp v2 and testing using the test scheduler.

A simple console ticker

As in .Net, Reactive Extensions provide a simple way to process streams of data asynchronously, while keeping the concurrency-related code declarative and thus readable. Here’s a simple ticker in the console which runs asynchronously to the main thread:

auto scheduler = std::make_shared<rxcpp::EventLoopScheduler>();
auto ticker = rxcpp::Interval(std::chrono::milliseconds(250), scheduler);

rxcpp::from(ticker)
	.where([](int val) { return val % 2 == 0; })
	.take(10)
	.subscribe([](int val) {
		std::cout << "tick " << val << std::endl;
	});

std::cout << "starting to tick" << std::endl;

resulting in something like:

starting to tick
tick 0
tick 2
tick 4
tick 6
tick 8
...

where the interval produces a value every 250 milliseconds, so the printed (even) ticks appear once every 500 milliseconds.
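The timing can be checked with a small simulation in virtual time. Since the filter keeps only even values, two consecutive printed ticks are two intervals apart. This is a sketch and not RxCpp code: tick and even_ticks are hypothetical names, and value i is assumed to be produced at i times the period:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct tick {
    long long at_ms;  // virtual time at which the value is printed
    int value;
};

// Simulates an interval of period_ms, filtered to even values,
// limited to the first `count` values that pass the filter.
std::vector<tick> even_ticks(long long period_ms, std::size_t count) {
    assert(period_ms > 0);
    std::vector<tick> printed;
    for (int i = 0; printed.size() < count; ++i) {
        if (i % 2 == 0) {
            // assumption: value i is produced at i * period_ms
            printed.push_back(tick{i * period_ms, i});
        }
    }
    return printed;
}
```

For a period of 250 milliseconds, the printed values are 0, 2, 4 and so on, with 500 milliseconds between them.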

Throwing away code

The PPL example was simulating polling a sensor and printing the value. It had an error-prone and buggy ad-hoc implementation of an active object, ticking at predefined intervals. This can now be happily thrown away, as Rx allows a cleaner concurrency control and testability using schedulers, and implements a timed sequence: Interval.

Preconditions

FrequencyMeter FM;
auto scheduler = std::make_shared<rxcpp::EventLoopScheduler>();

The scheduler will be used for all subscriptions.

The tickers

The first one:

auto measure = rxcpp::Interval(std::chrono::milliseconds(250),scheduler);
auto measure_subscription = rxcpp::from(measure)
	.subscribe([&FM](int val) {
		std::cout << FM.Hz() << std::endl;
	});

where measure_subscription is an rxcpp::Disposable for subscription lifetime control.

And the other one:

auto ticker = rxcpp::Interval(std::chrono::milliseconds(500), scheduler);
rxcpp::from(ticker)
	.take(10)
	.subscribe([](int val) {
		std::cout << "tick " << val << std::endl;
	});

where you can observe the LINQ-style filter take.

Managing subscriptions

In the PPL example, one could start and stop the ticker. However, in Rx.cpp this can be simply modeled by disposable subscriptions. Hence, after some kind of sleeping, the measurement can be stopped:

sleep(2000);
std::cout << "Canceling measurement ..." << std::endl;
measure_subscription.Dispose(); // cancel measurement

Resulting in similar output:

60
63
tick 0
61
62
tick 1
63
63
tick 2
62
60
tick 3
Canceling measurement ...
tick 4
tick 5
tick 6
tick 7
tick 8
tick 9

Restarting measurement can be done by creating a new subscription.

Why not simply signals/slots?

Almost quoting the Intro to Rx book, the advantages of using Rx over (at least) simple implementations of a signal/slot mechanism are:

  • Better maintainability due to readable, composable, declarative code
  • Scheduler abstraction allowing for fast, deterministic, clock-independent tests of concurrency concerns
  • Declarative concurrency through the same scheduler abstraction
  • LINQ-like composition and filtering of event streams
  • Easy subscription control via disposables
  • Completion and exception handling built into the Observer concept
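The subscription control mentioned in the list can be illustrated with a few lines of plain C++. This is a minimal, single-threaded sketch and not the Rx.cpp API: simple_subject and its members are hypothetical names, and completion and error handling are deliberately left out:

```cpp
#include <functional>
#include <map>
#include <utility>

// A tiny event source. subscribe returns a token, and passing that token
// to unsubscribe stops the delivery of values to the handler.
template <typename T>
class simple_subject {
public:
    using handler = std::function<void(const T&)>;
    using token = int;

    token subscribe(handler h) {
        const token id = next_id_++;
        handlers_.emplace(id, std::move(h));
        return id;
    }

    void unsubscribe(token id) { handlers_.erase(id); }

    void on_next(const T& value) {
        // iterate over a copy, so that handlers may unsubscribe while notified
        const auto snapshot = handlers_;
        for (const auto& entry : snapshot) {
            entry.second(value);
        }
    }

private:
    token next_id_ = 0;
    std::map<token, handler> handlers_;
};
```

Cancelling the measurement while the ticker keeps running then corresponds to calling unsubscribe with the token of the measurement subscription only.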

Code

@GitHub

Corrections, suggestions and comments are welcome!

Update 26.6.2014: There’s been a new release of Rx.cpp on NuGet, and Kirk Shoop pushed a pull request, upgrading the project and the API usage to Rx.cpp 2.0.0. There have been some changes, and there are some interesting patterns, which should be blogged about in the near future.

The WPF + ReactiveUI Refactored Version of the Responsive UI Hello World

Overview

To recapitulate, the first article in the series used Reactive Extensions (Rx) directly in Windows Forms UI logic to implement a UI with a simulated background calculation, which delivered a timestamp each second. The rest of the application was an input text box, for which, when the user inputs the text, the word count is calculated asynchronously and posted into another read-only text box.

The second article introduced a WPF version of the same UI with a viewmodel that implements the INotifyPropertyChanged interface and supplies the ticker text and the word count as bindable properties. The text being input was delivered to the viewmodel via an event converted to an observable sequence, which was used to update the TextInput property of the viewmodel. The WordCount was updated from the setter of TextInput.

The third article recreated the WPF implementation in WinForms using the same viewmodel, which is shared between the two projects. The third UI implementation introduced throttling of the TextInput changes to 0.3 seconds so that the word count is updated only when the user pauses for at least a third of a second to take a look at the result.

The Refactored WPF Version

In this article, the WPF UI is further refactored, so that the concerns are separated further. These are organized as follows:

Create a sequence of some calculation results

These are timestamps in this case, implemented as an object, providing an IObservable<string> property Ticker:

public class BackgroundTicker
{
    public IObservable<string> Ticker
    {
        get
        {
            return Observable
                .Interval(TimeSpan.FromSeconds(1))
                .Select(_ => DateTime.Now.ToLongTimeString());
        }
    }
}

The observable sequence is active, delivering a value every second.

Count words in text asynchronously and provide another property calculated in the background

This viewmodel is finally implemented using ReactiveUI:

public class WordCounterModel : ReactiveObject

The word count and the input text are ReactiveUI boilerplate:

string _TextInput;
public string TextInput
{
    get { return _TextInput; }
    set { this.RaiseAndSetIfChanged(ref _TextInput, value); }
}

ObservableAsPropertyHelper<int> _WordCount;
public int WordCount
{
    get { return _WordCount.Value; }
}

The value sequence of the background ticker is injected at construction time using its observable member. The word count and the background ticker properties are implemented using an ObservableAsPropertyHelper.
The results of the background ticker are exposed via a property:

ObservableAsPropertyHelper<string> _BackgroundTicker;
public string BackgroundTicker
{
    get { return _BackgroundTicker.Value; }
}

The helper is initialized from the constructor-supplied observable:

someBackgroundTicker
    .ToProperty(this, ticker => ticker.BackgroundTicker, out _BackgroundTicker);

And finally, the word count logic is implemented as an output property, transforming the sequence of strings into a sequence of integers:

this.WhenAnyValue(x => x.TextInput)
    .Where(x => !string.IsNullOrWhiteSpace(x))
    .Select(x => x
        .Split()
        .Where(word => !string.IsNullOrWhiteSpace(word))
        .Count())
    .ToProperty(this, vm => vm.WordCount, out _WordCount);

The throttling can be removed from the viewmodel, as WPF 4.5 has a Delay binding property, which can be used for the same purpose on the view side, freeing the viewmodel from that concern.

Provide a Graphical UI

Now let’s look at the view. The event subscription is gone and data binding is implemented in XAML declaratively. The DataContext of the main window is initialized with an instance of the viewmodel, which is initialized with an observable from an instance of a background ticker:

ViewModels.BackgroundTicker Ticker=new ViewModels.BackgroundTicker();
ViewModels.WordCounterModel VM = new ViewModels.WordCounterModel(Ticker.Ticker);
DataContext = VM;

This instantiation can certainly be done in XAML as well, eliminating the C# code for the view, as for example in the following article.

The background ticker and the word count are now bound one-way and marked asynchronous.

<TextBox ... Text="{Binding BackgroundTicker,IsAsync=True,Mode=OneWay}"></TextBox>
<TextBox ... Text="{Binding WordCount,IsAsync=True,Mode=OneWay}"></TextBox>

And finally, the binding of the input text is configured in XAML to be throttled to 300 milliseconds and bind on PropertyChanged events of the Text property of the text box:

<TextBox ... Text="{Binding TextInput, UpdateSourceTrigger=PropertyChanged, Delay=300}"></TextBox>

Source Code

https://github.com/d-led/reactiveexamples

The WPF Version of the Responsive UI Hello World

What is lacking in the first version of the hello world example is the separation of UI and application logic. A refactored WinForms version may be the topic of a future post.

The next step in the discovery of Rx is to create similar software using WPF. The details can be viewed in the source code, so let’s concentrate on the peculiarities. In this version, pure Rx will be used without ReactiveUI.

wpf version of ui hello world

WPF brings a possibility of total separation of UI, UI logic and application logic. A part of the application can be implemented by bindings to a viewmodel. The viewmodel has to implement the INotifyPropertyChanged interface. We’ll choose the implementation from here for the moment.

The viewmodel is defined as follows, omitting the INotifyPropertyChanged details:

public class MyViewModel : INotifyPropertyChanged
{
    //http://msdn.microsoft.com/en-us/library/ms229614.aspx details ...

    string _CurrentTime;
    public string CurrentTime
    {
        get { return _CurrentTime; }
        private set
        {
            if (value != _CurrentTime)
            {
                _CurrentTime = value;
                NotifyPropertyChanged();
            }
        }
    }

    string _TextInput;
    public string TextInput
    {
        get { return _TextInput; }
        set
        {
            if (value != _TextInput)
            {
                _TextInput = value;
                NotifyPropertyChanged();
                UpdateWordCount();
            }
        }
    }

    private void UpdateWordCount()
    {
        WordCount = TextInput.Split()
            .DefaultIfEmpty()
            .Where(s => s.Trim().Length > 0)
            .Count();
    }

    int _WordCount;
    public int WordCount
    {
        get { return _WordCount; }
        private set
        {
            if (value != _WordCount)
            {
                _WordCount = value;
                NotifyPropertyChanged();
            }
        }
    }

    public MyViewModel()
    {
        Observable.Interval(TimeSpan.FromSeconds(1))
            .Subscribe(_ => CurrentTime = DateTime.Now.ToLongTimeString());
    }
}

With this separation, the application logic becomes testable without building a UI, as the viewmodel is not dependent on System.Windows*.

The word count and the current time field can be bound to the controls by using the standard binding mechanism:

<TextBox ... Text="{Binding CurrentTime}"></TextBox>
<TextBox ... Text="{Binding WordCount}"></TextBox>

However, if we want the word count to be updated on every text change and not after focus change, we’ll have to use the TextChanged of the third text box to update the viewmodel. The update of the word count is currently implemented in the setter for the TextInput property on line 29. Using ReactiveUI this will change as well.

The configuration of the UI by code-behind looks as follows:

ViewModels.MyViewModel VM = new ViewModels.MyViewModel();
DataContext = VM;
var textChanged = Observable.FromEventPattern<TextChangedEventHandler, TextChangedEventArgs>(
    handler => handler.Invoke,
    h => textBox3.TextChanged += h,
    h => textBox3.TextChanged -= h);
textChanged.Subscribe(_ => VM.TextInput = textBox3.Text);

Compared to the first WinForms version, it has two fewer responsibilities: the current time update logic and the word count logic, which are now in the viewmodel.

As the next step, the implementation of the viewmodel should be further simplified.

Source code can be found here.