Contents
Finally, Rx.cpp
Some time ago I have written that I didn’t have enough patience to recreate the background ticker example in C++ using Rx.cpp. Since then the Rx.cpp project seems to have grown out of the spike phase, and even has a native NuGet package. It has also gone multiplatform (Windows, OSX and Linux): observe the green Travis-CI Button.
Update: new blog post, discussing RxCpp v2 and testing using the test scheduler.
A simple console ticker
As in .Net, Reactive Extensions provide a simple way to process streams of data asynchronously, while keeping the concurrency-related code declarative and thus readable. Here’s a simple ticker in the console which runs asynchronously to the main thread:
auto scheduler = std::make_shared<rxcpp::EventLoopScheduler>(); auto ticker = rxcpp::Interval(std::chrono::milliseconds(250), scheduler); rxcpp::from(ticker) .where([](int val) { return val % 2 == 0; }) .take(10) .subscribe([](int val) { std::cout << "tick " << val << std::endl; }); std::cout << "starting to tick" << std::endl;
resulting in something like:
starting to tick tick 0 tick 2 tick 4 tick 6 tick 8 ...
where the ticks appear once in 250 milliseconds.
Throwing away code
The PPL example was simulating polling a sensor and printing the value. It had an error-prone and buggy ad-hoc implementation of an active object, ticking at predefined intervals. This can be now happily thrown away, as Rx allows a cleaner concurrency control and testability using schedulers, and implements a timed sequence: Interval.
Preconditions
FrequencyMeter FM; auto scheduler = std::make_shared<rxcpp::EventLoopScheduler>();
The scheduler will be used for all subscriptions.
The tickers
The first one:
auto measure = rxcpp::Interval(std::chrono::milliseconds(250),scheduler); auto measure_subscription = rxcpp::from(measure) .subscribe([&FM](int val) { std::cout << FM.Hz() << std::endl; });
where measure_subscription
is a rxcpp::Disposable
for subscription lifetime control.
And the other one:
auto ticker = rxcpp::Interval(std::chrono::milliseconds(500), scheduler); rxcpp::from(ticker) .take(10) .subscribe([](int val) { std::cout << "tick " << val << std::endl; });
where you can observe the LINQ-style filter take
Managing subscriptions
In the PPL example, one could start and stop the ticker. However, in Rx.cpp this can be simply modeled by disposable subscriptions. Hence, after some kind of sleeping, the measurement can be stopped:
sleep(2000); std::cout << "Canceling measurement ..." << std::endl; measure_subscription.Dispose(); // cancel measurement
Resulting in similar output:
60 63 tick 0 61 62 tick 1 63 63 tick 2 62 60 tick 3 Canceling measurement ... tick 4 tick 5 tick 6 tick 7 tick 8 tick 9
Restarting measurement can be done by creating a new subscription.
Why not simply signals/slots?
Almost quoting the Intro to Rx book, the advantages of using Rx over (at least) simple implementations of signal/slot mechanism are:
- Better maintainability due to readable, composable, declarative code
- Scheduler abstraction allowing for fast, deterministic, clock-independent tests of concurrency concerns
- Declarative concurrency through the same scheduler abstraction
- LINQ-like composition and filtering of event streams
- Easy subscription control via disposables
- Completion and exception handling built-in in the Observer concept
Code
Corrections, suggestions and comments are welcome!
Update 26.6.2014: There’s been a new release of Rx.cpp on nuget, and Kirk Shoop pushed a pull request, upgrading the project and the api usage to Rx.cpp 2.0.0. There have been some changes, and there are some interesting patterns, which should be blogged about in the near future.
Pingback: RxCpp 2: Background Ticker and Abstractions