#define RXCPP_SUBJECT_TEST_ASYNC 1
#include "../test.h"
#include <rxcpp/operators/rx-finally.hpp>
#include <future>
const int static_onnextcalls = 10000000;
static int aliased = 0;
SCENARIO("for loop locks mutex", "[!hide][for][mutex][long][perf]"){
const int& onnextcalls = static_onnextcalls;
GIVEN("a for loop"){
WHEN("locking mutex 100 million times"){
using namespace std::chrono;
typedef steady_clock clock;
int c = 0;
int n = 1;
auto start = clock::now();
std::mutex m;
for (int i = 0; i < onnextcalls; i++) {
std::unique_lock<std::mutex> guard(m);
++c;
}
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
std::cout << "loop mutex : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
}
}
}
namespace syncwithvoid {
template<class T, class OnNext>
class sync_subscriber
{
public:
OnNext onnext;
bool issubscribed;
explicit sync_subscriber(OnNext on)
: onnext(on)
, issubscribed(true)
{
}
bool is_subscribed() {return issubscribed;}
void unsubscribe() {issubscribed = false;}
void on_next(T v) {
onnext(v);
}
};
}
SCENARIO("for loop calls void on_next(int)", "[!hide][for][asyncobserver][baseline][perf]"){
const int& onnextcalls = static_onnextcalls;
GIVEN("a for loop"){
WHEN("calling on_next 100 million times"){
using namespace std::chrono;
typedef steady_clock clock;
auto c = std::addressof(aliased);
*c = 0;
int n = 1;
auto start = clock::now();
auto onnext = [c](int){++*c;};
syncwithvoid::sync_subscriber<int, decltype(onnext)> scbr(onnext);
for (int i = 0; i < onnextcalls && scbr.is_subscribed(); i++) {
scbr.on_next(i);
}
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
std::cout << "loop void : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
}
}
}
namespace asyncwithready {
// ready is an immutable class.
class ready
{
public:
typedef std::function<void()> onthen_type;
private:
std::function<void(onthen_type)> setthen;
public:
ready() {}
ready(std::function<void(onthen_type)> st) : setthen(st) {}
bool is_ready() {return !setthen;}
void then(onthen_type ot) {
if (is_ready()) {
abort();
}
setthen(ot);
}
};
template<class T, class OnNext>
class async_subscriber
{
public:
OnNext onnext;
bool issubscribed;
int count;
explicit async_subscriber(OnNext on)
: onnext(on)
, issubscribed(true)
, count(0)
{
}
bool is_subscribed() {return issubscribed;}
void unsubscribe() {issubscribed = false;}
ready on_next(T v) {
// push v onto queue
// under some condition pop v off of queue and pass it on
onnext(v);
// for demo purposes
// simulate queue full every 100000 items
if (count == 100000) {
// 'queue is full'
ready no([this](ready::onthen_type ot){
// full version will sync producer and consumer (in producer push and consumer pop)
// and decide when to restart the producer
if (!this->count) {
ot();
}
});
// set queue empty since the demo has no separate consumer thread
count = 0;
// 'queue is empty'
return no;
}
static const ready yes;
return yes;
}
};
}
SCENARIO("for loop calls ready on_next(int)", "[!hide][for][asyncobserver][ready][perf]"){
static const int& onnextcalls = static_onnextcalls;
GIVEN("a for loop"){
WHEN("calling on_next 100 million times"){
using namespace std::chrono;
typedef steady_clock clock;
auto c = std::addressof(aliased);
*c = 0;
int n = 1;
auto start = clock::now();
auto onnext = [&c](int){++*c;};
asyncwithready::async_subscriber<int, decltype(onnext)> scbr(onnext);
asyncwithready::ready::onthen_type chunk;
int i = 0;
chunk = [&chunk, scbr, i]() mutable {
for (; i < onnextcalls && scbr.is_subscribed(); i++) {
auto controller = scbr.on_next(i);
if (!controller.is_ready()) {
controller.then(chunk);
return;
}
}
};
chunk();
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
std::cout << "loop ready : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
}
}
}
namespace asyncwithfuture {
class unit {};
template<class T, class OnNext>
class async_subscriber
{
public:
OnNext onnext;
bool issubscribed;
explicit async_subscriber(OnNext on)
: onnext(on)
, issubscribed(true)
{
}
bool is_subscribed() {return issubscribed;}
void unsubscribe() {issubscribed = false;}
std::future<unit> on_next(T v) {
std::promise<unit> ready;
ready.set_value(unit());
onnext(v); return ready.get_future();}
};
}
SCENARIO("for loop calls std::future<unit> on_next(int)", "[!hide][for][asyncobserver][future][long][perf]"){
const int& onnextcalls = static_onnextcalls;
GIVEN("a for loop"){
WHEN("calling on_next 100 million times"){
using namespace std::chrono;
typedef steady_clock clock;
auto c = std::addressof(aliased);
*c = 0;
int n = 1;
auto start = clock::now();
auto onnext = [&c](int){++*c;};
asyncwithfuture::async_subscriber<int, decltype(onnext)> scbr(onnext);
for (int i = 0; i < onnextcalls && scbr.is_subscribed(); i++) {
auto isready = scbr.on_next(i);
if (isready.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout) {
isready.wait();
}
}
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
std::cout << "loop future<unit> : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
}
}
}
SCENARIO("for loop calls observer", "[!hide][for][observer][perf]"){
const int& onnextcalls = static_onnextcalls;
GIVEN("a for loop"){
WHEN("observing 100 million ints"){
using namespace std::chrono;
typedef steady_clock clock;
static int& c = aliased;
int n = 1;
c = 0;
auto start = clock::now();
auto o = rx::make_observer<int>(
[](int){++c;},
[](rxu::error_ptr){abort();});
for (int i = 0; i < onnextcalls; i++) {
o.on_next(i);
}
o.on_completed();
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
std::cout << "loop -> observer : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl;
}
}
}
SCENARIO("for loop calls subscriber", "[!hide][for][subscriber][perf]"){
const int& onnextcalls = static_onnextcalls;
GIVEN("a for loop"){
WHEN("observing 100 million ints"){
using namespace std::chrono;
typedef steady_clock clock;
static int& c = aliased;
int n = 1;
c = 0;
auto start = clock::now();
auto o = rx::make_subscriber<int>(
[](int){++c;},
[](rxu::error_ptr){abort();});
for (int i = 0; i < onnextcalls && o.is_subscribed(); i++) {
o.on_next(i);
}
o.on_completed();
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
std::cout << "loop -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
}
}
}
SCENARIO("range calls subscriber", "[!hide][range][subscriber][perf]"){
const int& onnextcalls = static_onnextcalls;
GIVEN("a range"){
WHEN("observing 100 million ints"){
using namespace std::chrono;
typedef steady_clock clock;
static int& c = aliased;
int n = 1;
c = 0;
auto start = clock::now();
rxs::range<int>(1, onnextcalls).subscribe(
[](int){
++c;
},
[](rxu::error_ptr){abort();});
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
std::cout << "range -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
}
}
}
SCENARIO("for loop calls subject", "[!hide][for][subject][subjects][long][perf]"){
static const int& onnextcalls = static_onnextcalls;
GIVEN("a for loop and a subject"){
WHEN("multicasting a million ints"){
using namespace std::chrono;
typedef steady_clock clock;
for (int n = 0; n < 10; n++)
{
auto p = std::make_shared<int>(0);
auto c = std::make_shared<int>(0);
rxsub::subject<int> sub;
#if RXCPP_SUBJECT_TEST_ASYNC
std::vector<std::future<int>> f(n);
std::atomic<int> asyncUnsubscriptions{0};
#endif
auto o = sub.get_subscriber();
o.add(rx::make_subscription([c, n](){
auto expected = n * onnextcalls;
REQUIRE(*c == expected);
}));
for (int i = 0; i < n; i++) {
#if RXCPP_SUBJECT_TEST_ASYNC
f[i] = std::async([sub, o, &asyncUnsubscriptions]() {
auto source = sub.get_observable();
while(o.is_subscribed()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
rx::composite_subscription cs;
source
.finally([&asyncUnsubscriptions](){
++asyncUnsubscriptions;})
.subscribe(
rx::make_subscriber<int>(
cs,
[cs](int){
cs.unsubscribe();
},
[](rxu::error_ptr){abort();}));
}
return 0;
});
#endif
sub.get_observable().subscribe(
[c, p](int){
++(*c);
},
[](rxu::error_ptr){abort();});
}
auto start = clock::now();
for (int i = 0; i < onnextcalls && o.is_subscribed(); i++) {
#if RXCPP_DEBUG_SUBJECT_RACE
if (*p != *c) abort();
(*p) += n;
#endif
o.on_next(i);
}
o.on_completed();
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
std::cout << "loop -> subject : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, ";
#if RXCPP_SUBJECT_TEST_ASYNC
std::cout << std::setw(4) << asyncUnsubscriptions << " async, ";
#endif
std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
}
}
}
}
SCENARIO("range calls subject", "[!hide][range][subject][subjects][long][perf]"){
static const int& onnextcalls = static_onnextcalls;
GIVEN("a range and a subject"){
WHEN("multicasting a million ints"){
using namespace std::chrono;
typedef steady_clock clock;
for (int n = 0; n < 10; n++)
{
auto p = std::make_shared<int>(0);
auto c = std::make_shared<int>(0);
rxsub::subject<int> sub;
#if RXCPP_SUBJECT_TEST_ASYNC
std::vector<std::future<int>> f(n);
std::atomic<int> asyncUnsubscriptions{0};
#endif
auto o = sub.get_subscriber();
o.add(rx::make_subscription([c, n](){
auto expected = n * onnextcalls;
REQUIRE(*c == expected);
}));
for (int i = 0; i < n; i++) {
#if RXCPP_SUBJECT_TEST_ASYNC
f[i] = std::async([sub, o, &asyncUnsubscriptions]() {
while(o.is_subscribed()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
rx::composite_subscription cs;
sub.get_observable()
.finally([&asyncUnsubscriptions](){
++asyncUnsubscriptions;})
.subscribe(cs,
[cs](int){
cs.unsubscribe();
},
[](rxu::error_ptr){abort();});
}
return 0;
});
#endif
sub.get_observable()
.subscribe(
[c, p](int){
++(*c);
},
[](rxu::error_ptr){abort();}
);
}
auto start = clock::now();
rxs::range<int>(1, onnextcalls)
#if RXCPP_DEBUG_SUBJECT_RACE
.filter([c, p, n](int){
if (*p != *c) abort();
(*p) += n;
return true;
})
#endif
.subscribe(o);
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
std::cout << "range -> subject : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, ";
#if RXCPP_SUBJECT_TEST_ASYNC
std::cout << std::setw(4) << asyncUnsubscriptions << " async, ";
#endif
std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl;
}
}
}
}
SCENARIO("subject - infinite source", "[subject][subjects]"){
GIVEN("a subject and an infinite source"){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
const rxsc::test::messages<int> on;
const rxsc::test::messages<bool> check;
auto xs = sc.make_hot_observable({
on.next(70, 1),
on.next(110, 2),
on.next(220, 3),
on.next(270, 4),
on.next(340, 5),
on.next(410, 6),
on.next(520, 7),
on.next(630, 8),
on.next(710, 9),
on.next(870, 10),
on.next(940, 11),
on.next(1020, 12)
});
rxsub::subject<int> s;
auto results1 = w.make_subscriber<int>();
auto results2 = w.make_subscriber<int>();
auto results3 = w.make_subscriber<int>();
WHEN("multicasting an infinite source"){
auto checks = rxu::to_vector({
check.next(0, false)
});
auto record = [&s, &check, &checks](long at) -> void {
checks.push_back(check.next(at, s.has_observers()));
};
auto o = s.get_subscriber();
w.schedule_absolute(100, [&s, &o, &checks, &record](const rxsc::schedulable&){
s = rxsub::subject<int>(); o = s.get_subscriber(); checks.clear(); record(100);});
w.schedule_absolute(200, [&xs, &o, &record](const rxsc::schedulable&){
xs.subscribe(o); record(200);});
w.schedule_absolute(1000, [&o, &record](const rxsc::schedulable&){
o.unsubscribe(); record(1000);});
w.schedule_absolute(300, [&s, &results1, &record](const rxsc::schedulable&){
s.get_observable().subscribe(results1); record(300);});
w.schedule_absolute(400, [&s, &results2, &record](const rxsc::schedulable&){
s.get_observable().subscribe(results2); record(400);});
w.schedule_absolute(900, [&s, &results3, &record](const rxsc::schedulable&){
s.get_observable().subscribe(results3); record(900);});
w.schedule_absolute(600, [&results1, &record](const rxsc::schedulable&){
results1.unsubscribe(); record(600);});
w.schedule_absolute(700, [&results2, &record](const rxsc::schedulable&){
results2.unsubscribe(); record(700);});
w.schedule_absolute(800, [&results1, &record](const rxsc::schedulable&){
results1.unsubscribe(); record(800);});
w.schedule_absolute(950, [&results3, &record](const rxsc::schedulable&){
results3.unsubscribe(); record(950);});
w.start();
THEN("result1 contains expected messages"){
auto required = rxu::to_vector({
on.next(340, 5),
on.next(410, 6),
on.next(520, 7)
});
auto actual = results1.get_observer().messages();
REQUIRE(required == actual);
}
THEN("result2 contains expected messages"){
auto required = rxu::to_vector({
on.next(410, 6),
on.next(520, 7),
on.next(630, 8)
});
auto actual = results2.get_observer().messages();
REQUIRE(required == actual);
}
THEN("result3 contains expected messages"){
auto required = rxu::to_vector({
on.next(940, 11)
});
auto actual = results3.get_observer().messages();
REQUIRE(required == actual);
}
THEN("checks contains expected messages"){
auto required = rxu::to_vector({
check.next(100, false),
check.next(200, false),
check.next(300, true),
check.next(400, true),
check.next(600, true),
check.next(700, false),
check.next(800, false),
check.next(900, true),
check.next(950, false),
check.next(1000, false)
});
auto actual = checks;
REQUIRE(required == actual);
}
}
}
}
SCENARIO("subject - finite source", "[subject][subjects]"){
GIVEN("a subject and an finite source"){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
const rxsc::test::messages<int> on;
auto xs = sc.make_hot_observable({
on.next(70, 1),
on.next(110, 2),
on.next(220, 3),
on.next(270, 4),
on.next(340, 5),
on.next(410, 6),
on.next(520, 7),
on.completed(630),
on.next(640, 9),
on.completed(650),
on.error(660, std::runtime_error("error on unsubscribed stream"))
});
rxsub::subject<int> s;
auto results1 = w.make_subscriber<int>();
auto results2 = w.make_subscriber<int>();
auto results3 = w.make_subscriber<int>();
WHEN("multicasting an infinite source"){
auto o = s.get_subscriber();
w.schedule_absolute(100, [&s, &o](const rxsc::schedulable&){
s = rxsub::subject<int>(); o = s.get_subscriber();});
w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable&){
xs.subscribe(o);});
w.schedule_absolute(1000, [&o](const rxsc::schedulable&){
o.unsubscribe();});
w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable&){
s.get_observable().subscribe(results1);});
w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable&){
s.get_observable().subscribe(results2);});
w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable&){
s.get_observable().subscribe(results3);});
w.schedule_absolute(600, [&results1](const rxsc::schedulable&){
results1.unsubscribe();});
w.schedule_absolute(700, [&results2](const rxsc::schedulable&){
results2.unsubscribe();});
w.schedule_absolute(800, [&results1](const rxsc::schedulable&){
results1.unsubscribe();});
w.schedule_absolute(950, [&results3](const rxsc::schedulable&){
results3.unsubscribe();});
w.start();
THEN("result1 contains expected messages"){
auto required = rxu::to_vector({
on.next(340, 5),
on.next(410, 6),
on.next(520, 7)
});
auto actual = results1.get_observer().messages();
REQUIRE(required == actual);
}
THEN("result2 contains expected messages"){
auto required = rxu::to_vector({
on.next(410, 6),
on.next(520, 7),
on.completed(630)
});
auto actual = results2.get_observer().messages();
REQUIRE(required == actual);
}
THEN("result3 contains expected messages"){
auto required = rxu::to_vector({
on.completed(900)
});
auto actual = results3.get_observer().messages();
REQUIRE(required == actual);
}
}
}
}
SCENARIO("subject - on_error in source", "[subject][subjects]"){
GIVEN("a subject and a source with an error"){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
const rxsc::test::messages<int> on;
std::runtime_error ex("subject on_error in stream");
auto xs = sc.make_hot_observable({
on.next(70, 1),
on.next(110, 2),
on.next(220, 3),
on.next(270, 4),
on.next(340, 5),
on.next(410, 6),
on.next(520, 7),
on.error(630, ex),
on.next(640, 9),
on.completed(650),
on.error(660, std::runtime_error("error on unsubscribed stream"))
});
rxsub::subject<int> s;
auto results1 = w.make_subscriber<int>();
auto results2 = w.make_subscriber<int>();
auto results3 = w.make_subscriber<int>();
WHEN("multicasting an infinite source"){
auto o = s.get_subscriber();
w.schedule_absolute(100, [&s, &o](const rxsc::schedulable&){
s = rxsub::subject<int>(); o = s.get_subscriber();});
w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable&){
xs.subscribe(o);});
w.schedule_absolute(1000, [&o](const rxsc::schedulable&){
o.unsubscribe();});
w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable&){
s.get_observable().subscribe(results1);});
w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable&){
s.get_observable().subscribe(results2);});
w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable&){
s.get_observable().subscribe(results3);});
w.schedule_absolute(600, [&results1](const rxsc::schedulable&){
results1.unsubscribe();});
w.schedule_absolute(700, [&results2](const rxsc::schedulable&){
results2.unsubscribe();});
w.schedule_absolute(800, [&results1](const rxsc::schedulable&){
results1.unsubscribe();});
w.schedule_absolute(950, [&results3](const rxsc::schedulable&){
results3.unsubscribe();});
w.start();
THEN("result1 contains expected messages"){
auto required = rxu::to_vector({
on.next(340, 5),
on.next(410, 6),
on.next(520, 7)
});
auto actual = results1.get_observer().messages();
REQUIRE(required == actual);
}
THEN("result2 contains expected messages"){
auto required = rxu::to_vector({
on.next(410, 6),
on.next(520, 7),
on.error(630, ex)
});
auto actual = results2.get_observer().messages();
REQUIRE(required == actual);
}
THEN("result3 contains expected messages"){
auto required = rxu::to_vector({
on.error(900, ex)
});
auto actual = results3.get_observer().messages();
REQUIRE(required == actual);
}
}
}
}