#include "../test.h"
#include <rxcpp/operators/rx-replay.hpp>

// Included directly because this file names std::chrono::milliseconds and
// std::runtime_error itself.
#include <chrono>
#include <stdexcept>

// Scenarios for the `replay` operator, all driven by a test scheduler.
//
// Conventions shared by every scenario below:
//   * `xs` is the hot source, `ys` is the connectable produced by replay(...).
//   * `on` builds both the source messages and the expected records.
//   * Work is queued with w.schedule_absolute(...) and only runs once
//     w.start() is called, so each THEN inspects what was recorded by then.

// replay() with no arguments: one subscriber, three separate connections.
SCENARIO("replay basic", "[replay][multicast][subject][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        auto xs = sc.make_hot_observable({
            on.next(110, 0),
            on.next(220, 1),
            on.next(280, 2),
            on.next(290, 3),
            on.next(340, 4),
            on.next(360, 5),
            on.next(370, 6),
            on.next(390, 7),
            on.next(410, 8),
            on.next(430, 9),
            on.next(450, 10),
            on.next(520, 11),
            on.next(560, 12),
            on.completed(600)
        });

        auto res = w.make_subscriber<int>();

        rx::connectable_observable<int> ys;

        WHEN("subscribed and then connected"){

            w.schedule_absolute(rxsc::test::created_time,
                [&ys, &xs](const rxsc::schedulable&){
                    ys = xs.replay().as_dynamic();
                });

            w.schedule_absolute(rxsc::test::subscribed_time,
                [&ys, &res](const rxsc::schedulable&){
                    ys.subscribe(res);
                });

            w.schedule_absolute(rxsc::test::unsubscribed_time,
                [&res](const rxsc::schedulable&){
                    res.unsubscribe();
                });

            // Each `connection` is captured by value: the local goes out of
            // scope before w.start() runs the scheduled actions. The expected
            // subscription records rely on both copies controlling the same
            // underlying subscription.
            {
                rx::composite_subscription connection;

                w.schedule_absolute(300,
                    [connection, &ys](const rxsc::schedulable&){
                        ys.connect(connection);
                    });
                w.schedule_absolute(400,
                    [connection](const rxsc::schedulable&){
                        connection.unsubscribe();
                    });
            }

            {
                rx::composite_subscription connection;

                w.schedule_absolute(500,
                    [connection, &ys](const rxsc::schedulable&){
                        ys.connect(connection);
                    });
                w.schedule_absolute(550,
                    [connection](const rxsc::schedulable&){
                        connection.unsubscribe();
                    });
            }

            // This connection starts after the source's completed(600).
            {
                rx::composite_subscription connection;

                w.schedule_absolute(650,
                    [connection, &ys](const rxsc::schedulable&){
                        ys.connect(connection);
                    });
                w.schedule_absolute(800,
                    [connection](const rxsc::schedulable&){
                        connection.unsubscribe();
                    });
            }

            w.start();

            THEN("the output only contains items sent while subscribed"){
                // Only items falling inside the 300-400 and 500-550
                // connections are expected; no terminal event is expected.
                auto required = rxu::to_vector({
                    on.next(340, 4),
                    on.next(360, 5),
                    on.next(370, 6),
                    on.next(390, 7),
                    on.next(520, 11)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there were 3 subscription/unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(300, 400),
                    on.subscribe(500, 550),
                    on.subscribe(650, 800)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

// replay() with no arguments where the source terminates with an error while
// the second connection is still active.
SCENARIO("replay error", "[replay][error][multicast][subject][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        // The same exception object is used in the source and in the expected
        // messages, so the message text does not influence the comparison.
        std::runtime_error ex("replay on_error");

        auto xs = sc.make_hot_observable({
            on.next(110, 0),
            on.next(220, 1),
            on.next(280, 2),
            on.next(290, 3),
            on.next(340, 4),
            on.next(360, 5),
            on.next(370, 6),
            on.next(390, 7),
            on.next(410, 8),
            on.next(430, 9),
            on.next(450, 10),
            on.next(520, 11),
            on.next(560, 12),
            on.error(600, ex)
        });

        auto res = w.make_subscriber<int>();

        rx::connectable_observable<int> ys;

        WHEN("subscribed and then connected"){

            w.schedule_absolute(rxsc::test::created_time,
                [&ys, &xs](const rxsc::schedulable&){
                    ys = xs.replay().as_dynamic();
                });

            w.schedule_absolute(rxsc::test::subscribed_time,
                [&ys, &res](const rxsc::schedulable&){
                    ys.subscribe(res);
                });

            w.schedule_absolute(rxsc::test::unsubscribed_time,
                [&res](const rxsc::schedulable&){
                    res.unsubscribe();
                });

            // `connection` is captured by value because the local is gone
            // before w.start() runs the scheduled actions.
            {
                rx::composite_subscription connection;

                w.schedule_absolute(300,
                    [connection, &ys](const rxsc::schedulable&){
                        ys.connect(connection);
                    });
                w.schedule_absolute(400,
                    [connection](const rxsc::schedulable&){
                        connection.unsubscribe();
                    });
            }

            // The unsubscribe is scheduled for 800, but the expected record
            // below ends at 600, the time of the source error.
            {
                rx::composite_subscription connection;

                w.schedule_absolute(500,
                    [connection, &ys](const rxsc::schedulable&){
                        ys.connect(connection);
                    });
                w.schedule_absolute(800,
                    [connection](const rxsc::schedulable&){
                        connection.unsubscribe();
                    });
            }

            w.start();

            THEN("the output only contains items sent while subscribed"){
                auto required = rxu::to_vector({
                    on.next(340, 4),
                    on.next(360, 5),
                    on.next(370, 6),
                    on.next(390, 7),
                    on.next(520, 11),
                    on.next(560, 12),
                    on.error(600, ex)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there were 2 subscription/unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(300, 400),
                    on.subscribe(500, 600)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

// replay() with no arguments: one connection, three subscribers attached at
// different times. Late subscribers are expected to receive every item
// recorded since the connection, stamped with their own subscription time.
SCENARIO("replay multiple subscriptions", "[replay][multicast][subject][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        auto xs = sc.make_hot_observable({
            on.next(110, 0),
            on.next(220, 1),
            on.next(280, 2),
            on.next(290, 3),
            on.next(340, 4),
            on.next(360, 5),
            on.next(370, 6),
            on.next(390, 7),
            on.next(410, 8),
            on.next(430, 9),
            on.next(450, 10),
            on.next(520, 11),
            on.next(560, 12),
            on.completed(650)
        });

        rx::connectable_observable<int> ys;

        WHEN("subscribed and then connected"){

            // Create connectable observable
            w.schedule_absolute(rxsc::test::created_time,
                [&ys, &xs](const rxsc::schedulable&){
                    ys = xs.replay().as_dynamic();
                });

            // Manage connection
            rx::composite_subscription connection;
            w.schedule_absolute(rxsc::test::subscribed_time,
                [connection, &ys](const rxsc::schedulable&){
                    ys.connect(connection);
                });
            w.schedule_absolute(rxsc::test::unsubscribed_time,
                [connection](const rxsc::schedulable&){
                    connection.unsubscribe();
                });

            // Subscribe before the first item emitted
            auto res1 = w.make_subscriber<int>();
            w.schedule_absolute(200,
                [&ys, &res1](const rxsc::schedulable&){
                    ys.subscribe(res1);
                });

            // Subscribe in the middle of emitting
            auto res2 = w.make_subscriber<int>();
            w.schedule_absolute(400,
                [&ys, &res2](const rxsc::schedulable&){
                    ys.subscribe(res2);
                });

            // Subscribe after the last item emitted
            auto res3 = w.make_subscriber<int>();
            w.schedule_absolute(600,
                [&ys, &res3](const rxsc::schedulable&){
                    ys.subscribe(res3);
                });

            w.start();

            THEN("the subscriber attached before the first item receives every item as it is sent"){
                auto required = rxu::to_vector({
                    on.next(220, 1),
                    on.next(280, 2),
                    on.next(290, 3),
                    on.next(340, 4),
                    on.next(360, 5),
                    on.next(370, 6),
                    on.next(390, 7),
                    on.next(410, 8),
                    on.next(430, 9),
                    on.next(450, 10),
                    on.next(520, 11),
                    on.next(560, 12),
                    on.completed(650)
                });
                auto actual = res1.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("the subscriber attached in the middle receives the buffered items replayed and then the remaining items"){
                // Items 1..7 are expected at time 400, the subscription time.
                auto required = rxu::to_vector({
                    on.next(400, 1),
                    on.next(400, 2),
                    on.next(400, 3),
                    on.next(400, 4),
                    on.next(400, 5),
                    on.next(400, 6),
                    on.next(400, 7),
                    on.next(410, 8),
                    on.next(430, 9),
                    on.next(450, 10),
                    on.next(520, 11),
                    on.next(560, 12),
                    on.completed(650)
                });
                auto actual = res2.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("the subscriber attached after the last item receives the buffered items replayed and then the completion"){
                // Items 1..12 are expected at time 600, the subscription time.
                auto required = rxu::to_vector({
                    on.next(600, 1),
                    on.next(600, 2),
                    on.next(600, 3),
                    on.next(600, 4),
                    on.next(600, 5),
                    on.next(600, 6),
                    on.next(600, 7),
                    on.next(600, 8),
                    on.next(600, 9),
                    on.next(600, 10),
                    on.next(600, 11),
                    on.next(600, 12),
                    on.completed(650)
                });
                auto actual = res3.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 650)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

// replay(3): same layout as "replay multiple subscriptions", but late
// subscribers are expected to receive only the 3 most recent items.
SCENARIO("replay multiple subscriptions with count", "[replay][multicast][subject][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        auto xs = sc.make_hot_observable({
            on.next(110, 0),
            on.next(220, 1),
            on.next(280, 2),
            on.next(290, 3),
            on.next(340, 4),
            on.next(360, 5),
            on.next(370, 6),
            on.next(390, 7),
            on.next(410, 8),
            on.next(430, 9),
            on.next(450, 10),
            on.next(520, 11),
            on.next(560, 12),
            on.completed(650)
        });

        rx::connectable_observable<int> ys;

        WHEN("subscribed and then connected"){

            // Create connectable observable
            w.schedule_absolute(rxsc::test::created_time,
                [&ys, &xs](const rxsc::schedulable&){
                    ys = xs.replay(3).as_dynamic();
                });

            // Manage connection
            rx::composite_subscription connection;
            w.schedule_absolute(rxsc::test::subscribed_time,
                [connection, &ys](const rxsc::schedulable&){
                    ys.connect(connection);
                });
            w.schedule_absolute(rxsc::test::unsubscribed_time,
                [connection](const rxsc::schedulable&){
                    connection.unsubscribe();
                });

            // Subscribe before the first item emitted
            auto res1 = w.make_subscriber<int>();
            w.schedule_absolute(200,
                [&ys, &res1](const rxsc::schedulable&){
                    ys.subscribe(res1);
                });

            // Subscribe in the middle of emitting
            auto res2 = w.make_subscriber<int>();
            w.schedule_absolute(400,
                [&ys, &res2](const rxsc::schedulable&){
                    ys.subscribe(res2);
                });

            // Subscribe after the last item emitted
            auto res3 = w.make_subscriber<int>();
            w.schedule_absolute(600,
                [&ys, &res3](const rxsc::schedulable&){
                    ys.subscribe(res3);
                });

            w.start();

            THEN("the subscriber attached before the first item receives every item as it is sent"){
                auto required = rxu::to_vector({
                    on.next(220, 1),
                    on.next(280, 2),
                    on.next(290, 3),
                    on.next(340, 4),
                    on.next(360, 5),
                    on.next(370, 6),
                    on.next(390, 7),
                    on.next(410, 8),
                    on.next(430, 9),
                    on.next(450, 10),
                    on.next(520, 11),
                    on.next(560, 12),
                    on.completed(650)
                });
                auto actual = res1.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("the subscriber attached in the middle receives the last 3 items replayed and then the remaining items"){
                auto required = rxu::to_vector({
                    on.next(400, 5),
                    on.next(400, 6),
                    on.next(400, 7),
                    on.next(410, 8),
                    on.next(430, 9),
                    on.next(450, 10),
                    on.next(520, 11),
                    on.next(560, 12),
                    on.completed(650)
                });
                auto actual = res2.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("the subscriber attached after the last item receives the last 3 items replayed and then the completion"){
                auto required = rxu::to_vector({
                    on.next(600, 10),
                    on.next(600, 11),
                    on.next(600, 12),
                    on.completed(650)
                });
                auto actual = res3.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 650)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

// replay(milliseconds(100), so): late subscribers are expected to receive only
// the items that fall inside the 100ms replay window at subscription time.
SCENARIO("replay multiple subscriptions with time", "[replay][multicast][subject][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        // Coordination built from the test scheduler and handed to replay().
        auto so = rx::identity_one_worker(sc);
        const rxsc::test::messages<int> on;

        auto xs = sc.make_hot_observable({
            on.next(110, 0),
            on.next(220, 1),
            on.next(240, 2),
            on.next(260, 3),
            on.next(340, 4),
            on.next(360, 5),
            on.next(370, 6),
            on.next(390, 7),
            on.next(410, 8),
            on.next(430, 9),
            on.next(450, 10),
            on.next(520, 11),
            on.next(560, 12),
            on.completed(650)
        });

        rx::connectable_observable<int> ys;

        WHEN("subscribed and then connected"){
            using namespace std::chrono;

            // Create connectable observable
            w.schedule_absolute(rxsc::test::created_time,
                [&](const rxsc::schedulable&){
                    ys = xs.replay(milliseconds(100), so).as_dynamic();
                });

            // Manage connection
            rx::composite_subscription connection;
            w.schedule_absolute(rxsc::test::subscribed_time,
                [connection, &ys](const rxsc::schedulable&){
                    ys.connect(connection);
                });
            w.schedule_absolute(rxsc::test::unsubscribed_time,
                [connection](const rxsc::schedulable&){
                    connection.unsubscribe();
                });

            // Subscribe before the first item emitted
            auto res1 = w.make_subscriber<int>();
            w.schedule_absolute(200,
                [&ys, &res1](const rxsc::schedulable&){
                    ys.subscribe(res1);
                });

            // Subscribe in the middle of emitting
            auto res2 = w.make_subscriber<int>();
            w.schedule_absolute(400,
                [&ys, &res2](const rxsc::schedulable&){
                    ys.subscribe(res2);
                });

            // Subscribe after the last item emitted
            auto res3 = w.make_subscriber<int>();
            w.schedule_absolute(600,
                [&ys, &res3](const rxsc::schedulable&){
                    ys.subscribe(res3);
                });

            w.start();

            THEN("the subscriber attached before the first item receives every item as it is sent"){
                auto required = rxu::to_vector({
                    on.next(220, 1),
                    on.next(240, 2),
                    on.next(260, 3),
                    on.next(340, 4),
                    on.next(360, 5),
                    on.next(370, 6),
                    on.next(390, 7),
                    on.next(410, 8),
                    on.next(430, 9),
                    on.next(450, 10),
                    on.next(520, 11),
                    on.next(560, 12),
                    on.completed(650)
                });
                auto actual = res1.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("the subscriber attached in the middle receives the items within the 100ms window replayed and then the remaining items"){
                // Items sent at 220, 240 and 260 are not expected at 400.
                auto required = rxu::to_vector({
                    on.next(400, 4),
                    on.next(400, 5),
                    on.next(400, 6),
                    on.next(400, 7),
                    on.next(410, 8),
                    on.next(430, 9),
                    on.next(450, 10),
                    on.next(520, 11),
                    on.next(560, 12),
                    on.completed(650)
                });
                auto actual = res2.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("the subscriber attached after the last item receives the items within the 100ms window replayed and then the completion"){
                auto required = rxu::to_vector({
                    on.next(600, 11),
                    on.next(600, 12),
                    on.completed(650)
                });
                auto actual = res3.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 650)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

// replay(3, milliseconds(100), so): late subscribers are expected to receive
// at most 3 items, and only those inside the 100ms replay window.
SCENARIO("replay multiple subscriptions with count and time", "[replay][multicast][subject][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        // Coordination built from the test scheduler and handed to replay().
        auto so = rx::identity_one_worker(sc);
        const rxsc::test::messages<int> on;

        auto xs = sc.make_hot_observable({
            on.next(110, 0),
            on.next(220, 1),
            on.next(240, 2),
            on.next(260, 3),
            on.next(340, 4),
            on.next(360, 5),
            on.next(370, 6),
            on.next(390, 7),
            on.next(410, 8),
            on.next(430, 9),
            on.next(450, 10),
            on.next(520, 11),
            on.next(560, 12),
            on.completed(650)
        });

        rx::connectable_observable<int> ys;

        WHEN("subscribed and then connected"){
            using namespace std::chrono;

            // Create connectable observable
            w.schedule_absolute(rxsc::test::created_time,
                [&](const rxsc::schedulable&){
                    ys = xs.replay(3, milliseconds(100), so).as_dynamic();
                });

            // Manage connection
            rx::composite_subscription connection;
            w.schedule_absolute(rxsc::test::subscribed_time,
                [connection, &ys](const rxsc::schedulable&){
                    ys.connect(connection);
                });
            w.schedule_absolute(rxsc::test::unsubscribed_time,
                [connection](const rxsc::schedulable&){
                    connection.unsubscribe();
                });

            // Subscribe before the first item emitted
            auto res1 = w.make_subscriber<int>();
            w.schedule_absolute(200,
                [&ys, &res1](const rxsc::schedulable&){
                    ys.subscribe(res1);
                });

            // Subscribe in the middle of emitting
            auto res2 = w.make_subscriber<int>();
            w.schedule_absolute(400,
                [&ys, &res2](const rxsc::schedulable&){
                    ys.subscribe(res2);
                });

            // Subscribe after the last item emitted
            auto res3 = w.make_subscriber<int>();
            w.schedule_absolute(600,
                [&ys, &res3](const rxsc::schedulable&){
                    ys.subscribe(res3);
                });

            w.start();

            THEN("the subscriber attached before the first item receives every item as it is sent"){
                auto required = rxu::to_vector({
                    on.next(220, 1),
                    on.next(240, 2),
                    on.next(260, 3),
                    on.next(340, 4),
                    on.next(360, 5),
                    on.next(370, 6),
                    on.next(390, 7),
                    on.next(410, 8),
                    on.next(430, 9),
                    on.next(450, 10),
                    on.next(520, 11),
                    on.next(560, 12),
                    on.completed(650)
                });
                auto actual = res1.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("the subscriber attached in the middle receives at most 3 items within the 100ms window replayed and then the remaining items"){
                // Item 4 (sent at 340) is not expected here: only 3 items are
                // replayed.
                auto required = rxu::to_vector({
                    on.next(400, 5),
                    on.next(400, 6),
                    on.next(400, 7),
                    on.next(410, 8),
                    on.next(430, 9),
                    on.next(450, 10),
                    on.next(520, 11),
                    on.next(560, 12),
                    on.completed(650)
                });
                auto actual = res2.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("the subscriber attached after the last item receives at most 3 items within the 100ms window replayed and then the completion"){
                // Only two items are expected: item 10 (sent at 450) is not
                // replayed at 600.
                auto required = rxu::to_vector({
                    on.next(600, 11),
                    on.next(600, 12),
                    on.completed(650)
                });
                auto actual = res3.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 650)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}