// Tests for the observe_on operator: one throughput scenario that runs on a
// real new-thread scheduler, followed by three scenarios driven by the
// virtual-time test scheduler.
#include "../test.h"
#include <rxcpp/operators/rx-take.hpp>
#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-observe_on.hpp>

// Number of values taken from the range in the throughput scenario below.
const int static_onnextcalls = 100000;

// Throughput scenario: pushes `static_onnextcalls` values through observe_on
// using observe_on_new_thread(), ten times over, and prints the elapsed time
// and rate of each round. Tagged [!hide] (presumably so that it is skipped
// unless explicitly selected -- confirm against the Catch version in use).
SCENARIO("range observed on new_thread", "[!hide][range][observe_on_debug][observe_on][long][perf]"){
    const int& onnextcalls = static_onnextcalls;
    GIVEN("a range"){
        WHEN("multicasting a million ints"){
            using namespace std::chrono;
            using clock = steady_clock;

            auto el = rx::observe_on_new_thread();

            for (int n = 0; n < 10; ++n) {
                // Explicitly initialized: a default-constructed std::atomic
                // holds an indeterminate value before C++20, and the
                // unsubscribe callback below reads `done`.
                std::atomic_bool disposed{false};
                std::atomic_bool done{false};

                // Counter shared with the on_next lambda.
                auto c = std::make_shared<int>(0);

                // Abort the process if the subscription is torn down before
                // the completion handler has run.
                rx::composite_subscription cs;
                cs.add([&](){
                    if (!done) {abort();}
                    disposed = true;
                });

                auto start = clock::now();
                rxs::range<int>(1)
                    .take(onnextcalls)
                    .observe_on(el)
                    .as_blocking()
                    .subscribe(
                        cs,
                        [c](int){
                            ++(*c);
                        },
                        [&](){
                            done = true;
                        });

                auto expected = onnextcalls;
                REQUIRE(*c == expected);

                auto finish = clock::now();
                auto msElapsed = duration_cast<milliseconds>(finish-start);
                std::cout << "range -> observe_on new_thread : " << (*c) << " on_next calls, " << msElapsed.count() << "ms elapsed, int-per-second " << *c / (msElapsed.count() / 1000.0) << std::endl;
            }
        }
    }
}

// Member-syntax form: xs.observe_on(so) on a hot test observable.
// The expected messages are one tick later than the source messages
// (210 -> 211, 240 -> 241, 300 -> 301); the value at 150 is absent from the
// expected output, and the expected source subscription spans 200..300.
SCENARIO("observe_on", "[observe][observe_on]"){
    GIVEN("a source"){
        auto sc = rxsc::make_test();
        auto so = rx::synchronize_in_one_worker(sc);
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        auto xs = sc.make_hot_observable({
            on.next(150, 1),
            on.next(210, 2),
            on.next(240, 3),
            on.completed(300)
        });

        // NOTE(review): the label says "subscribe_on", but the operator
        // applied below is observe_on. The string is left untouched so the
        // section name seen by the test runner does not change.
        WHEN("subscribe_on is specified"){

            auto res = w.start(
                [so, xs]() {
                    return xs
                        .observe_on(so);
                }
            );

            THEN("the output contains items sent while subscribed"){
                auto required = rxu::to_vector({
                    on.next(211, 2),
                    on.next(241, 3),
                    on.completed(301)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription to the source"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 300)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

// Pipe-syntax form: xs | rxo::observe_on(so). Same source and same expected
// messages/subscriptions as the member-syntax scenario above.
SCENARIO("stream observe_on", "[observe][observe_on]"){
    GIVEN("a source"){
        auto sc = rxsc::make_test();
        auto so = rx::synchronize_in_one_worker(sc);
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        auto xs = sc.make_hot_observable({
            on.next(150, 1),
            on.next(210, 2),
            on.next(240, 3),
            on.completed(300)
        });

        WHEN("observe_on is specified"){

            auto res = w.start(
                [so, xs]() {
                    return xs
                        | rxo::observe_on(so);
                }
            );

            THEN("the output contains items sent while subscribed"){
                auto required = rxu::to_vector({
                    on.next(211, 2),
                    on.next(241, 3),
                    on.completed(301)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription to the source"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 300)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

// Value type that declares no comparison operators; used as the element type
// flowing through observe_on in the scenario below.
class nocompare {
public:
    int v;
};

// Passes nocompare values through observe_on, then maps each one to its int
// member so the recorded output can be compared as messages<int>.
SCENARIO("observe_on no-comparison", "[observe][observe_on]"){
    GIVEN("a source"){
        auto sc = rxsc::make_test();
        auto so = rx::observe_on_one_worker(sc);
        auto w = sc.create_worker();
        // `in` builds the source messages; `out` builds the expected ones.
        const rxsc::test::messages<nocompare> in;
        const rxsc::test::messages<int> out;

        auto xs = sc.make_hot_observable({
            in.next(150, nocompare{1}),
            in.next(210, nocompare{2}),
            in.next(240, nocompare{3}),
            in.completed(300)
        });

        WHEN("observe_on is specified"){

            auto res = w.start(
                [so, xs]() {
                    return xs
                        | rxo::observe_on(so)
                        | rxo::map([](nocompare v){ return v.v; })
                        | rxo::as_dynamic();
                }
            );

            THEN("the output contains items sent while subscribed"){
                auto required = rxu::to_vector({
                    out.next(211, 2),
                    out.next(241, 3),
                    out.completed(301)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription to the source"){
                auto required = rxu::to_vector({
                    out.subscribe(200, 300)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}