Tag Archives: reactive

Reactive Extensions Example for the Browser

Introduction

This is the next post in the reactive examples series. Previous articles focused on building a MVVM-style ReactiveUI-based Windows application in C# with the help of Reactive Extensions. The example application had some simple word counting logic and a background ticker, demonstrating an implementation without using error-prone callbacks or explicit threading. This article will try to re-create the same application for the web browser using Vue.js, Bootstrap-Vue and vue-rx.

The result looks like this:

an input form, counting words and ticking (Vue-Rx reactive example by d-led)

In the Meanwhile

The Actor Model

After several attempts to implement the example with RxJavaFX, I gave up on RxJava for a UI, and focused on another approach to writing concurrent reactive software: the Actor Model. This lead me to converge on two* Actor Model languages, Pony and Elixir/Erlang, and later, on one framework: Vlingo (thanks to a serendipitous meeting and a kind invitation to an IDDD workshop by Vaughn Vernon).

The venture resulted in several presentations, including one at the Lightweight Java User Group München Meetup. In the preparation for the meetup, I have demonstrated how Reactive Extensions can enhance actor model code with time-based operators, and how the transition between the paradigms is achieved (see vlingo_experiments/batching_with_rx). 

As late Pieter Hintjens said and wrote, alluding to Conway’s Law, “read about the Actor model, and become a message-driven, zero shared state Actor”. The 1973 paper by Carl Hewitt and others on the Actor Model was published in proceedings of an artificial intelligence conference of the time. There are good indications that this concurrency model is a good fit for a computational model of the brain (see 1, 2).

All this deserves another series of blog posts.

In the Browser

The Actor Model is coming to the browser too: it is a natural fit for the modern web. See the talk: Architecting Web Apps – Lights, Camera, Action! (Chrome Dev Summit 2018) and the related Github project: PolymerLabs/actor-boilerplate. It has been seen in other places too, such as in the emerging framework Tarant.

Alas, I can’t show an actor model example in the browser, yet. Thus, back to Reactive Extensions!

How to get to vue-rx?

It seems, in the world of web front-end programming, there are numerous diverging paths, all of which, in the end, converge on downloading half the internet of little script files in various dialects of JavaScript. But don’t despair, commit often and small. I am not native to the JS world, and previous attempts to re-create the example in the browser failed miserably.

Vue.cli

The path chosen here is to start with a boilerplate generated with Vue CLI 3

vue create vue-rx-example

Dependencies

Install the dependencies via npm install

  • vue – the sensible MVVM library for the browser
  • moment – to format time
  • rxjs, rxjs-compat, vue-rx – the Rx libraries required in this context
  • bootstrap-vue – a responsive web page design pattern

The View Component

an input form, counting words and ticking

Replacing the generated view boilerplate, the following remains:

<template>
  <b-form>
    <b-form-group label="Background ticker">
      <b-form-input readonly type="text" v-model="ticker" />
    </b-form-group>

    <b-form-group label="Word count">
      <b-form-input readonly type="text" v-model="countWords" />
    </b-form-group>

    <b-form-group label="Enter some text">
      <b-form-textarea v-model="text" style="min-height: 200px" />
    </b-form-group>
  </b-form>
</template>

which is a simple form with two read-only text fields, and one input text area, all declaratively bound to the viewmodel via the v-model directive

The ViewModel & Vue Extensions

The dependencies must be registered with Vue in the <script /> tag in order for them to work together as intended (excluding some CSS/other boilerplate):

import Vue from "vue";
import VueRx from "vue-rx";
import Rx from "rxjs/Rx";
import BootstrapVue from "bootstrap-vue";
Vue.use(BootstrapVue, VueRx, Rx);
// here comes the ViewModel

The following is all of the ViewModel with the explanations in the comments:

export default {
  name: "HelloWorld",
  data() {
    // input field is bound to this
    return {
      text: ""
    };
  },

  // rx-vue part
  subscriptions: function() {
    // watch the input data as an observable stream
    const countWords = this.$watchAsObservable("text")
      // update only if not typing for 1/2 s
      .debounceTime(500)
      .pluck("newValue")
      .startWith("")
      // count the words
      .map(val => {
        const s = val.trim();
        return s == "" ? 0 : s.split(/\s+/).length;
      });

    // tick the timestamp every second
    const ticker = Observable.interval(1000 /* ms */).map(_ =>
      new moment().format("H:mm:ss A")
    );

    return { countWords, ticker };
  }
};

which a Rx.Net developer might find familiar:

this.WhenAnyValue(x => x.TextInput)
    .Where(x => !string.IsNullOrWhiteSpace(x))
    .Select(x => x
        .Split()
        .Count(word => !string.IsNullOrWhiteSpace(word)))
    .ToProperty(this, vm => vm.WordCount, out _WordCount)
;

Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(_ => DateTime.Now.ToLongTimeString());
    .ToProperty(this,
        ticker => ticker.BackgroundTicker,
        out _BackgroundTicker)
;

Conclusion

Reactive Extensions have proven to be a suitable paradigm for building reactive user interfaces, landing them on the Thoughtworks Radar into the Adopt ring. Rx implementations can be used in variety of technologies, as the Reactive Trader project has shown.

While the Actor Model shines on the server, reactive, message-driven technologies play well together, and, perhaps, soon it will be natural to structure applications as a mix of stream-based and actor-based components.

Source code: https://github.com/d-led/vue-rx-example
Demo: https://ledentsov.de/static/vue-rx-example

Deterministic Testing of Concurrent Behavior in RxCpp

A Retrospective

After getting inspired by The Reactive Manifesto, it is hard not to get excited about Reactive Extensions. Such excitement has lead to a series of hello-world articles and some code examples. While Reactive Extensions take over the programming world in C#, Java and JavaScript, it seems, the world of C++ is slow to adopt RxCpp.

The new ReactiveX Tutorial link list is a great place to start learning and grokking. This article is an attempt to bring RxCpp closer to C++ developers who might not see yet, how a reactive programming model might help writing better, more robust code.

Testing concurrency with RxCpp

A previous article showed how to test ViewModels in C# by parameterizing the ViewModels with a scheduler. In a UI setting, the scheduler usually involves some kind of synchronization with the GUI thread. Testing keystrokes arriving at certain speed would require some effort to simulate events, probably leading to brittle tests. With the scheduler abstraction, the concurrent behavior of a component is decoupled from physical time, and thus can be tested repeatedly and very fast. This was the C# test:

(new TestScheduler()).With(scheduler =>
{
    var ticker = new BackgroundTicker(scheduler);

    int count = 0;
    ticker.Ticker.Subscribe(_ => count++);
    count.Should().Be(0);

    // full control of the time without waiting for 1 second
    scheduler.AdvanceByMs(1000);
    count.Should().Be(1);
});

Show Me The Code

Without further ado, the C++ version is not very far from the C# version. In a simple test, we can parameterize a sequence of integer values arriving at specified intervals (a ticker) with a coordination (why coordination and not scheduler, read in the RxCpp developer manual:

auto seq = rxcpp::observable<>::interval(
            std::chrono::milliseconds(1),
            some_scheduler
);

The deterministic test scheduler API is currently available through a worker created on the test scheduler:

auto sc = rxcpp::schedulers::make_test();
auto worker = sc.create_worker();
auto test = rxcpp::identity_same_worker(worker);

The rest should read like English:

int count = 0;

WHEN("one subscribes to an observable sequence on the scheduler") {
  auto seq = rxcpp::observable<>::interval(
              std::chrono::milliseconds(1),
              test // on the test scheduler
             ).filter([](int i) { return i % 2; });

  seq.subscribe([&count](int){
    count++;
  });

  THEN("the sequence is not run at first") {
    worker.sleep(2 /* ms */);

    CHECK(count == 0);

    AND_WHEN("the test scheduler is advanced manually") {

      THEN("the sequence is run as expected") {
        worker.advance_by(8 /* ms */);
        CHECK(count == 5);
      }
    }
  }
}

The full test can be seen @github, and is built on Travis CI

RxCpp 2

RxCpp 2 and API

The last article on rxcpp was based on a now obsolete version of RxCpp. The key contributor to the library, Kirk Shoop, has kindly provided a rewrite based on the newer, 2.0 API of the library: see the pull request, upon which this article is based.

Since the first article, the project has been enriched with somewhat more readable GIVEN/WHEN/THEN-style tests using Catch [1. i.e. create.cpp].

Still Ticking: Scheduler and Coordination in RxCpp 2

The previous articles give examples of managing periodic events, such as ticker ticks and measurements in c++. The following example creates an event loop that will be used for coordinated output of various events to the console:

auto scheduler = rxcpp::schedulers::make_same_worker(
    rxcpp::schedulers::make_event_loop().create_worker()
);

auto coordination = rxcpp::identity_one_worker(scheduler);

One such sequence of events is some kind of measurement [2. Observe the convergence of the API towards the C# version.]

auto measure = rxcpp::observable<>::interval(
        // when to start
        scheduler.now() + std::chrono::milliseconds(250),
        // measurement frequency
        std::chrono::milliseconds(250),
        coordination)
    // take Hz values instead of a counter
    .map([&FM](int) { return FM.Hz(); });

auto measure_subscription = measure
    .subscribe([](int val) {
        std::cout << val << std::endl;
    });

Why didn’t it tick?

If this code were the end of the main program, there wouldn’t be any observable ticks, as all the objects would be destroyed before the first scheduled event. To see the code in action, we shall wait for some condition that will change when we’re done. This step is not necessary if there’s a GUI toolkit event loop that keeps objects alive, but it has to be simulated for a console example.

To demonstrate the subscription change and wait for some time, we’ll wait twice for an atomic variable to become zero:

std::atomic<long> pending(2);

...

// after all subscriptions defined
while (pending) {
    sleep(1000); // wait for ticker and measure to finish
}

Tick and Stop

The other ticker will have another period, will only tick 10 times, and then decrement the pending counter:

auto ticker = rxcpp::observable<>::interval(
    scheduler.now() + std::chrono::milliseconds(500),
    std::chrono::milliseconds(500),
    coordination);

ticker
    .take(10)
    .subscribe([](int val) {
        std::cout << "tick " << val << std::endl;
    },[&](){
        --pending; // take completed the ticker
    });

Now, we can schedule the termination of the measurement (decrement pending) subscription halfway through the 10-tick run. This scheduling is done on the same scheduler that is running all the subscriptions:

scheduler.create_worker().schedule(scheduler.now() + std::chrono::seconds(2), 
    [&](const rxcpp::schedulers::schedulable&) {
        std::cout << "Canceling measurement ..." << std::endl;
        measure_subscription.unsubscribe(); // cancel measurement
        --pending; // signal measurement canceled
    });

The result:

63
tick 1
63
61
tick 2
63
61
tick 3
63
62
Canceling measurement ...
tick 4
tick 5
tick 6
tick 7
tick 8
tick 9
tick 10

Thanks, Kirk & other library contributors!

Code @ github

Next: deterministic testing of concurrent behavior