#include "../test.h" #include <rxcpp/operators/rx-reduce.hpp> #include <rxcpp/operators/rx-map.hpp> #include <rxcpp/operators/rx-subscribe_on.hpp> #include <rxcpp/operators/rx-observe_on.hpp> #include <sstream> static const int static_subscriptions = 50000; SCENARIO("for loop subscribes to map with subscribe_on and observe_on", "[!hide][for][just][subscribe][subscribe_on][observe_on][long][perf]"){ const int& subscriptions = static_subscriptions; GIVEN("a for loop"){ WHEN("subscribe 50K times"){ using namespace std::chrono; typedef steady_clock clock; int runs = 10; for (;runs > 0; --runs) { int c = 0; int n = 1; auto start = clock::now(); for (int i = 0; i < subscriptions; ++i) { c += rx::observable<>::just(1) .map([](int i) { std::stringstream serializer; serializer << i; return serializer.str(); }) .map([](const std::string& s) { int i; std::stringstream(s) >> i; return i; }) .subscribe_on(rx::observe_on_event_loop()) .observe_on(rx::observe_on_event_loop()) .as_blocking() .count(); } auto finish = clock::now(); auto msElapsed = duration_cast<milliseconds>(finish-start); REQUIRE(subscriptions == c); std::cout << "loop subscribe map subscribe_on observe_on : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; } } } } SCENARIO("for loop subscribes to map with subscribe_on", "[!hide][subscribe_on_only][for][just][subscribe][subscribe_on][long][perf]"){ const int& subscriptions = static_subscriptions; GIVEN("a for loop"){ WHEN("subscribe 50K times"){ using namespace std::chrono; typedef steady_clock clock; int runs = 10; for (;runs > 0; --runs) { int c = 0; int n = 1; auto start = clock::now(); for (int i = 0; i < subscriptions; ++i) { c += rx::observable<>:: just(1). map([](int i) { std::stringstream serializer; serializer << i; return serializer.str(); }). map([](const std::string& s) { int i; std::stringstream(s) >> i; return i; }). subscribe_on(rx::observe_on_event_loop()). as_blocking(). count(); } auto finish = clock::now(); auto msElapsed = duration_cast<milliseconds>(finish-start); REQUIRE(subscriptions == c); std::cout << "loop subscribe map subscribe_on : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; } } } } SCENARIO("subscribe_on", "[subscribe][subscribe_on]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); auto so = rx::synchronize_in_one_worker(sc); auto w = sc.create_worker(); const rxsc::test::messages<int> on; auto xs = sc.make_hot_observable({ on.next(150, 1), on.next(210, 2), on.next(240, 3), on.completed(300) }); WHEN("subscribe_on is specified"){ auto res = w.start( [so, xs]() { return xs .subscribe_on(so); } ); THEN("the output contains items sent while subscribed"){ auto required = rxu::to_vector({ on.next(210, 2), on.next(240, 3), on.completed(300) }); auto actual = res.get_observer().messages(); REQUIRE(required == actual); } THEN("there was 1 subscription/unsubscription to the source"){ auto required = rxu::to_vector({ on.subscribe(201, 300) }); auto actual = xs.subscriptions(); REQUIRE(required == actual); } } } } SCENARIO("stream subscribe_on", "[subscribe][subscribe_on]"){ GIVEN("a source"){ auto sc = rxsc::make_test(); auto so = rx::synchronize_in_one_worker(sc); auto w = sc.create_worker(); const rxsc::test::messages<int> on; auto xs = sc.make_hot_observable({ on.next(150, 1), on.next(210, 2), on.next(240, 3), on.completed(300) }); WHEN("subscribe_on is specified"){ auto res = w.start( [so, xs]() { return xs | rxo::subscribe_on(so); } ); THEN("the output contains items sent while subscribed"){ auto required = rxu::to_vector({ on.next(210, 2), on.next(240, 3), on.completed(300) }); auto actual = res.get_observer().messages(); REQUIRE(required == actual); } THEN("there was 1 subscription/unsubscription to the source"){ auto required = rxu::to_vector({ on.subscribe(201, 300) }); auto actual = xs.subscriptions(); REQUIRE(required == actual); } } } }