Tag Archives: cpp

Deterministic Testing of Concurrent Behavior in RxCpp

A Retrospective

After getting inspired by The Reactive Manifesto, it is hard not to get excited about Reactive Extensions. Such excitement has led to a series of hello-world articles and some code examples. While Reactive Extensions take over the programming world in C#, Java and JavaScript, the world of C++ seems slow to adopt RxCpp.

The new ReactiveX Tutorial link list is a great place to start learning and grokking. This article is an attempt to bring RxCpp closer to C++ developers who might not yet see how a reactive programming model can help write better, more robust code.

Testing concurrency with RxCpp

A previous article showed how to test ViewModels in C# by parameterizing the ViewModels with a scheduler. In a UI setting, the scheduler usually involves some kind of synchronization with the GUI thread. Testing keystrokes arriving at a certain speed would require some effort to simulate events, probably leading to brittle tests. With the scheduler abstraction, the concurrent behavior of a component is decoupled from physical time, and thus can be tested repeatedly and very fast. This was the C# test:

(new TestScheduler()).With(scheduler =>
{
    var ticker = new BackgroundTicker(scheduler);

    int count = 0;
    ticker.Ticker.Subscribe(_ => count++);
    count.Should().Be(0);

    // full control of the time without waiting for 1 second
    scheduler.AdvanceByMs(1000);
    count.Should().Be(1);
});
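To make the idea of decoupling from physical time concrete without any Rx library, here is a minimal sketch of a hand-rolled virtual-time scheduler in C++. The names VirtualScheduler and start_ticker are hypothetical and only illustrate the principle; this is not how RxCpp or Rx.NET implement their test schedulers.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <utility>

// Hypothetical toy scheduler: virtual time only moves when advance_by() is called.
class VirtualScheduler {
public:
    using Action = std::function<void()>;

    std::int64_t now() const { return now_ms_; }

    // Queue an action to run once `delay_ms` of virtual time has passed.
    void schedule(std::int64_t delay_ms, Action action) {
        assert(delay_ms >= 0);
        queue_.emplace(now_ms_ + delay_ms, std::move(action));
    }

    // Move the virtual clock forward and run every action that became due,
    // ordered by due time. Actions may schedule further actions.
    void advance_by(std::int64_t ms) {
        assert(ms >= 0);
        const std::int64_t target = now_ms_ + ms;
        while (!queue_.empty() && queue_.begin()->first <= target) {
            auto it = queue_.begin();
            now_ms_ = it->first;
            Action action = std::move(it->second);
            queue_.erase(it);
            action();
        }
        now_ms_ = target;
    }

private:
    std::int64_t now_ms_ = 0;
    std::multimap<std::int64_t, Action> queue_;
};

// Hypothetical ticker: increments `count` every `period_ms` of virtual time
// by rescheduling itself after each tick.
inline void start_ticker(VirtualScheduler& scheduler, std::int64_t period_ms, int& count) {
    scheduler.schedule(period_ms, [&scheduler, period_ms, &count] {
        ++count;
        start_ticker(scheduler, period_ms, count);
    });
}
```

A test can then call start_ticker(scheduler, 1000, count), check that count is still 0, call scheduler.advance_by(1000) and check that count is 1, all without waiting a real second.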

Show Me The Code

Without further ado: the C++ version is not very far from the C# version. In a simple test, we can parameterize a sequence of integer values arriving at specified intervals (a ticker) with a coordination (for why it is a coordination and not a scheduler, see the RxCpp developer manual):

auto seq = rxcpp::observable<>::interval(
            std::chrono::milliseconds(1),
            some_scheduler
);

The deterministic test scheduler API is currently available through a worker created on the test scheduler:

auto sc = rxcpp::schedulers::make_test();
auto worker = sc.create_worker();
auto test = rxcpp::identity_same_worker(worker);

The rest should read like English:

int count = 0;

WHEN("one subscribes to an observable sequence on the scheduler") {
  auto seq = rxcpp::observable<>::interval(
              std::chrono::milliseconds(1),
              test // on the test scheduler
             ).filter([](int i) { return i % 2; });

  seq.subscribe([&count](int){
    count++;
  });

  THEN("the sequence is not run at first") {
    worker.sleep(2 /* ms */);

    CHECK(count == 0);

    AND_WHEN("the test scheduler is advanced manually") {

      THEN("the sequence is run as expected") {
        worker.advance_by(8 /* ms */);
        CHECK(count == 5);
      }
    }
  }
}

The full test can be seen @github, and is built on Travis CI.

RxCpp 2

RxCpp 2 and API

The last article on rxcpp was based on a now obsolete version of RxCpp. The key contributor to the library, Kirk Shoop, has kindly provided a rewrite based on the newer, 2.0 API of the library: see the pull request, upon which this article is based.

Since the first article, the project has been enriched with somewhat more readable GIVEN/WHEN/THEN-style tests using Catch [1].

Still Ticking: Scheduler and Coordination in RxCpp 2

The previous articles give examples of managing periodic events, such as ticker ticks and measurements, in C++. The following example creates an event loop that will be used for coordinated output of various events to the console:

auto scheduler = rxcpp::schedulers::make_same_worker(
    rxcpp::schedulers::make_event_loop().create_worker()
);

auto coordination = rxcpp::identity_one_worker(scheduler);

One such sequence of events is some kind of measurement [2]:

auto measure = rxcpp::observable<>::interval(
        // when to start
        scheduler.now() + std::chrono::milliseconds(250),
        // measurement frequency
        std::chrono::milliseconds(250),
        coordination)
    // take Hz values instead of a counter
    .map([&FM](int) { return FM.Hz(); });

auto measure_subscription = measure
    .subscribe([](int val) {
        std::cout << val << std::endl;
    });

Why didn’t it tick?

If this code were the end of the main program, there wouldn’t be any observable ticks, as all the objects would be destroyed before the first scheduled event. To see the code in action, we shall wait for some condition that will change when we’re done. This step is not necessary if there’s a GUI toolkit event loop that keeps objects alive, but it has to be simulated for a console example.

To demonstrate the subscription change and wait for some time, we’ll wait for an atomic counter, which is decremented twice, to become zero:

std::atomic<long> pending(2);

...

// after all subscriptions defined
while (pending) {
    // wait for ticker and measure to finish (needs <thread> and <chrono>)
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
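As a self-contained illustration of this waiting pattern, the following sketch uses only the standard library. The helpers wait_until_done and finish_after are hypothetical names introduced for this example; finish_after merely stands in for a subscription that signals completion after some delay.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Blocks the calling thread until `pending` drops to zero,
// re-checking the counter once per `poll_interval`.
inline void wait_until_done(const std::atomic<long>& pending,
                            std::chrono::milliseconds poll_interval) {
    assert(poll_interval.count() > 0);
    while (pending.load() > 0) {
        std::this_thread::sleep_for(poll_interval);
    }
}

// Hypothetical stand-in for a subscription: finishes after `delay`
// and then signals completion by decrementing the shared counter.
inline std::thread finish_after(std::atomic<long>& pending,
                                std::chrono::milliseconds delay) {
    return std::thread([&pending, delay] {
        std::this_thread::sleep_for(delay);
        --pending;
    });
}
```

Polling keeps the example short; a condition variable would avoid the wake-ups, at the cost of a little more code.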

Tick and Stop

The other ticker will have another period, will only tick 10 times, and then decrement the pending counter:

auto ticker = rxcpp::observable<>::interval(
    scheduler.now() + std::chrono::milliseconds(500),
    std::chrono::milliseconds(500),
    coordination);

ticker
    .take(10)
    .subscribe([](int val) {
        std::cout << "tick " << val << std::endl;
    },[&](){
        --pending; // take completed the ticker
    });

Now we can schedule the cancellation of the measurement subscription (which also decrements pending) partway through the 10-tick run. This is scheduled on the same scheduler that runs all the subscriptions:

scheduler.create_worker().schedule(scheduler.now() + std::chrono::seconds(2), 
    [&](const rxcpp::schedulers::schedulable&) {
        std::cout << "Canceling measurement ..." << std::endl;
        measure_subscription.unsubscribe(); // cancel measurement
        --pending; // signal measurement canceled
    });

The result:

63
tick 1
63
61
tick 2
63
61
tick 3
63
62
Canceling measurement ...
tick 4
tick 5
tick 6
tick 7
tick 8
tick 9
tick 10

Thanks, Kirk & other library contributors!

Code @ github

Next: deterministic testing of concurrent behavior

  1. i.e. create.cpp
  2. Observe the convergence of the API towards the C# version.

A C++ Background Ticker, now with Rx.cpp

Finally, Rx.cpp

Some time ago I wrote that I didn’t have enough patience to recreate the background ticker example in C++ using Rx.cpp. Since then the Rx.cpp project seems to have grown out of the spike phase, and even has a native NuGet package. It has also gone multiplatform (Windows, OSX and Linux): observe the green Travis-CI Button.

Update: new blog post, discussing RxCpp v2 and testing using the test scheduler.

A simple console ticker

As in .Net, Reactive Extensions provide a simple way to process streams of data asynchronously, while keeping the concurrency-related code declarative and thus readable. Here’s a simple ticker in the console which runs asynchronously to the main thread:

auto scheduler = std::make_shared<rxcpp::EventLoopScheduler>();
auto ticker = rxcpp::Interval(std::chrono::milliseconds(250), scheduler);

rxcpp::from(ticker)
	.where([](int val) { return val % 2 == 0; })
	.take(10)
	.subscribe([](int val) {
		std::cout << "tick " << val << std::endl;
	});

std::cout << "starting to tick" << std::endl;

resulting in something like:

starting to tick
tick 0
tick 2
tick 4
tick 6
tick 8
...

where the underlying interval fires once every 250 milliseconds; since only even values pass the filter, a tick is printed every 500 milliseconds.
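The order of the operators matters: take(10) counts values after the where filter, so the printed values run from 0 to 18. A plain C++ sketch of that semantics, with no Rx involved, might look as follows; where_take is a hypothetical helper name, not part of Rx.cpp.

```cpp
#include <cstddef>
#include <vector>

// Walks the counter 0, 1, 2, ... and keeps the first `take_count` values
// accepted by `keep`. Assumes `keep` accepts infinitely many values,
// otherwise the loop would not terminate.
template <typename Predicate>
std::vector<long> where_take(Predicate keep, std::size_t take_count) {
    std::vector<long> out;
    for (long tick = 0; out.size() < take_count; ++tick) {
        if (keep(tick)) {
            out.push_back(tick);
        }
    }
    return out;
}
```

Calling where_take with an "is even" predicate and a count of 10 yields 0, 2, 4 and so on up to 18.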

Throwing away code

The PPL example was simulating polling a sensor and printing the value. It had an error-prone and buggy ad-hoc implementation of an active object, ticking at predefined intervals. This can now be happily thrown away, as Rx allows cleaner concurrency control and testability using schedulers, and implements a timed sequence: Interval.

Preconditions

FrequencyMeter FM;
auto scheduler = std::make_shared<rxcpp::EventLoopScheduler>();

The scheduler will be used for all subscriptions.

The tickers

The first one:

auto measure = rxcpp::Interval(std::chrono::milliseconds(250),scheduler);
auto measure_subscription = rxcpp::from(measure)
	.subscribe([&FM](int val) {
		std::cout << FM.Hz() << std::endl;
	});

where measure_subscription is a rxcpp::Disposable for subscription lifetime control.

And the other one:

auto ticker = rxcpp::Interval(std::chrono::milliseconds(500), scheduler);
rxcpp::from(ticker)
	.take(10)
	.subscribe([](int val) {
		std::cout << "tick " << val << std::endl;
	});

where you can observe the LINQ-style operator take, which ends the sequence after 10 values.

Managing subscriptions

In the PPL example, one could start and stop the ticker. However, in Rx.cpp this can be simply modeled by disposable subscriptions. Hence, after some kind of sleeping, the measurement can be stopped:

sleep(2000);
std::cout << "Canceling measurement ..." << std::endl;
measure_subscription.Dispose(); // cancel measurement

Resulting in similar output:

60
63
tick 0
61
62
tick 1
63
63
tick 2
62
60
tick 3
Canceling measurement ...
tick 4
tick 5
tick 6
tick 7
tick 8
tick 9

Restarting measurement can be done by creating a new subscription.
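To show the idea of a disposable subscription in isolation, here is a minimal sketch of a toy event source. TickSource and its nested Subscription are hypothetical names for illustration only and do not reflect the Rx.cpp API or its implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <utility>

// Hypothetical toy event source. A Subscription must not outlive its source.
class TickSource {
public:
    using Handler = std::function<void(int)>;

    // Handle returned by subscribe(); dispose() detaches the handler.
    class Subscription {
    public:
        Subscription(TickSource* source, std::size_t id) : source_(source), id_(id) {
            assert(source != nullptr);
        }
        void dispose() {
            if (source_ != nullptr) {
                source_->handlers_.erase(id_);
                source_ = nullptr;  // disposing twice is harmless
            }
        }
    private:
        TickSource* source_;
        std::size_t id_;
    };

    Subscription subscribe(Handler handler) {
        const std::size_t id = next_id_++;
        handlers_.emplace(id, std::move(handler));
        return Subscription(this, id);
    }

    // Deliver one value to every handler that is still subscribed.
    void emit(int value) {
        auto snapshot = handlers_;  // copy, so a handler may dispose during emit
        for (auto& entry : snapshot) {
            entry.second(value);
        }
    }

private:
    std::map<std::size_t, Handler> handlers_;
    std::size_t next_id_ = 0;
};
```

Stopping the measurement corresponds to calling dispose() on the handle, and restarting it is simply another call to subscribe().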

Why not simply signals/slots?

Almost quoting the Intro to Rx book, the advantages of using Rx over (at least) simple implementations of signal/slot mechanism are:

  • Better maintainability due to readable, composable, declarative code
  • Scheduler abstraction allowing for fast, deterministic, clock-independent tests of concurrency concerns
  • Declarative concurrency through the same scheduler abstraction
  • LINQ-like composition and filtering of event streams
  • Easy subscription control via disposables
  • Completion and exception handling built into the Observer concept

Code

@GitHub

Corrections, suggestions and comments are welcome!

Update 26.6.2014: There’s been a new release of Rx.cpp on NuGet, and Kirk Shoop pushed a pull request, upgrading the project and the API usage to Rx.cpp 2.0.0. There have been some changes, and there are some interesting patterns, which should be blogged about in the near future.

Quickstart for cross-platform C++ projects

A typical dilemma for a C++ developer is creating the initial build configuration. Out of my affection for Lua, I’ve collected my typical premake4 patterns into a separate project to be able to set up C++ projects on any platform within a minute.

Here’s a sample from selfdestructing:

Self-destructing objects

One day, recalling programmed cell death, I decided to create a class that would self-destruct after a certain number of copies.

Today’s little research resulted in the following project: https://github.com/d-led/selfdestructing

The question arises what exactly counts as copying. I could think of four policies. The implementation of those policies is configurable and extensible. Right now no explicitly thread-safe policy is implemented, but adding one would not be very complicated if the purpose is solely debugging.

Usage

#include "crash_on_copy.hpp"
struct TestNumberCrash : public crashes::on<3>::copies {};

on_copies: crashes on 3 total copies of the originally created object, but doesn’t crash on 3 total instances of the class

struct TestCopyNrCrash : public crashes::after<3>::copies {};

after_copies: crashes on any third copy of the original object, but doesn’t crash on 3 total instances of the class

struct TestTotalNrCrash : public crashes::on_total<3,TestTotalNrCrash>::instances {};

on_total_instances: crashes on instantiation of an object if 2 objects are alive, but doesn’t crash on any creation if the total number of instances is below 2

struct TestAfterTotalNrCrash : public crashes::after_total<3,TestAfterTotalNrCrash>::instances {};

after_total_instances: crashes after a third object instantiation of the class

Singular form aliases are also available, e.g. crashes::on<1>::copy.

The project at github contains the test showing the crashing pattern.
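To give a feeling for how such a policy can work, here is a minimal sketch of the "crash on N total copies of one original" idea. It is not the selfdestructing implementation: the name throws_on_copies is hypothetical, it throws an exception instead of crashing so that the behavior can be checked in a test, and it is not thread-safe.

```cpp
#include <cstddef>
#include <memory>
#include <stdexcept>

// Hypothetical sketch: an original and all of its copies share one counter,
// and creating the N-th copy throws.
template <std::size_t N>
class throws_on_copies {
    static_assert(N > 0, "the copy limit must be positive");
public:
    throws_on_copies() : copies_(std::make_shared<std::size_t>(0)) {}

    throws_on_copies(const throws_on_copies& other) : copies_(other.copies_) {
        if (++*copies_ >= N) {
            throw std::runtime_error("copy limit reached");
        }
    }

    // Assignment is left out to keep the sketch small.
    throws_on_copies& operator=(const throws_on_copies&) = delete;

    // Number of copies made so far from the same original.
    std::size_t copies() const { return *copies_; }

private:
    std::shared_ptr<std::size_t> copies_;
};

// Usage mirrors the inheritance style shown above.
struct Tracked : throws_on_copies<3> {};
```

Two copies of a Tracked object succeed, the third one throws, and independently created Tracked objects each start with their own counter.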

A Background Ticker, now in C++ with PPL

Unfortunately, the Reactive Extensions for C++ (Rx++) did not work for me as smoothly as their C# counterpart. My goal is to recreate the background ticker from the Rx version. The first, and perhaps not the last, example implementation uses the Parallel Patterns Library by Microsoft, which is built on the Concurrency Runtime. The goal is to write code that abstracts the threads away and uses some background scheduler to schedule function calls asynchronously.

Here, asynchronous calls will be done by tickers, one of which will poll a value from a bogus frequency meter and write it to the console, whereas the other one will just write “tick”.

First, the frequency meter that gives random values around 62 (a magic number):

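#include <mutex>
#include <random>
class FrequencyMeter
{
	// mutable: Hz() is const, but drawing a number changes generator and distribution state
	mutable std::mt19937 gen;
	mutable std::uniform_int_distribution<int> dis;
	mutable std::mutex m;
public:
	// initializers listed in declaration order
	FrequencyMeter() :
		gen(std::random_device()()),
		dis(0,3)
	{}

	int Hz() const
	{
		std::lock_guard<std::mutex> lock(m); // serialize access from multiple threads
		return 60+dis(gen);
	}
};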

This class is not the primary concern here, but if the Hz value is accessed from multiple threads, the access should perhaps be serialized (by a mutex here).
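A small sketch of what serialized access buys: several threads call Hz() at once, and every value still lands in the expected range of 60 to 63. The class is repeated here only so that the example compiles on its own (with the distribution marked mutable, which standard library implementations other than MSVC's require), and hz_stays_in_range is a hypothetical helper name.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <random>
#include <thread>
#include <vector>

class FrequencyMeter {
    mutable std::mt19937 gen;
    mutable std::uniform_int_distribution<int> dis;
    mutable std::mutex m;
public:
    FrequencyMeter() : gen(std::random_device()()), dis(0, 3) {}

    int Hz() const {
        std::lock_guard<std::mutex> lock(m);  // one caller at a time
        return 60 + dis(gen);
    }
};

// Calls Hz() from several threads and reports whether every value was in [60, 63].
inline bool hz_stays_in_range(const FrequencyMeter& meter, int threads, int samples_per_thread) {
    assert(threads > 0 && samples_per_thread > 0);
    std::atomic<bool> ok(true);
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t) {
        pool.emplace_back([&meter, &ok, samples_per_thread] {
            for (int i = 0; i < samples_per_thread; ++i) {
                const int hz = meter.Hz();
                if (hz < 60 || hz > 63) {
                    ok = false;
                }
            }
        });
    }
    for (auto& worker : pool) {
        worker.join();
    }
    return ok;
}
```

A passing run does not prove thread safety, but without the mutex the shared generator would be modified concurrently, which is a data race.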

Now, one of the side wishes for the ticker is the ability to start and stop it at any time. The ticker should also not care what it executes, hence we can take a std::function for now, or perhaps anything callable. However, with C++11 lambdas, it’s easy to convert to a standard function (thread-safety and ownership issues notwithstanding for now). Following the examples for cancellation tokens, one can write a small wrapper that will tick at predefined intervals (see full code), wrapping a simple loop:

while (running)
{
	// Check for cancellation. 
	if (concurrency::is_task_cancellation_requested())
	{
		running = false;
		concurrency::cancel_current_task();
	}
	else
	{
		this->tick_once();
		concurrency::wait(interval);						
	}
}
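For readers without PPL at hand, roughly the same start/stop behavior can be sketched with the standard library alone. This is a hypothetical stand-in named simple_ticker, not the active_ticker from the full code: it uses a plain std::thread and an atomic flag instead of PPL tasks and cancellation tokens, and takes the interval as a std::chrono duration.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical portable ticker: calls `tick` repeatedly on a background
// thread until stop() is called.
class simple_ticker {
public:
    explicit simple_ticker(std::function<void()> tick) : tick_(std::move(tick)) {
        assert(tick_ != nullptr);
    }
    ~simple_ticker() { stop(); }

    simple_ticker(const simple_ticker&) = delete;
    simple_ticker& operator=(const simple_ticker&) = delete;

    // Starts ticking; a ticker that is already running is restarted.
    void start(std::chrono::milliseconds interval) {
        stop();
        running_ = true;
        worker_ = std::thread([this, interval] {
            while (running_) {
                tick_();
                std::this_thread::sleep_for(interval);
            }
        });
    }

    // Requests the loop to end and waits for the background thread to finish,
    // so no tick happens after stop() returns.
    void stop() {
        running_ = false;
        if (worker_.joinable()) {
            worker_.join();
        }
    }

private:
    std::function<void()> tick_;
    std::atomic<bool> running_{false};
    std::thread worker_;
};
```

Note that stop() may block for up to one interval, since the loop only checks the flag between sleeps.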

In the small test program, the frequency meter is instantiated and the first ticker is started, ticking once a quarter of a second:

FrequencyMeter FM;
active_ticker measure([&FM]{std::cout<<FM.Hz()<<std::endl;});
measure.start(250);

Then another ticker is started:

active_ticker ticker([]{std::cout<<"tick"<<std::endl;});
ticker.start(500);

And some wait, start and stop logic:

wait(2000); //see the output for a couple of seconds
measure.stop(); //stop the frequency meter
wait(2000); //wait some more
ticker.stop(); //stop ticking
measure.start(250); //start measuring again
wait(2000); //and wait a bit more
measure.stop();

The output looks like this:

60
tick
61
62
tick
61
62
tick
60
60
Canceling measurement ...
tick
60
tick
tick
tick
Restarting measurement ...
tick
60
63
62
61
62
62
62
Done

Any other technology to try elegant ticking (tinkering) with? TBB, zeromq in process?

Asynchronously access an object’s property repeatedly in C#

A question on Stack Overflow got me thinking of a beautiful way of reporting a value of an object in C# repeatedly, something like polling a sensor. Typically, polling is pull-based, but having been reading Intro to Rx for the second time lately, and being convinced of its push-based structural and syntactic eunoia, I’ve created a solution based on Rx listed below.

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace rxtest
{
    class FrequencyMeter
    {
        Random rand = new Random();
        public int Hz
        {
            get { return 60+rand.Next(3); }
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var obs = Observable.Generate<FrequencyMeter, int>(
                new FrequencyMeter(), //state
                x => !Console.KeyAvailable, // while no key is pressed
                x => x, // no change in the state
                x => x.Hz, // how to get the value
                x => TimeSpan.FromMilliseconds(250), //how often
                Scheduler.Default)
                .DistinctUntilChanged() //show only when changed
                ;

            using (var _ = obs.Subscribe(x => Console.WriteLine(x)))
            {
                var ticks = Observable.Interval(TimeSpan.FromSeconds(0.5))
                           .Subscribe(x=>Console.WriteLine("tick")); //an example only
                Console.WriteLine("Interrupt with a keypress");
                Console.ReadKey();
            }
        }
    }
}

producing an output similar to that:

Interrupt with a keypress
62
60
62
tick
61
60
tick
62
61
tick
60
62
61
tick
62

Now, with Rx available in C++, it would be interesting to see what is left of the eunoia.