#include "../test.h"

namespace detail {

/// Operator functor for `lift`: given a destination subscriber, produces a
/// subscriber that forwards only the values for which the stored predicate
/// returns true.
///
/// @tparam Predicate  callable invoked with each value; its decayed type is
///                    stored by value in `test`.
template<class Predicate>
struct liftfilter
{
    typedef typename std::decay<Predicate>::type test_type;
    test_type test;

    /// Stores the predicate. `t` is a by-value sink, so it is moved into place
    /// instead of being copied a second time.
    liftfilter(test_type t)
        : test(std::move(t))
    {
    }

    /// Observer that sits in front of `dest` and applies `test` to every value.
    template<class Subscriber>
    struct filter_observer : public rx::observer_base<typename std::decay<Subscriber>::type::value_type>
    {
        typedef filter_observer<Subscriber> this_type;
        typedef rx::observer_base<typename std::decay<Subscriber>::type::value_type> base_type;
        typedef typename base_type::value_type value_type;
        typedef typename std::decay<Subscriber>::type dest_type;
        typedef rx::observer<value_type, this_type> observer_type;
        dest_type dest;
        test_type test;

        /// Both parameters are by-value sinks; move them into the members.
        filter_observer(dest_type d, test_type t)
            : dest(std::move(d))
            , test(std::move(t))
        {
        }

        /// Runs the predicate on `v`. If the predicate throws, the exception is
        /// forwarded to `dest.on_error` and `v` is dropped; otherwise `v` is
        /// forwarded to `dest.on_next` only when the predicate accepted it.
        void on_next(typename dest_type::value_type v) const {
            bool filtered = false;
            RXCPP_TRY {
                filtered = !test(v);
            } RXCPP_CATCH(...) {
                dest.on_error(rxu::current_exception());
                return;
            }
            if (!filtered) {
                dest.on_next(v);
            }
        }

        /// Errors pass through to the destination untouched.
        void on_error(rxu::error_ptr e) const {
            dest.on_error(e);
        }

        /// Completion passes through to the destination untouched.
        void on_completed() const {
            dest.on_completed();
        }

        /// Builds a subscriber from `d` whose observer is a `filter_observer`
        /// wrapping `d` and the predicate `t`.
        static rx::subscriber<value_type, observer_type> make(const dest_type& d, const test_type& t) {
            return rx::make_subscriber<value_type>(d, observer_type(this_type(d, t)));
        }
    };

    /// Call operator used by `lift`: wraps `dest` in a filtering subscriber.
    template<class Subscriber>
    auto operator()(const Subscriber& dest) const
        -> decltype(filter_observer<Subscriber>::make(dest, test)) {
        return      filter_observer<Subscriber>::make(dest, test);
    }
};

}

namespace {

/// Convenience factory that deduces the predicate type and returns the matching
/// `detail::liftfilter`.
template<class Predicate>
auto liftfilter(Predicate&& p)
    ->      detail::liftfilter<typename std::decay<Predicate>::type> {
    return  detail::liftfilter<typename std::decay<Predicate>::type>(std::forward<Predicate>(p));
}

/// Returns true when `x` is a prime number; values below 2 are never prime.
bool IsPrime(int x)
{
    if (x < 2) return false;
    // Trial division only needs divisors up to sqrt(x). `i <= x / i` expresses
    // `i * i <= x` without any risk of signed overflow.
    for (int i = 2; i <= x / i; ++i)
    {
        if (x % i == 0)
            return false;
    }
    return true;
}

}

// All three scenarios share the same fixture: a hot observable that emits
// 1..11 at virtual times 110..580 and completes at 600, observed through
// `w.start(..., 400)`. The assertions expect a subscription window of
// 200-400, so only the values 3, 4, 5, 6 and 7 (times 230-390) reach the
// predicate, and of those only the primes 3, 5 and 7 are emitted.

SCENARIO("lift liftfilter stops on disposal", "[where][filter][lift][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        // Counts how many times the predicate ran.
        long invoked = 0;

        auto xs = sc.make_hot_observable({
            on.next(110, 1),
            on.next(180, 2),
            on.next(230, 3),
            on.next(270, 4),
            on.next(340, 5),
            on.next(380, 6),
            on.next(390, 7),
            on.next(450, 8),
            on.next(470, 9),
            on.next(560, 10),
            on.next(580, 11),
            on.completed(600)
        });

        WHEN("filtered to ints that are primes"){

            auto res = w.start(
                [&xs, &invoked]() {
                    return xs
                        .lift<int>(liftfilter([&invoked](int x) { invoked++; return IsPrime(x); }))
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                },
                400
            );

            THEN("the output only contains primes that arrived before disposal"){
                auto required = rxu::to_vector({
                    on.next(230, 3),
                    on.next(340, 5),
                    on.next(390, 7)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 400)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

            THEN("where was called until disposed"){
                REQUIRE(5 == invoked);
            }
        }
    }
}

SCENARIO("stream lift liftfilter stops on disposal", "[where][filter][lift][stream][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        // Counts how many times the predicate ran.
        long invoked = 0;

        auto xs = sc.make_hot_observable({
            on.next(110, 1),
            on.next(180, 2),
            on.next(230, 3),
            on.next(270, 4),
            on.next(340, 5),
            on.next(380, 6),
            on.next(390, 7),
            on.next(450, 8),
            on.next(470, 9),
            on.next(560, 10),
            on.next(580, 11),
            on.completed(600)
        });

        WHEN("filtered to ints that are primes"){

            // Same pipeline as the member-function form, written with the
            // `>>` operator and the `rxo::` operator factories.
            auto res = w.start(
                [&xs, &invoked]() {
                    return xs
                        >> rxo::lift<int>(liftfilter([&invoked](int x) { invoked++; return IsPrime(x); }))
                        // forget type to workaround lambda deduction bug on msvc 2013
                        >> rxo::as_dynamic();
                },
                400
            );

            THEN("the output only contains primes that arrived before disposal"){
                auto required = rxu::to_vector({
                    on.next(230, 3),
                    on.next(340, 5),
                    on.next(390, 7)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 400)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

            THEN("where was called until disposed"){
                REQUIRE(5 == invoked);
            }
        }
    }
}

SCENARIO("lift lambda filter stops on disposal", "[where][filter][lift][lambda][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        // Counts how many times the predicate ran.
        long invoked = 0;

        auto xs = sc.make_hot_observable({
            on.next(110, 1),
            on.next(180, 2),
            on.next(230, 3),
            on.next(270, 4),
            on.next(340, 5),
            on.next(380, 6),
            on.next(390, 7),
            on.next(450, 8),
            on.next(470, 9),
            on.next(560, 10),
            on.next(580, 11),
            on.completed(600)
        });

        WHEN("filtered to ints that are primes"){

            auto res = w.start(
                [&xs, &invoked]() {
                    auto predicate = [&](int x){
                        invoked++;
                        return IsPrime(x);
                    };
                    return xs
                        .lift<int>([=](rx::subscriber<int> dest){
                            // VS2013 deduction issue requires dynamic (type-forgetting)
                            return rx::make_subscriber<int>(
                                dest,
                                rx::make_observer_dynamic<int>(
                                    [=](int n){
                                        bool pass = false;
                                        RXCPP_TRY {
                                            pass = predicate(n);
                                        } RXCPP_CATCH(...) {
                                            // Stop here once the error has been forwarded,
                                            // mirroring detail::liftfilter's on_next.
                                            dest.on_error(rxu::current_exception());
                                            return;
                                        }
                                        if (pass) {dest.on_next(n);}
                                    },
                                    [=](rxu::error_ptr e){dest.on_error(e);},
                                    [=](){dest.on_completed();}));
                        })
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                },
                400
            );

            THEN("the output only contains primes that arrived before disposal"){
                auto required = rxu::to_vector({
                    on.next(230, 3),
                    on.next(340, 5),
                    on.next(390, 7)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 400)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

            THEN("where was called until disposed"){
                REQUIRE(5 == invoked);
            }
        }
    }
}