Tag Archives: c#

Batching Data by Time or Count Using Reactive Extensions (Rx)


The time-series database InfluxDB provides a HTTP API to write data. Data points (measurements) are inserted via a Line protocol, which allows batching of data by writing multiple points in one HTTP request.

While experimenting for a simple InfluxDB C++ client, I wanted to create an asynchronous fire-and-forget API, so that the data points can be sent over HTTP without blocking the instrumented C++ code. Several “readymade” options to implement concurrency in this scenario are available.

A simple PAIR of ZeroMQ sockets would do the job, but I’d have to implement batching separately. Thus, I turned my attention to a higher-level abstraction: Rx

Rx Window Operator

Quickly looking through the cross-language Reactive Extensions site, I found the right operator: Window.

This operator has luckily been implemented in RxCpp, thus I proceeded with the experiment.

Batching Design

Batching using Rx Winodw Operator

Rx Window Operator (CC BY 3.0 reactivex.io)[1. Source: reactivex.io License: (CC BY 3.0)]

The window operator takes an observable sequence of data and splits it into windows (batches) of observables. To batch requests, the observable windows of data are aggregated to a single value upon the last value from the windows (via other aggregating Rx operators).

A Toy Problem

To validate the approach, the following problem is set:

Given a stream of integers, append the integers into a series of strings, either every second, or every N integers

String appends with integer-to-string conversions in C++ will be done via the {fmt} library.

Batching in One Line of Code

A stream of numbers batched either by time or count:

auto values = rxcpp::observable<>::range(1, 1000'000)
    .window_with_time_or_count(std::chrono::seconds(1), 100'000);

Note, there is an almost one-to-one translation into a C# version:

var values = Observable.Range(1, 1000000)
               .Window(TimeSpan.FromSeconds(1), 100000);

This indicates the power of the Rx abstraction across languages. The Rx website provides just the right sorting of the documentation to be able to translate Rx code from one language to another.

Aggregating the Batches

In order to do something useful with the batched data, the Scan operator is used to gather the data in a string buffer, and after the last value has been received, the string buffer is assembled into a string and processed:

    [](rxcpp::observable<int> window) {
        // append the number to the buffer
            [](std::shared_ptr<fmt::MemoryWriter> const& w, int v)
            *w << v;
            return w;

        // what if the window is empty? Provide at least one empty value

        // take the last value

        // print something fancy
        .subscribe([](std::shared_ptr<fmt::MemoryWriter> const& w) {
                "Len: {} ({}...)\n",
                w->str().substr(0, 42)

The Tale of Two Bugs

In the initial (non-TDD) spike, the batching seemed to work, however, something caught my attention (the code bites back):

[window 0] Create window
Count in window: 170306
Len: 910731 (123456789101112131415161718192021222324252...)

the window wasn’t capped at 100’000. This could have been either a misunderstanding or a bug, thus I formulated a hesitant issue #277. As it turned out, it indeed was a bug, which was then fixed in no time. However, the first bug has hidden another one: the spike implementation started to crash at the end: when all the windows were capped by count, and not by time, last window was empty, as all values fit exactly into 10 batches.

The Last operator rightly caused an exception due to an empty sequence. Obviously, there’s no last value in an empty sequence. Rubber Ducking and a hint from Kirk Shoop fixed the issue by utilizing the StartWith operator to guarantee, the sequence is never empty. An empty string buffer can be ignored easier downstream.

Active Object

The active object pattern was applied to implement a fire-and-forget asynchronous API. A Rx Subject to bridge between the function call and the “control-inverted” observable:

struct async_api {
    rxcpp::subjects::subject<line> subj;

        auto incoming_requests = subj
            .map([](auto line) {
                return line.to_string();

                // schedule window caps on a new thread

    // fire-and-forget
    void insert(line const& line)

in order not to block the caller (which would be the default behavior), the observable watches the values from each window on a new thread. Here, scheduling on a thread pool (currently missing in RxCpp) would probably be beneficial.

While this implementation might not be an optimal one, the declarative nature of Rx, once the basics are understood, allows to “make it work and make it right” pretty quickly by composing the right operators.


The runnable code of the example can be found at Github: C++ version.

In order to show, how similar the high level code can be between different languages when writing, I’ve “ported” the example to C# [2. The C# version appears to run faster on my windows machine while solving the same toy problem].

Presenting at TU-Munich: testing on c++ projects, Thursday, March 26, 2015 7:00 PM

Expecting Thank you to all for a superb heated debate! next week

“no excuses for not testing on c++ projects”

Thursday, March 26, 2015
7:00 PM

details: http://www.meetup.com/MUCplusplus/events/220628575/

If only all test were comprehensible…

SCENARIO("acquiring wisdom") {

  GIVEN("an oracle") { 
    oracle gus;
    WHEN("I ask it to speak") {
      auto answer = gus.speak();

      THEN("wisdom is apparent") {
        CHECK( answer != "bla" );
[1. Catch]

→ The code can be found @github, including the presentation slides.

ris – a lightweight cross-platform resource compiler for c++ projects

Why a resource compiler in the 21st century?

Starting a c++ project that will potentially need static string resources (i.e. Lua scripts) makes one search for an easy way to embed large strings in an executable. There are many ways of including static resources, but there seems to be no simple but robust, platform-independent way [1. http://stackoverflow.com/questions/5479691/is-there-any-standard-way-of-embedding-resources-into-linux-executable-image]. For fast prototyping and one-shot projects, I’d like to have lightweight, minimum-configuration and install solution to the problem of embedding binary resources without having to use a large framework.

Premake, my favourite light-weight meta-build system contains a number of Lua scripts in its binary. These are embedded using a Lua script into a simple array of C-string constants. This is the simplicity that in my view should be strived for. ris is an attempt to do something similar for general c++ projects with a possibility of embedding binary blobs.

ris – cross-platform resource compiler for c++

The project (ris@github) is in its infancy, but seems already to be usable. Here’s a preview:

Defining and compiling resources

ris <path_to>/<resources>.json

with an input file as this self-explaining one:

    "namespace" : "test",
    "header" : "acceptance_test/resource.h",
    "source" : "acceptance_test/resource.cpp",
    "resources" : [
            "name" : "string_test",
            "source_type" : "string",
            "source" : "plain text"
            "name" : "binary_file_test",
            "source_type" : "file",
            "source" : "test.bin"

will generate two c++11 files to include in your project, enabling easy resource retrieval:

std::string res = test::Resource::string_test();


std::string res = test::Resource::Get("string_test");

Update 30.07.2015: now resources can be defined more concisely in YAML. A minimal resource definition in YAML looks like the following:

header: "res.h"
source: "res.cpp"

    compression: "LZ4F"
    name: "some_text"
    source: "some text"
    source_type: "string"

Enumerating the resources

Resource keys in the compiled resource can be enumerated passing a callable to GetKeys:

std::vector<std::string> keys;
test::Resource::GetKeys([&keys](char const* key){


Using an optional compression library bundle, adding a compression property to a resource description enables transparent (de)compression:

"compression" : "LZ4HC"


updated 24.11.2014:
ris now uses text resources generated and bootstrapped by its own early version. The goal is to make The code generator is customizable. The default template can be seen in template.json, and the generated header in template.h. The generation sequence can be seen in ris.cpp.

Using the second parameter to ris, it’s possible to override strings in the generator template. See an example below.


updated 27.11.2014:
One such customization is available in template_cpp03.json, where the C++11 constructs are replaced with generateable C++03 constructs.

To generate the resources using the C++03 template:

ris my_template.json template_cpp03.json

Why C++?

Such code generator as ris could most probably be developed more rapidly using any other programming language with a huge framework and a ton of libraries behind them. My personal preference for certain kinds of small projects lies in the area of self-contained single-binary/single-file executables or libraries, such as Lua. Lua is the primary motivation for this project, as it is itself a compact library for building flexible and extensible prototypes and tools. ris can act as a bootstrapping component to embed resources for building specialized shell-scripting replacements, i.e. for massive scripted file operations.



There is a number of paths this project can take from here. Features, such as robustness, performance or flexibility could all be addressed, but most probably ris will be shaped by its usage or absence of such.