#define RXCPP_SUBJECT_TEST_ASYNC 1 #include "../test.h" #include <rxcpp/operators/rx-finally.hpp> #include <future> const int static_onnextcalls = 10000000; static int aliased = 0; SCENARIO("for loop locks mutex", "[!hide][for][mutex][long][perf]"){ const int& onnextcalls = static_onnextcalls; GIVEN("a for loop"){ WHEN("locking mutex 100 million times"){ using namespace std::chrono; typedef steady_clock clock; int c = 0; int n = 1; auto start = clock::now(); std::mutex m; for (int i = 0; i < onnextcalls; i++) { std::unique_lock<std::mutex> guard(m); ++c; } auto finish = clock::now(); auto msElapsed = duration_cast<milliseconds>(finish-start); std::cout << "loop mutex : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; } } } namespace syncwithvoid { template<class T, class OnNext> class sync_subscriber { public: OnNext onnext; bool issubscribed; explicit sync_subscriber(OnNext on) : onnext(on) , issubscribed(true) { } bool is_subscribed() {return issubscribed;} void unsubscribe() {issubscribed = false;} void on_next(T v) { onnext(v); } }; } SCENARIO("for loop calls void on_next(int)", "[!hide][for][asyncobserver][baseline][perf]"){ const int& onnextcalls = static_onnextcalls; GIVEN("a for loop"){ WHEN("calling on_next 100 million times"){ using namespace std::chrono; typedef steady_clock clock; auto c = std::addressof(aliased); *c = 0; int n = 1; auto start = clock::now(); auto onnext = [c](int){++*c;}; syncwithvoid::sync_subscriber<int, decltype(onnext)> scbr(onnext); for (int i = 0; i < onnextcalls && scbr.is_subscribed(); i++) { scbr.on_next(i); } auto finish = clock::now(); auto msElapsed = duration_cast<milliseconds>(finish-start); std::cout << "loop void : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; } } } namespace asyncwithready { // ready is an immutable class. class ready { public: typedef std::function<void()> onthen_type; private: std::function<void(onthen_type)> setthen; public: ready() {} ready(std::function<void(onthen_type)> st) : setthen(st) {} bool is_ready() {return !setthen;} void then(onthen_type ot) { if (is_ready()) { abort(); } setthen(ot); } }; template<class T, class OnNext> class async_subscriber { public: OnNext onnext; bool issubscribed; int count; explicit async_subscriber(OnNext on) : onnext(on) , issubscribed(true) , count(0) { } bool is_subscribed() {return issubscribed;} void unsubscribe() {issubscribed = false;} ready on_next(T v) { // push v onto queue // under some condition pop v off of queue and pass it on onnext(v); // for demo purposes // simulate queue full every 100000 items if (count == 100000) { // 'queue is full' ready no([this](ready::onthen_type ot){ // full version will sync producer and consumer (in producer push and consumer pop) // and decide when to restart the producer if (!this->count) { ot(); } }); // set queue empty since the demo has no separate consumer thread count = 0; // 'queue is empty' return no; } static const ready yes; return yes; } }; } SCENARIO("for loop calls ready on_next(int)", "[!hide][for][asyncobserver][ready][perf]"){ static const int& onnextcalls = static_onnextcalls; GIVEN("a for loop"){ WHEN("calling on_next 100 million times"){ using namespace std::chrono; typedef steady_clock clock; auto c = std::addressof(aliased); *c = 0; int n = 1; auto start = clock::now(); auto onnext = [&c](int){++*c;}; asyncwithready::async_subscriber<int, decltype(onnext)> scbr(onnext); asyncwithready::ready::onthen_type chunk; int i = 0; chunk = [&chunk, scbr, i]() mutable { for (; i < onnextcalls && scbr.is_subscribed(); i++) { auto controller = scbr.on_next(i); if (!controller.is_ready()) { controller.then(chunk); return; } } }; chunk(); auto finish = clock::now(); auto msElapsed = duration_cast<milliseconds>(finish-start); std::cout << "loop ready : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; } } } namespace asyncwithfuture { class unit {}; template<class T, class OnNext> class async_subscriber { public: OnNext onnext; bool issubscribed; explicit async_subscriber(OnNext on) : onnext(on) , issubscribed(true) { } bool is_subscribed() {return issubscribed;} void unsubscribe() {issubscribed = false;} std::future<unit> on_next(T v) { std::promise<unit> ready; ready.set_value(unit()); onnext(v); return ready.get_future();} }; } SCENARIO("for loop calls std::future<unit> on_next(int)", "[!hide][for][asyncobserver][future][long][perf]"){ const int& onnextcalls = static_onnextcalls; GIVEN("a for loop"){ WHEN("calling on_next 100 million times"){ using namespace std::chrono; typedef steady_clock clock; auto c = std::addressof(aliased); *c = 0; int n = 1; auto start = clock::now(); auto onnext = [&c](int){++*c;}; asyncwithfuture::async_subscriber<int, decltype(onnext)> scbr(onnext); for (int i = 0; i < onnextcalls && scbr.is_subscribed(); i++) { auto isready = scbr.on_next(i); if (isready.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout) { isready.wait(); } } auto finish = clock::now(); auto msElapsed = duration_cast<milliseconds>(finish-start); std::cout << "loop future<unit> : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; } } } SCENARIO("for loop calls observer", "[!hide][for][observer][perf]"){ const int& onnextcalls = static_onnextcalls; GIVEN("a for loop"){ WHEN("observing 100 million ints"){ using namespace std::chrono; typedef steady_clock clock; static int& c = aliased; int n = 1; c = 0; auto start = clock::now(); auto o = rx::make_observer<int>( [](int){++c;}, [](rxu::error_ptr){abort();}); for (int i = 0; i < onnextcalls; i++) { o.on_next(i); } o.on_completed(); auto finish = clock::now(); auto msElapsed = duration_cast<milliseconds>(finish-start); std::cout << "loop -> observer : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl; } } } SCENARIO("for loop calls subscriber", "[!hide][for][subscriber][perf]"){ const int& onnextcalls = static_onnextcalls; GIVEN("a for loop"){ WHEN("observing 100 million ints"){ using namespace std::chrono; typedef steady_clock clock; static int& c = aliased; int n = 1; c = 0; auto start = clock::now(); auto o = rx::make_subscriber<int>( [](int){++c;}, [](rxu::error_ptr){abort();}); for (int i = 0; i < onnextcalls && o.is_subscribed(); i++) { o.on_next(i); } o.on_completed(); auto finish = clock::now(); auto msElapsed = duration_cast<milliseconds>(finish-start); std::cout << "loop -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; } } } SCENARIO("range calls subscriber", "[!hide][range][subscriber][perf]"){ const int& onnextcalls = static_onnextcalls; GIVEN("a range"){ WHEN("observing 100 million ints"){ using namespace std::chrono; typedef steady_clock clock; static int& c = aliased; int n = 1; c = 0; auto start = clock::now(); rxs::range<int>(1, onnextcalls).subscribe( [](int){ ++c; }, [](rxu::error_ptr){abort();}); auto finish = clock::now(); auto msElapsed = duration_cast<milliseconds>(finish-start); std::cout << "range -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; } } } SCENARIO("for loop calls subject", "[!hide][for][subject][subjects][long][perf]"){ static const int& onnextcalls = static_onnextcalls; GIVEN("a for loop and a subject"){ WHEN("multicasting a million ints"){ using namespace std::chrono; typedef steady_clock clock; for (int n = 0; n < 10; n++) { auto p = std::make_shared<int>(0); auto c = std::make_shared<int>(0); rxsub::subject<int> sub; #if RXCPP_SUBJECT_TEST_ASYNC std::vector<std::future<int>> f(n); std::atomic<int> asyncUnsubscriptions{0}; #endif auto o = sub.get_subscriber(); o.add(rx::make_subscription([c, n](){ auto expected = n * onnextcalls; REQUIRE(*c == expected); })); for (int i = 0; i < n; i++) { #if RXCPP_SUBJECT_TEST_ASYNC f[i] = std::async([sub, o, &asyncUnsubscriptions]() { auto source = sub.get_observable(); while(o.is_subscribed()) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); rx::composite_subscription cs; source .finally([&asyncUnsubscriptions](){ ++asyncUnsubscriptions;}) .subscribe( rx::make_subscriber<int>( cs, [cs](int){ cs.unsubscribe(); }, [](rxu::error_ptr){abort();})); } return 0; }); #endif sub.get_observable().subscribe( [c, p](int){ ++(*c); }, [](rxu::error_ptr){abort();}); } auto start = clock::now(); for (int i = 0; i < onnextcalls && o.is_subscribed(); i++) { #if RXCPP_DEBUG_SUBJECT_RACE if (*p != *c) abort(); (*p) += n; #endif o.on_next(i); } o.on_completed(); auto finish = clock::now(); auto msElapsed = duration_cast<milliseconds>(finish-start); std::cout << "loop -> subject : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, "; #if RXCPP_SUBJECT_TEST_ASYNC std::cout << std::setw(4) << asyncUnsubscriptions << " async, "; #endif std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; } } } } SCENARIO("range calls subject", "[!hide][range][subject][subjects][long][perf]"){ static const int& onnextcalls = static_onnextcalls; GIVEN("a range and a subject"){ WHEN("multicasting a million ints"){ using namespace std::chrono; typedef steady_clock clock; for (int n = 0; n < 10; n++) { auto p = std::make_shared<int>(0); auto c = std::make_shared<int>(0); rxsub::subject<int> sub; #if RXCPP_SUBJECT_TEST_ASYNC std::vector<std::future<int>> f(n); std::atomic<int> asyncUnsubscriptions{0}; #endif auto o = sub.get_subscriber(); o.add(rx::make_subscription([c, n](){ auto expected = n * onnextcalls; REQUIRE(*c == expected); })); for (int i = 0; i < n; i++) { #if RXCPP_SUBJECT_TEST_ASYNC f[i] = std::async([sub, o, &asyncUnsubscriptions]() { while(o.is_subscribed()) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); rx::composite_subscription cs; sub.get_observable() .finally([&asyncUnsubscriptions](){ ++asyncUnsubscriptions;}) .subscribe(cs, [cs](int){ cs.unsubscribe(); }, [](rxu::error_ptr){abort();}); } return 0; }); #endif sub.get_observable() .subscribe( [c, p](int){ ++(*c); }, [](rxu::error_ptr){abort();} ); } auto start = clock::now(); rxs::range<int>(1, onnextcalls) #if RXCPP_DEBUG_SUBJECT_RACE .filter([c, p, n](int){ if (*p != *c) abort(); (*p) += n; return true; }) #endif .subscribe(o); auto finish = clock::now(); auto msElapsed = duration_cast<milliseconds>(finish-start); std::cout << "range -> subject : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, "; #if RXCPP_SUBJECT_TEST_ASYNC std::cout << std::setw(4) << asyncUnsubscriptions << " async, "; #endif std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl; } } } } SCENARIO("subject - infinite source", "[subject][subjects]"){ GIVEN("a subject and an infinite source"){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); const rxsc::test::messages<int> on; const rxsc::test::messages<bool> check; auto xs = sc.make_hot_observable({ on.next(70, 1), on.next(110, 2), on.next(220, 3), on.next(270, 4), on.next(340, 5), on.next(410, 6), on.next(520, 7), on.next(630, 8), on.next(710, 9), on.next(870, 10), on.next(940, 11), on.next(1020, 12) }); rxsub::subject<int> s; auto results1 = w.make_subscriber<int>(); auto results2 = w.make_subscriber<int>(); auto results3 = w.make_subscriber<int>(); WHEN("multicasting an infinite source"){ auto checks = rxu::to_vector({ check.next(0, false) }); auto record = [&s, &check, &checks](long at) -> void { checks.push_back(check.next(at, s.has_observers())); }; auto o = s.get_subscriber(); w.schedule_absolute(100, [&s, &o, &checks, &record](const rxsc::schedulable&){ s = rxsub::subject<int>(); o = s.get_subscriber(); checks.clear(); record(100);}); w.schedule_absolute(200, [&xs, &o, &record](const rxsc::schedulable&){ xs.subscribe(o); record(200);}); w.schedule_absolute(1000, [&o, &record](const rxsc::schedulable&){ o.unsubscribe(); record(1000);}); w.schedule_absolute(300, [&s, &results1, &record](const rxsc::schedulable&){ s.get_observable().subscribe(results1); record(300);}); w.schedule_absolute(400, [&s, &results2, &record](const rxsc::schedulable&){ s.get_observable().subscribe(results2); record(400);}); w.schedule_absolute(900, [&s, &results3, &record](const rxsc::schedulable&){ s.get_observable().subscribe(results3); record(900);}); w.schedule_absolute(600, [&results1, &record](const rxsc::schedulable&){ results1.unsubscribe(); record(600);}); w.schedule_absolute(700, [&results2, &record](const rxsc::schedulable&){ results2.unsubscribe(); record(700);}); w.schedule_absolute(800, [&results1, &record](const rxsc::schedulable&){ results1.unsubscribe(); record(800);}); w.schedule_absolute(950, [&results3, &record](const rxsc::schedulable&){ results3.unsubscribe(); record(950);}); w.start(); THEN("result1 contains expected messages"){ auto required = rxu::to_vector({ on.next(340, 5), on.next(410, 6), on.next(520, 7) }); auto actual = results1.get_observer().messages(); REQUIRE(required == actual); } THEN("result2 contains expected messages"){ auto required = rxu::to_vector({ on.next(410, 6), on.next(520, 7), on.next(630, 8) }); auto actual = results2.get_observer().messages(); REQUIRE(required == actual); } THEN("result3 contains expected messages"){ auto required = rxu::to_vector({ on.next(940, 11) }); auto actual = results3.get_observer().messages(); REQUIRE(required == actual); } THEN("checks contains expected messages"){ auto required = rxu::to_vector({ check.next(100, false), check.next(200, false), check.next(300, true), check.next(400, true), check.next(600, true), check.next(700, false), check.next(800, false), check.next(900, true), check.next(950, false), check.next(1000, false) }); auto actual = checks; REQUIRE(required == actual); } } } } SCENARIO("subject - finite source", "[subject][subjects]"){ GIVEN("a subject and an finite source"){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); const rxsc::test::messages<int> on; auto xs = sc.make_hot_observable({ on.next(70, 1), on.next(110, 2), on.next(220, 3), on.next(270, 4), on.next(340, 5), on.next(410, 6), on.next(520, 7), on.completed(630), on.next(640, 9), on.completed(650), on.error(660, std::runtime_error("error on unsubscribed stream")) }); rxsub::subject<int> s; auto results1 = w.make_subscriber<int>(); auto results2 = w.make_subscriber<int>(); auto results3 = w.make_subscriber<int>(); WHEN("multicasting an infinite source"){ auto o = s.get_subscriber(); w.schedule_absolute(100, [&s, &o](const rxsc::schedulable&){ s = rxsub::subject<int>(); o = s.get_subscriber();}); w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable&){ xs.subscribe(o);}); w.schedule_absolute(1000, [&o](const rxsc::schedulable&){ o.unsubscribe();}); w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable&){ s.get_observable().subscribe(results1);}); w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable&){ s.get_observable().subscribe(results2);}); w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable&){ s.get_observable().subscribe(results3);}); w.schedule_absolute(600, [&results1](const rxsc::schedulable&){ results1.unsubscribe();}); w.schedule_absolute(700, [&results2](const rxsc::schedulable&){ results2.unsubscribe();}); w.schedule_absolute(800, [&results1](const rxsc::schedulable&){ results1.unsubscribe();}); w.schedule_absolute(950, [&results3](const rxsc::schedulable&){ results3.unsubscribe();}); w.start(); THEN("result1 contains expected messages"){ auto required = rxu::to_vector({ on.next(340, 5), on.next(410, 6), on.next(520, 7) }); auto actual = results1.get_observer().messages(); REQUIRE(required == actual); } THEN("result2 contains expected messages"){ auto required = rxu::to_vector({ on.next(410, 6), on.next(520, 7), on.completed(630) }); auto actual = results2.get_observer().messages(); REQUIRE(required == actual); } THEN("result3 contains expected messages"){ auto required = rxu::to_vector({ on.completed(900) }); auto actual = results3.get_observer().messages(); REQUIRE(required == actual); } } } } SCENARIO("subject - on_error in source", "[subject][subjects]"){ GIVEN("a subject and a source with an error"){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); const rxsc::test::messages<int> on; std::runtime_error ex("subject on_error in stream"); auto xs = sc.make_hot_observable({ on.next(70, 1), on.next(110, 2), on.next(220, 3), on.next(270, 4), on.next(340, 5), on.next(410, 6), on.next(520, 7), on.error(630, ex), on.next(640, 9), on.completed(650), on.error(660, std::runtime_error("error on unsubscribed stream")) }); rxsub::subject<int> s; auto results1 = w.make_subscriber<int>(); auto results2 = w.make_subscriber<int>(); auto results3 = w.make_subscriber<int>(); WHEN("multicasting an infinite source"){ auto o = s.get_subscriber(); w.schedule_absolute(100, [&s, &o](const rxsc::schedulable&){ s = rxsub::subject<int>(); o = s.get_subscriber();}); w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable&){ xs.subscribe(o);}); w.schedule_absolute(1000, [&o](const rxsc::schedulable&){ o.unsubscribe();}); w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable&){ s.get_observable().subscribe(results1);}); w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable&){ s.get_observable().subscribe(results2);}); w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable&){ s.get_observable().subscribe(results3);}); w.schedule_absolute(600, [&results1](const rxsc::schedulable&){ results1.unsubscribe();}); w.schedule_absolute(700, [&results2](const rxsc::schedulable&){ results2.unsubscribe();}); w.schedule_absolute(800, [&results1](const rxsc::schedulable&){ results1.unsubscribe();}); w.schedule_absolute(950, [&results3](const rxsc::schedulable&){ results3.unsubscribe();}); w.start(); THEN("result1 contains expected messages"){ auto required = rxu::to_vector({ on.next(340, 5), on.next(410, 6), on.next(520, 7) }); auto actual = results1.get_observer().messages(); REQUIRE(required == actual); } THEN("result2 contains expected messages"){ auto required = rxu::to_vector({ on.next(410, 6), on.next(520, 7), on.error(630, ex) }); auto actual = results2.get_observer().messages(); REQUIRE(required == actual); } THEN("result3 contains expected messages"){ auto required = rxu::to_vector({ on.error(900, ex) }); auto actual = results3.get_observer().messages(); REQUIRE(required == actual); } } } }