#include "../test.h" #include <rxcpp/operators/rx-concat.hpp> #include <rxcpp/operators/rx-group_by.hpp> #include <rxcpp/operators/rx-reduce.hpp> #include <rxcpp/operators/rx-map.hpp> #include <rxcpp/operators/rx-merge.hpp> #include <rxcpp/operators/rx-take.hpp> #include <rxcpp/operators/rx-start_with.hpp> #include <rxcpp/operators/rx-observe_on.hpp> #include <locale> #include <sstream> SCENARIO("range partitioned by group_by across hardware threads to derive pi", "[!hide][pi][group_by][observe_on][long][perf]"){ GIVEN("a for loop"){ WHEN("partitioning pi series across all hardware threads"){ std::atomic_int c; c = 0; auto pi = [&](int k) { ++c; return ( k % 2 == 0 ? -4.0L : 4.0L ) / ( ( 2.0L * k ) - 1.0L ); }; using namespace std::chrono; auto start = steady_clock::now(); // share an output thread across all the producer threads auto outputthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler()); struct work { int index; int first; int last; }; // use all available hardware threads auto total = rxcpp::observable<>::range(0, (2 * std::thread::hardware_concurrency()) - 1). map( [](int index){ static const int chunk = 100000000 / (2 * std::thread::hardware_concurrency()); int first = (chunk * index) + 1; int last = chunk * (index + 1); return work{index, first, last};} ). group_by( [](work w) -> int {return w.index % std::thread::hardware_concurrency();}, [](work w){return w;}). map( [=](rxcpp::grouped_observable<int, work> onproc) { auto key = onproc.get_key(); // share a producer thread across all the ranges in this group of chunks auto producerthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler()); return onproc. map( [=](work w){ std::stringstream message; message << std::setw(3) << w.index << ": range - " << w.first << "-" << w.last; return rxcpp::observable<>::range(w.first, w.last, producerthread). map(pi). sum(). // each thread maps and reduces its contribution to the answer map( [=](long double v){ std::stringstream message; message << key << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v; return std::make_tuple(message.str(), v); }). start_with(std::make_tuple(message.str(), 0.0L)). as_dynamic(); }). concat(). // only subscribe to one range at a time in this group. observe_on(outputthread). map(rxcpp::util::apply_to( [](std::string message, long double v){ std::cout << message << std::endl; return v; })). as_dynamic(); }). merge(). sum(). // reduces the contributions from all the threads to the answer as_blocking(). last(); std::cout << std::setprecision(16) << "Pi: " << total << std::endl; auto finish = steady_clock::now(); auto msElapsed = duration_cast<milliseconds>(finish-start); std::cout << "pi using group_by and concat to partition the work : " << c << " calls to pi(), " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; } } } SCENARIO("range partitioned by dividing work across hardware threads to derive pi", "[!hide][pi][observe_on][long][perf]"){ GIVEN("a for loop"){ WHEN("partitioning pi series across all hardware threads"){ std::atomic_int c; c = 0; auto pi = [&](int k) { ++c; return ( k % 2 == 0 ? -4.0L : 4.0L ) / ( ( 2.0L * k ) - 1.0L ); }; using namespace std::chrono; auto start = steady_clock::now(); struct work { int index; int first; int last; }; // use all available hardware threads auto total = rxcpp::observable<>::range(0, (2 * std::thread::hardware_concurrency()) - 1). map( [](int index){ static const int chunk = 100000000 / (2 * std::thread::hardware_concurrency()); int first = (chunk * index) + 1; int last = chunk * (index + 1); return work{index, first, last}; }). map( [=](work w){ std::stringstream message; message << std::setw(3) << w.index << ": range - " << w.first << "-" << w.last; // create a new thread for every chunk return rxcpp::observable<>::range(w.first, w.last, rxcpp::observe_on_new_thread()). map(pi). sum(). // each thread maps and reduces its contribution to the answer map( [=](long double v){ std::stringstream message; message << w.index << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v; return std::make_tuple(message.str(), v); }). start_with(std::make_tuple(message.str(), 0.0L)). as_dynamic(); }). merge(rxcpp::observe_on_new_thread()). map(rxcpp::util::apply_to( [](std::string message, long double v){ std::cout << message << std::endl; return v; })). sum(). // reduces the contributions from all the threads to the answer as_blocking(). last(); std::cout << std::setprecision(16) << "Pi: " << total << std::endl; auto finish = steady_clock::now(); auto msElapsed = duration_cast<milliseconds>(finish-start); std::cout << "pi using division of the whole range to partition the work : " << c << " calls to pi(), " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; } } } char whitespace(char c) { return std::isspace<char>(c, std::locale::classic()); } std::string trim(std::string s) { auto first = std::find_if_not(s.begin(), s.end(), whitespace); auto last = std::find_if_not(s.rbegin(), s.rend(), whitespace); if (last != s.rend()) { s.erase(s.end() - (last-s.rbegin()), s.end()); } s.erase(s.begin(), first); return s; } bool tolowerLess(char lhs, char rhs) { return std::tolower(lhs, std::locale::classic()) < std::tolower(rhs, std::locale::classic()); } bool tolowerStringLess(const std::string& lhs, const std::string& rhs) { return std::lexicographical_compare(lhs.begin(), lhs.end(), rhs.begin(), rhs.end(), tolowerLess); } SCENARIO("group_by", "[group_by][operators]"){ GIVEN("1 hot observable of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); const rxsc::test::messages<std::string> on; int keyInvoked = 0; int marbleInvoked = 0; auto xs = sc.make_hot_observable({ on.next(90, "error"), on.next(110, "error"), on.next(130, "error"), on.next(220, " foo"), on.next(240, " FoO "), on.next(270, "baR "), on.next(310, "foO "), on.next(350, " Baz "), on.next(360, " qux "), on.next(390, " bar"), on.next(420, " BAR "), on.next(470, "FOO "), on.next(480, "baz "), on.next(510, " bAZ "), on.next(530, " fOo "), on.completed(570), on.next(580, "error"), on.completed(600), on.error(650, std::runtime_error("error in completed sequence")) }); WHEN("group normalized strings"){ auto res = w.start( [&]() { return xs .group_by( [&](std::string v){ ++keyInvoked; return trim(std::move(v)); }, [&](std::string v){ ++marbleInvoked; std::reverse(v.begin(), v.end()); return v; }, tolowerStringLess) .map([](const rxcpp::grouped_observable<std::string, std::string>& g){return g.get_key();}) // forget type to workaround lambda deduction bug on msvc 2013 .as_dynamic(); } ); THEN("the output contains groups of group keys"){ auto required = rxu::to_vector({ on.next(220, "foo"), on.next(270, "baR"), on.next(350, "Baz"), on.next(360, "qux"), on.completed(570) }); auto actual = res.get_observer().messages(); REQUIRE(required == actual); } THEN("there was one subscription and one unsubscription to the xs"){ auto required = rxu::to_vector({ on.subscribe(200, 570) }); auto actual = xs.subscriptions(); REQUIRE(required == actual); } THEN("key selector was invoked for each value"){ REQUIRE(12 == keyInvoked); } THEN("marble selector was invoked for each value"){ REQUIRE(12 == marbleInvoked); } } } } SCENARIO("group_by take 1", "[group_by][take][operators]"){ GIVEN("1 hot observable of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); const rxsc::test::messages<long> on; int keyInvoked = 0; int marbleInvoked = 0; int groupEmitted = 0; auto xs = sc.make_hot_observable({ on.next(130, -1), on.next(220, 0), on.next(240, -1), on.next(270, 2), on.next(310, -3), on.next(350, 4), on.next(360, -5), on.next(390, 6), on.next(420, -7), on.next(470, 8), on.next(480, -9), on.completed(570) }); WHEN("1 group of ints is emitted"){ auto res = w.start( [&]() { return xs | rxo::group_by( [&](long v) { ++keyInvoked; return v % 2; }, [&](long v){ ++marbleInvoked; return v; }) | rxo::take(1) | rxo::map([&](const rxcpp::grouped_observable<long, long>& g) -> rxcpp::observable<long> { ++groupEmitted; return g; }) | rxo::merge() // forget type to workaround lambda deduction bug on msvc 2013 | rxo::as_dynamic(); } ); THEN("the output contains groups of ints"){ auto required = rxu::to_vector({ on.next(220, 0), on.next(270, 2), on.next(350, 4), on.next(390, 6), on.next(470, 8), on.completed(570) }); auto actual = res.get_observer().messages(); REQUIRE(required == actual); } THEN("there was one subscription and one unsubscription to the xs"){ auto required = rxu::to_vector({ on.subscribe(200, 570) }); auto actual = xs.subscriptions(); REQUIRE(required == actual); } THEN("key selector was invoked for each value"){ REQUIRE(10 == keyInvoked); } THEN("marble selector was invoked for each value"){ REQUIRE(5 == marbleInvoked); } THEN("1 group emitted"){ REQUIRE(1 == groupEmitted); } } } } SCENARIO("group_by take 1 take 4", "[group_by][take][operators]"){ GIVEN("1 hot observable of ints."){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); const rxsc::test::messages<long> on; int keyInvoked = 0; int marbleInvoked = 0; int groupEmitted = 0; auto xs = sc.make_hot_observable({ on.next(130, -1), on.next(220, 0), on.next(240, -1), on.next(270, 2), on.next(310, -3), on.next(350, 4), on.next(360, -5), on.next(390, 6), on.next(420, -7), }); WHEN("1 group of ints is emitted"){ auto res = w.start( [&]() { return xs .group_by( [&](long v) { ++keyInvoked; return v % 2; }, [&](long v){ ++marbleInvoked; return v; }) .take(1) .map([&](const rxcpp::grouped_observable<long, long>& g) -> rxcpp::observable<long> { ++groupEmitted; return g.take(4); }) .merge() // forget type to workaround lambda deduction bug on msvc 2013 .as_dynamic(); } ); THEN("the output contains groups of ints"){ auto required = rxu::to_vector({ on.next(220, 0), on.next(270, 2), on.next(350, 4), on.next(390, 6), on.completed(390) }); auto actual = res.get_observer().messages(); REQUIRE(required == actual); } THEN("there was one subscription and one unsubscription to the xs"){ auto required = rxu::to_vector({ on.subscribe(200, 390) }); auto actual = xs.subscriptions(); REQUIRE(required == actual); } THEN("key selector was invoked for each value"){ REQUIRE(7 == keyInvoked); } THEN("marble selector was invoked for each value"){ REQUIRE(4 == marbleInvoked); } THEN("1 group emitted"){ REQUIRE(1 == groupEmitted); } } } }