#include "../test.h"
SCENARIO("scope, cold observable", "[scope][sources]"){
GIVEN("a test cold observable of ints"){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
const rxsc::test::messages<int> on;
rxu::detail::maybe<rx::test::testable_observable<int>> xs;
typedef rx::resource<std::vector<int>> resource;
WHEN("created by scope"){
auto res = w.start(
[&]() {
return rx::observable<>::
scope(
[&](){
return resource(rxu::to_vector({1, 2, 3, 4, 5}));
},
[&](resource r){
auto msg = std::vector<rxsc::test::messages<int>::recorded_type>();
int time = 10;
auto values = r.get();
std::for_each(values.begin(), values.end(), [&](int &v){
msg.push_back(on.next(time, v));
time += 10;
});
msg.push_back(on.completed(time));
xs.reset(sc.make_cold_observable(msg));
return xs.get();
}
)
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();
}
);
THEN("the output stops on completion"){
auto required = rxu::to_vector({
on.next(210, 1),
on.next(220, 2),
on.next(230, 3),
on.next(240, 4),
on.next(250, 5),
on.completed(260)
});
auto actual = res.get_observer().messages();
REQUIRE(required == actual);
}
THEN("there was one subscription and one unsubscription"){
auto required = rxu::to_vector({
on.subscribe(200, 260)
});
auto actual = xs.get().subscriptions();
REQUIRE(required == actual);
}
}
}
}
SCENARIO("scope, hot observable", "[scope][sources]"){
GIVEN("a test hot observable of ints"){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
const rxsc::test::messages<int> on;
rxu::detail::maybe<rx::test::testable_observable<int>> xs;
typedef rx::resource<std::vector<int>> resource;
WHEN("created by scope"){
auto res = w.start(
[&]() {
return rx::observable<>::
scope(
[&](){
return resource(rxu::to_vector({1, 2, 3, 4, 5}));
},
[&](resource r){
auto msg = std::vector<rxsc::test::messages<int>::recorded_type>();
int time = 210;
auto values = r.get();
std::for_each(values.begin(), values.end(), [&](int &v){
msg.push_back(on.next(time, v));
time += 10;
});
msg.push_back(on.completed(time));
xs.reset(sc.make_hot_observable(msg));
return xs.get();
}
)
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();
}
);
THEN("the output stops on completion"){
auto required = rxu::to_vector({
on.next(210, 1),
on.next(220, 2),
on.next(230, 3),
on.next(240, 4),
on.next(250, 5),
on.completed(260)
});
auto actual = res.get_observer().messages();
REQUIRE(required == actual);
}
THEN("there was one subscription and one unsubscription"){
auto required = rxu::to_vector({
on.subscribe(200, 260)
});
auto actual = xs.get().subscriptions();
REQUIRE(required == actual);
}
}
}
}
SCENARIO("scope, complete", "[scope][sources]"){
GIVEN("a test cold observable of ints"){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
const rxsc::test::messages<int> on;
int resource_factory_invoked = 0;
int observable_factory_invoked = 0;
rxu::detail::maybe<rx::test::testable_observable<int>> xs;
typedef rx::resource<int> resource;
WHEN("created by scope"){
auto res = w.start(
[&]() {
return rx::observable<>::
scope(
[&](){
++resource_factory_invoked;
return resource(sc.clock());
},
[&](resource r){
++observable_factory_invoked;
xs.reset(sc.make_cold_observable(rxu::to_vector({
on.next(100, r.get()),
on.completed(200)
})));
return xs.get();
}
)
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();
}
);
THEN("Resource factory is used once"){
REQUIRE(1 == resource_factory_invoked);
}
THEN("Observable factory is used once"){
REQUIRE(1 == observable_factory_invoked);
}
THEN("the output stops on completion"){
auto required = rxu::to_vector({
on.next(300, 200),
on.completed(400)
});
auto actual = res.get_observer().messages();
REQUIRE(required == actual);
}
THEN("there was one subscription and one unsubscription"){
auto required = rxu::to_vector({
on.subscribe(200, 400)
});
auto actual = xs.get().subscriptions();
REQUIRE(required == actual);
}
}
}
}
SCENARIO("scope, error", "[scope][sources]"){
GIVEN("a test cold observable of ints"){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
const rxsc::test::messages<int> on;
std::runtime_error ex("scope on_error from source");
int resource_factory_invoked = 0;
int observable_factory_invoked = 0;
rxu::detail::maybe<rx::test::testable_observable<int>> xs;
typedef rx::resource<int> resource;
WHEN("created by scope"){
auto res = w.start(
[&]() {
return rx::observable<>::
scope(
[&](){
++resource_factory_invoked;
return resource(sc.clock());
},
[&](resource r){
++observable_factory_invoked;
xs.reset(sc.make_cold_observable(rxu::to_vector({
on.next(100, r.get()),
on.error(200, ex)
})));
return xs.get();
}
)
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();
}
);
THEN("Resource factory is used once"){
REQUIRE(1 == resource_factory_invoked);
}
THEN("Observable factory is used once"){
REQUIRE(1 == observable_factory_invoked);
}
THEN("the output stops on error"){
auto required = rxu::to_vector({
on.next(300, 200),
on.error(400, ex)
});
auto actual = res.get_observer().messages();
REQUIRE(required == actual);
}
THEN("there was one subscription and one unsubscription"){
auto required = rxu::to_vector({
on.subscribe(200, 400)
});
auto actual = xs.get().subscriptions();
REQUIRE(required == actual);
}
}
}
}
SCENARIO("scope, dispose", "[scope][sources]"){
GIVEN("a test cold observable of ints"){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
const rxsc::test::messages<int> on;
int resource_factory_invoked = 0;
int observable_factory_invoked = 0;
rxu::detail::maybe<rx::test::testable_observable<int>> xs;
typedef rx::resource<int> resource;
WHEN("created by scope"){
auto res = w.start(
[&]() {
return rx::observable<>::
scope(
[&](){
++resource_factory_invoked;
return resource(sc.clock());
},
[&](resource r){
++observable_factory_invoked;
xs.reset(sc.make_cold_observable(rxu::to_vector({
on.next(100, r.get()),
on.next(1000, r.get() + 1)
})));
return xs.get();
}
)
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();
}
);
THEN("Resource factory is used once"){
REQUIRE(1 == resource_factory_invoked);
}
THEN("Observable factory is used once"){
REQUIRE(1 == observable_factory_invoked);
}
THEN("the output contains resulting ints"){
auto required = rxu::to_vector({
on.next(300, 200)
});
auto actual = res.get_observer().messages();
REQUIRE(required == actual);
}
THEN("there was one subscription and one unsubscription"){
auto required = rxu::to_vector({
on.subscribe(200, 1000)
});
auto actual = xs.get().subscriptions();
REQUIRE(required == actual);
}
}
}
}
SCENARIO("scope, throw resource selector", "[scope][sources][!throws]"){
GIVEN("a test cold observable of ints"){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
const rxsc::test::messages<int> on;
std::runtime_error ex("scope on_error from source");
int resource_factory_invoked = 0;
int observable_factory_invoked = 0;
typedef rx::resource<int> resource;
WHEN("created by scope"){
auto res = w.start(
[&]() {
return rx::observable<>::
scope(
[&]() -> resource {
++resource_factory_invoked;
rxu::throw_exception(ex);
//return resource(sc.clock());
},
[&](resource){
++observable_factory_invoked;
return rx::observable<>::never<int>();
}
)
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();
}
);
THEN("Resource factory is used once"){
REQUIRE(1 == resource_factory_invoked);
}
THEN("Observable factory is not used"){
REQUIRE(0 == observable_factory_invoked);
}
THEN("the output stops on error"){
auto required = rxu::to_vector({
on.error(200, ex)
});
auto actual = res.get_observer().messages();
REQUIRE(required == actual);
}
}
}
}
SCENARIO("scope, throw resource usage", "[scope][sources][!throws]"){
GIVEN("a test cold observable of ints"){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
const rxsc::test::messages<int> on;
std::runtime_error ex("scope on_error from source");
int resource_factory_invoked = 0;
int observable_factory_invoked = 0;
typedef rx::resource<int> resource;
WHEN("created by scope"){
auto res = w.start(
[&]() {
return rx::observable<>::
scope(
[&](){
++resource_factory_invoked;
return resource(sc.clock());
},
[&](resource) -> rx::observable<int> {
++observable_factory_invoked;
rxu::throw_exception(ex);
}
)
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();
}
);
THEN("Resource factory is used once"){
REQUIRE(1 == resource_factory_invoked);
}
THEN("Observable factory is used once"){
REQUIRE(1 == observable_factory_invoked);
}
THEN("the output stops on error"){
auto required = rxu::to_vector({
on.error(200, ex)
});
auto actual = res.get_observer().messages();
REQUIRE(required == actual);
}
}
}
}