Tag Archives: reactive extensions

Reactive Extensions Example for the Browser

Introduction

This is the next post in the reactive examples series. Previous articles focused on building a MVVM-style ReactiveUI-based Windows application in C# with the help of Reactive Extensions. The example application had some simple word counting logic and a background ticker, demonstrating an implementation without using error-prone callbacks or explicit threading. This article will try to re-create the same application for the web browser using Vue.js, Bootstrap-Vue and vue-rx.

The result looks like this:

an input form, counting words and ticking (Vue-Rx reactive example by d-led)

In the Meanwhile

The Actor Model

After several attempts to implement the example with RxJavaFX, I gave up on RxJava for a UI, and focused on another approach to writing concurrent reactive software: the Actor Model. This lead me to converge on two* Actor Model languages, Pony and Elixir/Erlang, and later, on one framework: Vlingo (thanks to a serendipitous meeting and a kind invitation to an IDDD workshop by Vaughn Vernon).

The venture resulted in several presentations, including one at the Lightweight Java User Group München Meetup. In the preparation for the meetup, I have demonstrated how Reactive Extensions can enhance actor model code with time-based operators, and how the transition between the paradigms is achieved (see vlingo_experiments/batching_with_rx). 

As late Pieter Hintjens said and wrote, alluding to Conway’s Law, “read about the Actor model, and become a message-driven, zero shared state Actor”. The 1973 paper by Carl Hewitt and others on the Actor Model was published in proceedings of an artificial intelligence conference of the time. There are good indications that this concurrency model is a good fit for a computational model of the brain (see 1, 2).

All this deserves another series of blog posts.

In the Browser

The Actor Model is coming to the browser too: it is a natural fit for the modern web. See the talk: Architecting Web Apps – Lights, Camera, Action! (Chrome Dev Summit 2018) and the related Github project: PolymerLabs/actor-boilerplate. It has been seen in other places too, such as in the emerging framework Tarant.

Alas, I can’t show an actor model example in the browser, yet. Thus, back to Reactive Extensions!

How to get to vue-rx?

It seems, in the world of web front-end programming, there are numerous diverging paths, all of which, in the end, converge on downloading half the internet of little script files in various dialects of JavaScript. But don’t despair, commit often and small. I am not native to the JS world, and previous attempts to re-create the example in the browser failed miserably.

Vue.cli

The path chosen here is to start with a boilerplate generated with Vue CLI 3

vue create vue-rx-example

Dependencies

Install the dependencies via npm install

  • vue – the sensible MVVM library for the browser
  • moment – to format time
  • rxjs, rxjs-compat, vue-rx – the Rx libraries required in this context
  • bootstrap-vue – a responsive web page design pattern

The View Component

an input form, counting words and ticking

Replacing the generated view boilerplate, the following remains:

<template>
  <b-form>
    <b-form-group label="Background ticker">
      <b-form-input readonly type="text" v-model="ticker" />
    </b-form-group>

    <b-form-group label="Word count">
      <b-form-input readonly type="text" v-model="countWords" />
    </b-form-group>

    <b-form-group label="Enter some text">
      <b-form-textarea v-model="text" style="min-height: 200px" />
    </b-form-group>
  </b-form>
</template>

which is a simple form with two read-only text fields, and one input text area, all declaratively bound to the viewmodel via the v-model directive

The ViewModel & Vue Extensions

The dependencies must be registered with Vue in the <script /> tag in order for them to work together as intended (excluding some CSS/other boilerplate):

import Vue from "vue";
import VueRx from "vue-rx";
import Rx from "rxjs/Rx";
import BootstrapVue from "bootstrap-vue";
Vue.use(BootstrapVue, VueRx, Rx);
// here comes the ViewModel

The following is all of the ViewModel with the explanations in the comments:

export default {
  name: "HelloWorld",
  data() {
    // input field is bound to this
    return {
      text: ""
    };
  },

  // rx-vue part
  subscriptions: function() {
    // watch the input data as an observable stream
    const countWords = this.$watchAsObservable("text")
      // update only if not typing for 1/2 s
      .debounceTime(500)
      .pluck("newValue")
      .startWith("")
      // count the words
      .map(val => {
        const s = val.trim();
        return s == "" ? 0 : s.split(/\s+/).length;
      });

    // tick the timestamp every second
    const ticker = Observable.interval(1000 /* ms */).map(_ =>
      new moment().format("H:mm:ss A")
    );

    return { countWords, ticker };
  }
};

which a Rx.Net developer might find familiar:

this.WhenAnyValue(x => x.TextInput)
    .Where(x => !string.IsNullOrWhiteSpace(x))
    .Select(x => x
        .Split()
        .Count(word => !string.IsNullOrWhiteSpace(word)))
    .ToProperty(this, vm => vm.WordCount, out _WordCount)
;

Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(_ => DateTime.Now.ToLongTimeString());
    .ToProperty(this,
        ticker => ticker.BackgroundTicker,
        out _BackgroundTicker)
;

Conclusion

Reactive Extensions have proven to be a suitable paradigm for building reactive user interfaces, landing them on the Thoughtworks Radar into the Adopt ring. Rx implementations can be used in variety of technologies, as the Reactive Trader project has shown.

While the Actor Model shines on the server, reactive, message-driven technologies play well together, and, perhaps, soon it will be natural to structure applications as a mix of stream-based and actor-based components.

Source code: https://github.com/d-led/vue-rx-example
Demo: https://ledentsov.de/static/vue-rx-example

Batching Data by Time or Count Using Reactive Extensions (Rx)

Motivation

The time-series database InfluxDB provides a HTTP API to write data. Data points (measurements) are inserted via a Line protocol, which allows batching of data by writing multiple points in one HTTP request.

While experimenting for a simple InfluxDB C++ client, I wanted to create an asynchronous fire-and-forget API, so that the data points can be sent over HTTP without blocking the instrumented C++ code. Several “readymade” options to implement concurrency in this scenario are available.

A simple PAIR of ZeroMQ sockets would do the job, but I’d have to implement batching separately. Thus, I turned my attention to a higher-level abstraction: Rx

Rx Window Operator

Quickly looking through the cross-language Reactive Extensions site, I found the right operator: Window.

This operator has luckily been implemented in RxCpp, thus I proceeded with the experiment.

Batching Design

Batching using Rx Winodw Operator

Rx Window Operator (CC BY 3.0 reactivex.io)[1. Source: reactivex.io License: (CC BY 3.0)]

The window operator takes an observable sequence of data and splits it into windows (batches) of observables. To batch requests, the observable windows of data are aggregated to a single value upon the last value from the windows (via other aggregating Rx operators).

A Toy Problem

To validate the approach, the following problem is set:

Given a stream of integers, append the integers into a series of strings, either every second, or every N integers

String appends with integer-to-string conversions in C++ will be done via the {fmt} library.

Batching in One Line of Code

A stream of numbers batched either by time or count:

auto values = rxcpp::observable<>::range(1, 1000'000)
    .window_with_time_or_count(std::chrono::seconds(1), 100'000);

Note, there is an almost one-to-one translation into a C# version:

var values = Observable.Range(1, 1000000)
               .Window(TimeSpan.FromSeconds(1), 100000);

This indicates the power of the Rx abstraction across languages. The Rx website provides just the right sorting of the documentation to be able to translate Rx code from one language to another.

Aggregating the Batches

In order to do something useful with the batched data, the Scan operator is used to gather the data in a string buffer, and after the last value has been received, the string buffer is assembled into a string and processed:

values.subscribe(
    [](rxcpp::observable<int> window) {
        // append the number to the buffer
        window.scan(
            std::make_shared<fmt::MemoryWriter>(),
            [](std::shared_ptr<fmt::MemoryWriter> const& w, int v)
        {
            *w << v;
            return w;
        })

        // what if the window is empty? Provide at least one empty value
        .start_with(std::make_shared<fmt::MemoryWriter>())

        // take the last value
        .last()

        // print something fancy
        .subscribe([](std::shared_ptr<fmt::MemoryWriter> const& w) {
            fmt::print(
                "Len: {} ({}...)\n",
                w->size(),
                w->str().substr(0, 42)
            );
        });            
    }
);

The Tale of Two Bugs

In the initial (non-TDD) spike, the batching seemed to work, however, something caught my attention (the code bites back):

[window 0] Create window
Count in window: 170306
Len: 910731 (123456789101112131415161718192021222324252...)

the window wasn’t capped at 100’000. This could have been either a misunderstanding or a bug, thus I formulated a hesitant issue #277. As it turned out, it indeed was a bug, which was then fixed in no time. However, the first bug has hidden another one: the spike implementation started to crash at the end: when all the windows were capped by count, and not by time, last window was empty, as all values fit exactly into 10 batches.

The Last operator rightly caused an exception due to an empty sequence. Obviously, there’s no last value in an empty sequence. Rubber Ducking and a hint from Kirk Shoop fixed the issue by utilizing the StartWith operator to guarantee, the sequence is never empty. An empty string buffer can be ignored easier downstream.

Active Object

The active object pattern was applied to implement a fire-and-forget asynchronous API. A Rx Subject to bridge between the function call and the “control-inverted” observable:

struct async_api {
    //...
    rxcpp::subjects::subject<line> subj;
    //...

    async_api(...)
    {
        auto incoming_requests = subj
            .get_observable()
            .map([](auto line) {
                return line.to_string();
            });

        incoming_requests
            .window_with_time_or_count(
                window_max_ms,
                window_max_lines,
                // schedule window caps on a new thread
                rxcpp::synchronize_new_thread()
            )
            .subscribe(...)
        ;
    }

    // fire-and-forget
    void insert(line const& line)
    {
        subj
            .get_subscriber()
            .on_next(line);
    }
};

in order not to block the caller (which would be the default behavior), the observable watches the values from each window on a new thread. Here, scheduling on a thread pool (currently missing in RxCpp) would probably be beneficial.

While this implementation might not be an optimal one, the declarative nature of Rx, once the basics are understood, allows to “make it work and make it right” pretty quickly by composing the right operators.

Code

The runnable code of the example can be found at Github: C++ version.

In order to show, how similar the high level code can be between different languages when writing, I’ve “ported” the example to C# [2. The C# version appears to run faster on my windows machine while solving the same toy problem].

Why wait forever for the tests? Fast tests of slow software.

Time is volatile

Imagine writing a cron-like functionality that should produce some side-effect, such as cleanup. The intervals between such actions might be quite long. How does one test that? One can surely reason about the software, but given a certain complexity, test should be written, proving that certain important scenarios work as intended.

It’s common that software depends on time flow as dictated by the physical time flow, reflected via some clock provider. However, resetting the time to a year ahead won’t make the CPU work faster and make all the computations it should have performed within that year. A clock is also a volatile component that can be manipulated, thus if time is an issue, it’s probably a good idea not to depend on it directly, following the Stable Dependencies Principle and the Dependency Inversion Principle.

Luckily, there is an abstraction for time, at least in Reactive Extensions (Rx), which is the Scheduler.

Slow non-tests

Here’s a slow Groovy non-test, waiting for some output on the console using RxGroovy:

import rx.*
import java.util.concurrent.TimeUnit

def observable = Observable
	.just(1)
	.delay(5, TimeUnit.SECONDS)

observable.subscribe { println 'ah, OK, done! Or not?' }

Observable
	.interval(1,TimeUnit.SECONDS)
	.subscribe { println 'still waiting...' }

println 'starting to wait for the test to complete ...'

observable.toBlocking().last()

Running it produces the following slow-ticking output:

oldnontest [1. Caputured with the wonderful pragmatic tool LICEcap by the Reaper developers]

Interpreting such tests without color can be somewhat challenging [2. Here, the ‘still waiting’ subscription is terminated after the first subscription ends. Try exchanging the order of the subscribe calls.].

Fast tests

Now let’s test something ridiculous, such as waiting for a hundred days using Spock. Luckily, RxJava & RxGroovy also do implement the test scheduler, thus enabling fast tests using virtual time:

import spock.lang.Specification

import rx.Observable
import rx.schedulers.TestScheduler
import java.util.concurrent.TimeUnit


class DontWaitForever extends Specification {
    def "why wait?"() {
        setup:
            def scheduler = new TestScheduler()

            // system under test: will tick once after a hundred days
            def observable = Observable
                .just(1)
                .delay(100, TimeUnit.DAYS, scheduler)
            def done = false

        when:
            observable.subscribe {
                done = true
            }

            // still in the initial state
            done == false

        and:
            scheduler.advanceTimeBy 100, TimeUnit.DAYS

        then:
            done == true
    }
}

fasttest [3. Building using Gradle]

just checking, advancing the time by 99 days results in a failure:

just_checking

Delightful, groovy colors!

Source

github.com/d-led/dont_wait_forever_for_the_tests