#include "../test.h" #include <rxcpp/operators/rx-publish.hpp> #include <rxcpp/operators/rx-connect_forever.hpp> #include <rxcpp/operators/rx-ref_count.hpp> #include <rxcpp/operators/rx-map.hpp> #include <rxcpp/operators/rx-merge.hpp> SCENARIO("publish range", "[!hide][range][subject][publish][subject][operators]"){ GIVEN("a range"){ WHEN("published"){ auto published = rxs::range<int>(0, 10).publish(); std::cout << "subscribe to published" << std::endl; published.subscribe( // on_next [](int v){std::cout << v << ", ";}, // on_completed [](){std::cout << " done." << std::endl;}); std::cout << "connect to published" << std::endl; published.connect(); } WHEN("ref_count is used"){ auto published = rxs::range<int>(0, 10).publish().ref_count(); std::cout << "subscribe to ref_count" << std::endl; published.subscribe( // on_next [](int v){std::cout << v << ", ";}, // on_completed [](){std::cout << " done." << std::endl;}); } WHEN("connect_forever is used"){ auto published = rxs::range<int>(0, 10).publish().connect_forever(); std::cout << "subscribe to connect_forever" << std::endl; published.subscribe( // on_next [](int v){std::cout << v << ", ";}, // on_completed [](){std::cout << " done." << std::endl;}); } } } SCENARIO("publish ref_count", "[range][subject][publish][ref_count][operators]"){ GIVEN("a range"){ WHEN("ref_count is used"){ auto published = rxs::range<int>(0, 3).publish().ref_count(); std::vector<int> results; published.subscribe( // on_next [&](int v){ results.push_back(v); }, // on_completed [](){}); std::vector<int> expected_results; expected_results.push_back(0); expected_results.push_back(1); expected_results.push_back(2); expected_results.push_back(3); CHECK(results == expected_results); } WHEN("ref_count(other) is used"){ auto published = rxs::range<double>(0, 10).publish(); auto map_to_int = published.map([](double v) { return (long) v; }); // Ensures that 'ref_count(other)' has the source value type, // not the publisher's value type. auto with_ref_count = map_to_int.ref_count(published); std::vector<long> results; with_ref_count.subscribe( // on_next [&](long v){ results.push_back(v); }, // on_completed [](){}); std::vector<long> expected_results; for (long i = 0; i <= 10; ++i) { expected_results.push_back(i); } CHECK(results == expected_results); } WHEN("ref_count(other) is used in a diamond"){ auto source = rxs::range<double>(0, 3); int published_on_next_count = 0; // Ensure we only subscribe once to 'published' when its in a diamond. auto next = source.map( [&](double v) { published_on_next_count++; return v; } ); auto published = next.publish(); // Ensures that 'x.ref_count(other)' has the 'x' value type, not the other's value // type. auto map_to_int = published.map([](double v) { return (long) v; }); auto left = map_to_int.map([](long v) { return v * 2; }); auto right = map_to_int.map([](long v) { return v * 100; }); auto merge = left.merge(right); auto with_ref_count = merge.ref_count(published); std::vector<long> results; with_ref_count.subscribe( // on_next [&](long v){ results.push_back(v); }, // on_completed [](){}); // Ensure we only subscribe once to 'published' when its in a diamond. CHECK(published_on_next_count == 4); std::vector<long> expected_results; expected_results.push_back(0); expected_results.push_back(0); expected_results.push_back(2); expected_results.push_back(100); expected_results.push_back(4); expected_results.push_back(200); expected_results.push_back(6); expected_results.push_back(300); // Ensure left,right is interleaved without being biased towards one side. CHECK(results == expected_results); } } } SCENARIO("publish basic", "[publish][multicast][subject][operators]"){ GIVEN("a test hot observable of longs"){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); const rxsc::test::messages<int> on; auto xs = sc.make_hot_observable({ on.next(110, 7), on.next(220, 3), on.next(280, 4), on.next(290, 1), on.next(340, 8), on.next(360, 5), on.next(370, 6), on.next(390, 7), on.next(410, 13), on.next(430, 2), on.next(450, 9), on.next(520, 11), on.next(560, 20), on.completed(600) }); auto res = w.make_subscriber<int>(); rx::connectable_observable<int> ys; WHEN("subscribed and then connected"){ w.schedule_absolute(rxsc::test::created_time, [&ys, &xs](const rxsc::schedulable&){ ys = xs.publish().as_dynamic(); //ys = xs.publish_last().as_dynamic(); }); w.schedule_absolute(rxsc::test::subscribed_time, [&ys, &res](const rxsc::schedulable&){ ys.subscribe(res); }); w.schedule_absolute(rxsc::test::unsubscribed_time, [&res](const rxsc::schedulable&){ res.unsubscribe(); }); { rx::composite_subscription connection; w.schedule_absolute(300, [connection, &ys](const rxsc::schedulable&){ ys.connect(connection); }); w.schedule_absolute(400, [connection](const rxsc::schedulable&){ connection.unsubscribe(); }); } { rx::composite_subscription connection; w.schedule_absolute(500, [connection, &ys](const rxsc::schedulable&){ ys.connect(connection); }); w.schedule_absolute(550, [connection](const rxsc::schedulable&){ connection.unsubscribe(); }); } { rx::composite_subscription connection; w.schedule_absolute(650, [connection, &ys](const rxsc::schedulable&){ ys.connect(connection); }); w.schedule_absolute(800, [connection](const rxsc::schedulable&){ connection.unsubscribe(); }); } w.start(); THEN("the output only contains items sent while subscribed"){ auto required = rxu::to_vector({ on.next(340, 8), on.next(360, 5), on.next(370, 6), on.next(390, 7), on.next(520, 11) }); auto actual = res.get_observer().messages(); REQUIRE(required == actual); } THEN("there were 3 subscription/unsubscription"){ auto required = rxu::to_vector({ on.subscribe(300, 400), on.subscribe(500, 550), on.subscribe(650, 800) }); auto actual = xs.subscriptions(); REQUIRE(required == actual); } } } } SCENARIO("publish error", "[publish][error][multicast][subject][operators]"){ GIVEN("a test hot observable of longs"){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); const rxsc::test::messages<int> on; std::runtime_error ex("publish on_error"); auto xs = sc.make_hot_observable({ on.next(110, 7), on.next(220, 3), on.next(280, 4), on.next(290, 1), on.next(340, 8), on.next(360, 5), on.next(370, 6), on.next(390, 7), on.next(410, 13), on.next(430, 2), on.next(450, 9), on.next(520, 11), on.next(560, 20), on.error(600, ex) }); auto res = w.make_subscriber<int>(); rx::connectable_observable<int> ys; WHEN("subscribed and then connected"){ w.schedule_absolute(rxsc::test::created_time, [&ys, &xs](const rxsc::schedulable&){ ys = xs.publish().as_dynamic(); }); w.schedule_absolute(rxsc::test::subscribed_time, [&ys, &res](const rxsc::schedulable&){ ys.subscribe(res); }); w.schedule_absolute(rxsc::test::unsubscribed_time, [&res](const rxsc::schedulable&){ res.unsubscribe(); }); { rx::composite_subscription connection; w.schedule_absolute(300, [connection, &ys](const rxsc::schedulable&){ ys.connect(connection); }); w.schedule_absolute(400, [connection](const rxsc::schedulable&){ connection.unsubscribe(); }); } { rx::composite_subscription connection; w.schedule_absolute(500, [connection, &ys](const rxsc::schedulable&){ ys.connect(connection); }); w.schedule_absolute(800, [connection](const rxsc::schedulable&){ connection.unsubscribe(); }); } w.start(); THEN("the output only contains items sent while subscribed"){ auto required = rxu::to_vector({ on.next(340, 8), on.next(360, 5), on.next(370, 6), on.next(390, 7), on.next(520, 11), on.next(560, 20), on.error(600, ex) }); auto actual = res.get_observer().messages(); REQUIRE(required == actual); } THEN("there were 3 subscription/unsubscription"){ auto required = rxu::to_vector({ on.subscribe(300, 400), on.subscribe(500, 600) }); auto actual = xs.subscriptions(); REQUIRE(required == actual); } } } } SCENARIO("publish basic with initial value", "[publish][multicast][behavior][operators]"){ GIVEN("a test hot observable of longs"){ auto sc = rxsc::make_test(); auto w = sc.create_worker(); const rxsc::test::messages<int> on; auto xs = sc.make_hot_observable({ on.next(110, 7), on.next(220, 3), on.next(280, 4), on.next(290, 1), on.next(340, 8), on.next(360, 5), on.next(370, 6), on.next(390, 7), on.next(410, 13), on.next(430, 2), on.next(450, 9), on.next(520, 11), on.next(560, 20), on.completed(600) }); auto res = w.make_subscriber<int>(); rx::connectable_observable<int> ys; WHEN("subscribed and then connected"){ w.schedule_absolute(rxsc::test::created_time, [&ys, &xs](const rxsc::schedulable&){ ys = xs.publish(1979).as_dynamic(); }); w.schedule_absolute(rxsc::test::subscribed_time, [&ys, &res](const rxsc::schedulable&){ ys.subscribe(res); }); w.schedule_absolute(rxsc::test::unsubscribed_time, [&res](const rxsc::schedulable&){ res.unsubscribe(); }); { rx::composite_subscription connection; w.schedule_absolute(300, [connection, &ys](const rxsc::schedulable&){ ys.connect(connection); }); w.schedule_absolute(400, [connection](const rxsc::schedulable&){ connection.unsubscribe(); }); } { rx::composite_subscription connection; w.schedule_absolute(500, [connection, &ys](const rxsc::schedulable&){ ys.connect(connection); }); w.schedule_absolute(550, [connection](const rxsc::schedulable&){ connection.unsubscribe(); }); } { rx::composite_subscription connection; w.schedule_absolute(650, [connection, &ys](const rxsc::schedulable&){ ys.connect(connection); }); w.schedule_absolute(800, [connection](const rxsc::schedulable&){ connection.unsubscribe(); }); } w.start(); THEN("the output only contains items sent while subscribed"){ auto required = rxu::to_vector({ on.next(200, 1979), on.next(340, 8), on.next(360, 5), on.next(370, 6), on.next(390, 7), on.next(520, 11) }); auto actual = res.get_observer().messages(); REQUIRE(required == actual); } THEN("there were 3 subscription/unsubscription"){ auto required = rxu::to_vector({ on.subscribe(300, 400), on.subscribe(500, 550), on.subscribe(650, 800) }); auto actual = xs.subscriptions(); REQUIRE(required == actual); } } } }