// C++ program  |  425 lines  |  14.67 KB

#include "../test.h"

// scope() over a cold test observable: the observable factory turns every
// value held by the scoped resource into one on_next message.
SCENARIO("scope, cold observable", "[scope][sources]"){
    GIVEN("a test cold observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;

        // Assigned inside the observable factory so the assertions below can
        // read the subscriptions recorded by the test observable.
        rxu::detail::maybe<rx::test::testable_observable<int>> source;

        using resource = rx::resource<std::vector<int>>;

        WHEN("created by scope"){

            auto run = worker.start(
                [&]() {
                    // Resource factory: wraps the ints 1..5 in a resource.
                    auto make_resource = [&](){
                        return resource(rxu::to_vector({1, 2, 3, 4, 5}));
                    };

                    // Observable factory: one on_next per resource value,
                    // 10 ticks apart starting at tick 10, then on_completed.
                    auto make_source = [&](resource r){
                        std::vector<rxsc::test::messages<int>::recorded_type> recorded;
                        int tick = 10;
                        for (int value : r.get()) {
                            recorded.push_back(on.next(tick, value));
                            tick += 10;
                        }
                        recorded.push_back(on.completed(tick));
                        source.reset(scheduler.make_cold_observable(recorded));
                        return source.get();
                    };

                    return rx::observable<>::scope(make_resource, make_source)
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("the output stops on completion"){
                // The cold ticks 10..60 are expected to show up shifted by 200.
                auto required = rxu::to_vector({
                    on.next(210, 1),
                    on.next(220, 2),
                    on.next(230, 3),
                    on.next(240, 4),
                    on.next(250, 5),
                    on.completed(260)
                });
                auto actual = run.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({ on.subscribe(200, 260) });
                auto actual = source.get().subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

// scope() over a hot test observable: same shape as the cold case, but the
// message ticks are given as the times the observer is expected to see them.
SCENARIO("scope, hot observable", "[scope][sources]"){
    GIVEN("a test hot observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;

        // Assigned inside the observable factory so the assertions below can
        // read the subscriptions recorded by the test observable.
        rxu::detail::maybe<rx::test::testable_observable<int>> source;

        using resource = rx::resource<std::vector<int>>;

        WHEN("created by scope"){

            auto run = worker.start(
                [&]() {
                    // Resource factory: wraps the ints 1..5 in a resource.
                    auto make_resource = [&](){
                        return resource(rxu::to_vector({1, 2, 3, 4, 5}));
                    };

                    // Observable factory: one on_next per resource value,
                    // 10 ticks apart starting at tick 210, then on_completed.
                    auto make_source = [&](resource r){
                        std::vector<rxsc::test::messages<int>::recorded_type> recorded;
                        int tick = 210;
                        for (int value : r.get()) {
                            recorded.push_back(on.next(tick, value));
                            tick += 10;
                        }
                        recorded.push_back(on.completed(tick));
                        source.reset(scheduler.make_hot_observable(recorded));
                        return source.get();
                    };

                    return rx::observable<>::scope(make_resource, make_source)
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("the output stops on completion"){
                // Hot messages are expected at exactly the ticks they were recorded with.
                auto required = rxu::to_vector({
                    on.next(210, 1),
                    on.next(220, 2),
                    on.next(230, 3),
                    on.next(240, 4),
                    on.next(250, 5),
                    on.completed(260)
                });
                auto actual = run.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({ on.subscribe(200, 260) });
                auto actual = source.get().subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

// scope() where the inner observable completes: each factory must run exactly
// once and the completion must be forwarded to the observer.
SCENARIO("scope, complete", "[scope][sources]"){
    GIVEN("a test cold observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;

        // Call counters, bumped by the two factories handed to scope().
        int resource_factory_invoked = 0;
        int observable_factory_invoked = 0;

        // Assigned inside the observable factory so the assertions below can
        // read the subscriptions recorded by the test observable.
        rxu::detail::maybe<rx::test::testable_observable<int>> source;

        using resource = rx::resource<int>;

        WHEN("created by scope"){

            auto run = worker.start(
                [&]() {
                    // Resource factory: the resource value is the scheduler
                    // clock at the moment the factory runs.
                    auto make_resource = [&](){
                        ++resource_factory_invoked;
                        return resource(scheduler.clock());
                    };

                    // Observable factory: emits the resource value at tick 100
                    // and completes at tick 200.
                    auto make_source = [&](resource r){
                        ++observable_factory_invoked;
                        auto recorded = rxu::to_vector({
                            on.next(100, r.get()),
                            on.completed(200)
                        });
                        source.reset(scheduler.make_cold_observable(recorded));
                        return source.get();
                    };

                    return rx::observable<>::scope(make_resource, make_source)
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("Resource factory is used once"){
                REQUIRE(1 == resource_factory_invoked);
            }

            THEN("Observable factory is used once"){
                REQUIRE(1 == observable_factory_invoked);
            }

            THEN("the output stops on completion"){
                // The emitted value 200 is the clock reading captured by the
                // resource factory.
                auto required = rxu::to_vector({
                    on.next(300, 200),
                    on.completed(400)
                });
                auto actual = run.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({ on.subscribe(200, 400) });
                auto actual = source.get().subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

// scope() where the inner observable fails: each factory must run exactly
// once and the error must be forwarded to the observer.
SCENARIO("scope, error", "[scope][sources]"){
    GIVEN("a test cold observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;

        // Error delivered by the inner observable and expected downstream.
        std::runtime_error ex("scope on_error from source");

        // Call counters, bumped by the two factories handed to scope().
        int resource_factory_invoked = 0;
        int observable_factory_invoked = 0;

        // Assigned inside the observable factory so the assertions below can
        // read the subscriptions recorded by the test observable.
        rxu::detail::maybe<rx::test::testable_observable<int>> source;

        using resource = rx::resource<int>;

        WHEN("created by scope"){

            auto run = worker.start(
                [&]() {
                    // Resource factory: the resource value is the scheduler
                    // clock at the moment the factory runs.
                    auto make_resource = [&](){
                        ++resource_factory_invoked;
                        return resource(scheduler.clock());
                    };

                    // Observable factory: emits the resource value at tick 100
                    // and fails with `ex` at tick 200.
                    auto make_source = [&](resource r){
                        ++observable_factory_invoked;
                        auto recorded = rxu::to_vector({
                            on.next(100, r.get()),
                            on.error(200, ex)
                        });
                        source.reset(scheduler.make_cold_observable(recorded));
                        return source.get();
                    };

                    return rx::observable<>::scope(make_resource, make_source)
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("Resource factory is used once"){
                REQUIRE(1 == resource_factory_invoked);
            }

            THEN("Observable factory is used once"){
                REQUIRE(1 == observable_factory_invoked);
            }

            THEN("the output stops on error"){
                // The emitted value 200 is the clock reading captured by the
                // resource factory.
                auto required = rxu::to_vector({
                    on.next(300, 200),
                    on.error(400, ex)
                });
                auto actual = run.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({ on.subscribe(200, 400) });
                auto actual = source.get().subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

// scope() where the inner observable never terminates: only the first value
// is expected downstream and the subscription is expected to end at tick 1000.
SCENARIO("scope, dispose", "[scope][sources]"){
    GIVEN("a test cold observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;

        // Call counters, bumped by the two factories handed to scope().
        int resource_factory_invoked = 0;
        int observable_factory_invoked = 0;

        // Assigned inside the observable factory so the assertions below can
        // read the subscriptions recorded by the test observable.
        rxu::detail::maybe<rx::test::testable_observable<int>> source;

        using resource = rx::resource<int>;

        WHEN("created by scope"){

            auto run = worker.start(
                [&]() {
                    // Resource factory: the resource value is the scheduler
                    // clock at the moment the factory runs.
                    auto make_resource = [&](){
                        ++resource_factory_invoked;
                        return resource(scheduler.clock());
                    };

                    // Observable factory: two values and no terminal event;
                    // the second value is placed at tick 1000.
                    auto make_source = [&](resource r){
                        ++observable_factory_invoked;
                        auto recorded = rxu::to_vector({
                            on.next(100, r.get()),
                            on.next(1000, r.get() + 1)
                        });
                        source.reset(scheduler.make_cold_observable(recorded));
                        return source.get();
                    };

                    return rx::observable<>::scope(make_resource, make_source)
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("Resource factory is used once"){
                REQUIRE(1 == resource_factory_invoked);
            }

            THEN("Observable factory is used once"){
                REQUIRE(1 == observable_factory_invoked);
            }

            THEN("the output contains resulting ints"){
                // Only the first value is expected; the second one must not
                // reach the observer.
                auto required = rxu::to_vector({ on.next(300, 200) });
                auto actual = run.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({ on.subscribe(200, 1000) });
                auto actual = source.get().subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

// scope() where the resource factory throws: the observable factory must not
// run and the observer is expected to receive the error.
SCENARIO("scope, throw resource selector", "[scope][sources][!throws]"){
    GIVEN("a test cold observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;

        // Error raised by the resource factory and expected downstream.
        std::runtime_error ex("scope on_error from source");

        // Call counters, bumped by the two factories handed to scope().
        int resource_factory_invoked = 0;
        int observable_factory_invoked = 0;

        using resource = rx::resource<int>;

        WHEN("created by scope"){

            auto run = worker.start(
                [&]() {
                    // Resource factory: counts the call, then raises `ex`
                    // instead of producing a resource. The explicit return
                    // type is needed because the body has no return statement.
                    auto make_resource = [&]() -> resource {
                        ++resource_factory_invoked;
                        rxu::throw_exception(ex);
                    };

                    // Observable factory: only counts calls; the assertions
                    // below expect it never to be reached.
                    auto make_source = [&](resource){
                        ++observable_factory_invoked;
                        return rx::observable<>::never<int>();
                    };

                    return rx::observable<>::scope(make_resource, make_source)
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("Resource factory is used once"){
                REQUIRE(1 == resource_factory_invoked);
            }

            THEN("Observable factory is not used"){
                REQUIRE(0 == observable_factory_invoked);
            }

            THEN("the output stops on error"){
                auto required = rxu::to_vector({ on.error(200, ex) });
                auto actual = run.get_observer().messages();
                REQUIRE(required == actual);
            }
        }
    }
}

// scope() where the observable factory throws: both factories run once and
// the observer is expected to receive the error.
SCENARIO("scope, throw resource usage", "[scope][sources][!throws]"){
    GIVEN("a test cold observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;

        // Error raised by the observable factory and expected downstream.
        std::runtime_error ex("scope on_error from source");

        // Call counters, bumped by the two factories handed to scope().
        int resource_factory_invoked = 0;
        int observable_factory_invoked = 0;

        using resource = rx::resource<int>;

        WHEN("created by scope"){

            auto run = worker.start(
                [&]() {
                    // Resource factory: the resource value is the scheduler
                    // clock at the moment the factory runs.
                    auto make_resource = [&](){
                        ++resource_factory_invoked;
                        return resource(scheduler.clock());
                    };

                    // Observable factory: counts the call, then raises `ex`
                    // instead of returning an observable. The explicit return
                    // type is needed because the body has no return statement.
                    auto make_source = [&](resource) -> rx::observable<int> {
                        ++observable_factory_invoked;
                        rxu::throw_exception(ex);
                    };

                    return rx::observable<>::scope(make_resource, make_source)
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("Resource factory is used once"){
                REQUIRE(1 == resource_factory_invoked);
            }

            THEN("Observable factory is used once"){
                REQUIRE(1 == observable_factory_invoked);
            }

            THEN("the output stops on error"){
                auto required = rxu::to_vector({ on.error(200, ex) });
                auto actual = run.get_observer().messages();
                REQUIRE(required == actual);
            }
        }
    }
}