#include "../test.h"
#include <rxcpp/operators/rx-reduce.hpp>
#include <rxcpp/operators/rx-filter.hpp>
#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-take.hpp>
#include <rxcpp/operators/rx-flat_map.hpp>
#include <rxcpp/operators/rx-observe_on.hpp>

// How many pythagorian triplets each perf scenario in this file generates before stopping.
static int const static_tripletCount = 100;

// Hidden perf scenario: plain nested-loop search for the first
// static_tripletCount pythagorian triplets (x <= y <= z, x*x + y*y == z*z).
// Prints the number of candidates tested, the triplets found, the elapsed
// time and the resulting candidates-per-second rate.
SCENARIO("pythagorian for loops", "[!hide][for][pythagorian][perf]"){
    const int& tripletCount = static_tripletCount;
    GIVEN("a for loop"){
        WHEN("generating pythagorian triplets"){
            using namespace std::chrono;
            typedef steady_clock clock;

            int c = 0;  // candidate (x, y, z) combinations tested
            int ct = 0; // triplets found so far
            int n = 1;  // printed as the "subscribed" count; always 1 in this scenario
            auto start = clock::now();
            for(int z = 1;; ++z)
            {
                for(int x = 1; x <= z; ++x)
                {
                    for(int y = x; y <= z; ++y)
                    {
                        ++c;
                        if(x*x + y*y == z*z)
                        {
                            ++ct;
                            if(ct == tripletCount)
                                goto done; // leave all three loops at once
                        }
                    }
                }
            }
            done:
            auto finish = clock::now();
            // Subtract the time_points before converting: truncating each one to
            // milliseconds first can be off by a millisecond, and dividing by a
            // whole-millisecond count yields inf (or a very coarse rate) for short runs.
            auto elapsed = finish - start;
            auto msElapsed = duration_cast<milliseconds>(elapsed);
            double secondsElapsed = duration_cast<duration<double>>(elapsed).count();
            double opsPerSecond = secondsElapsed > 0.0 ? c / secondsElapsed : 0.0;
            std::cout << "pythagorian for   : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << opsPerSecond << " ops/sec" << std::endl;

        }
    }
}

// Hidden perf scenario: generates the first static_tripletCount pythagorian
// triplets from nested rxs::range sources combined with merge_transform / flat_map,
// using the rx::identity_immediate() coordination. Prints the number of candidates
// that reached the filter, the triplets delivered, the elapsed time and the rate.
SCENARIO("merge_transform pythagorian ranges", "[!hide][range][merge_transform][pythagorian][perf]"){
    const int& tripletCount = static_tripletCount;
    GIVEN("some ranges"){
        WHEN("generating pythagorian triplets"){
            using namespace std::chrono;
            typedef steady_clock clock;

            auto so = rx::identity_immediate();

            int c = 0;  // candidates seen by the innermost filter
            int ct = 0; // triplets delivered to the subscriber
            int n = 1;  // printed as the "subscribed" count; always 1 in this scenario
            auto start = clock::now();
            auto triples =
                rxs::range(1, so)
                    .merge_transform(
                        [&c, so](int z){
                            return rxs::range(1, z, 1, so)
                                .flat_map(
                                    [&c, so, z](int x){
                                        return rxs::range(x, z, 1, so)
                                            .filter([&c, z, x](int y){++c; return x*x + y*y == z*z;})
                                            .transform([z, x](int y){return std::make_tuple(x, y, z);})
                                            // forget type to workaround lambda deduction bug on msvc 2013
                                            .as_dynamic();},
                                    [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;})
                                // forget type to workaround lambda deduction bug on msvc 2013
                                .as_dynamic();},
                        [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;});
            triples
                .take(tripletCount)
                .subscribe(
                    rxu::apply_to([&ct](int /*x*/,int /*y*/,int /*z*/){
                        ++ct;
                    }));
            auto finish = clock::now();
            // Subtract the time_points before converting: truncating each one to
            // milliseconds first can be off by a millisecond, and dividing by a
            // whole-millisecond count yields inf (or a very coarse rate) for short runs.
            auto elapsed = finish - start;
            auto msElapsed = duration_cast<milliseconds>(elapsed);
            double secondsElapsed = duration_cast<duration<double>>(elapsed).count();
            double opsPerSecond = secondsElapsed > 0.0 ? c / secondsElapsed : 0.0;
            std::cout << "merge pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << opsPerSecond << " ops/sec" << std::endl;

        }
    }
}

// Hidden perf scenario: same nested-range pythagorian triplet pipeline, but every
// range and merge_transform is given the rx::synchronize_event_loop() coordination.
// The triplets are counted through as_blocking().count(), then the number of
// candidates that reached the filter, the elapsed time and the rate are printed.
SCENARIO("synchronize merge_transform pythagorian ranges", "[!hide][range][merge_transform][synchronize][pythagorian][perf]"){
    const int& tripletCount = static_tripletCount;
    GIVEN("some ranges"){
        WHEN("generating pythagorian triplets"){
            using namespace std::chrono;
            typedef steady_clock clock;

            auto so = rx::synchronize_event_loop();

            // candidates seen by the innermost filter
            // NOTE(review): c is a plain int captured by reference and incremented inside
            // the filter callback; if this coordination runs inner ranges on more than one
            // thread the increment is unsynchronized -- confirm, and consider std::atomic<int>.
            int c = 0;
            int n = 1; // printed as the "subscribed" count; always 1 in this scenario
            auto start = clock::now();
            auto triples =
                rxs::range(1, so)
                    .merge_transform(
                        [&c, so](int z){
                            return rxs::range(1, z, 1, so)
                                .merge_transform(
                                    [&c, so, z](int x){
                                        return rxs::range(x, z, 1, so)
                                            .filter([&c, z, x](int y){
                                                ++c;
                                                if (x*x + y*y == z*z) {
                                                    return true;}
                                                else {
                                                    return false;}})
                                            .transform([z, x](int y){return std::make_tuple(x, y, z);})
                                            // forget type to workaround lambda deduction bug on msvc 2013
                                            .as_dynamic();},
                                    [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
                                    so)
                                // forget type to workaround lambda deduction bug on msvc 2013
                                .as_dynamic();},
                        [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
                        so);
            int ct = triples
                .take(tripletCount)
                .as_blocking()
                .count();

            auto finish = clock::now();
            // Subtract the time_points before converting: truncating each one to
            // milliseconds first can be off by a millisecond, and dividing by a
            // whole-millisecond count yields inf (or a very coarse rate) for short runs.
            auto elapsed = finish - start;
            auto msElapsed = duration_cast<milliseconds>(elapsed);
            double secondsElapsed = duration_cast<duration<double>>(elapsed).count();
            double opsPerSecond = secondsElapsed > 0.0 ? c / secondsElapsed : 0.0;
            std::cout << "merge sync pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << opsPerSecond << " ops/sec" << std::endl;
        }
    }
}

// Hidden perf scenario: same nested-range pythagorian triplet pipeline, but every
// range and merge_transform is given the rx::observe_on_event_loop() coordination.
// The triplets are counted through as_blocking().count(), then the number of
// candidates that reached the filter, the elapsed time and the rate are printed.
SCENARIO("observe_on merge_transform pythagorian ranges", "[!hide][range][merge_transform][observe_on][pythagorian][perf]"){
    const int& tripletCount = static_tripletCount;
    GIVEN("some ranges"){
        WHEN("generating pythagorian triplets"){
            using namespace std::chrono;
            typedef steady_clock clock;

            auto so = rx::observe_on_event_loop();

            // candidates seen by the innermost filter
            // NOTE(review): c is a plain int captured by reference and incremented inside
            // the filter callback; if this coordination runs inner ranges on more than one
            // thread the increment is unsynchronized -- confirm, and consider std::atomic<int>.
            int c = 0;
            int n = 1; // printed as the "subscribed" count; always 1 in this scenario
            auto start = clock::now();
            auto triples =
                rxs::range(1, so)
                    .merge_transform(
                        [&c, so](int z){
                            return rxs::range(1, z, 1, so)
                                .merge_transform(
                                    [&c, so, z](int x){
                                        return rxs::range(x, z, 1, so)
                                            .filter([&c, z, x](int y){
                                                ++c;
                                                if (x*x + y*y == z*z) {
                                                    return true;}
                                                else {
                                                    return false;}})
                                            .transform([z, x](int y){return std::make_tuple(x, y, z);})
                                            // forget type to workaround lambda deduction bug on msvc 2013
                                            .as_dynamic();},
                                    [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
                                    so)
                                // forget type to workaround lambda deduction bug on msvc 2013
                                .as_dynamic();},
                        [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
                        so);
            int ct = triples
                .take(tripletCount)
                .as_blocking()
                .count();

            auto finish = clock::now();
            // Subtract the time_points before converting: truncating each one to
            // milliseconds first can be off by a millisecond, and dividing by a
            // whole-millisecond count yields inf (or a very coarse rate) for short runs.
            auto elapsed = finish - start;
            auto msElapsed = duration_cast<milliseconds>(elapsed);
            double secondsElapsed = duration_cast<duration<double>>(elapsed).count();
            double opsPerSecond = secondsElapsed > 0.0 ? c / secondsElapsed : 0.0;
            std::cout << "merge observe_on pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << opsPerSecond << " ops/sec" << std::endl;
        }
    }
}

// Hidden perf scenario: same nested-range pythagorian triplet pipeline, but every
// range and merge_transform is given the rx::serialize_event_loop() coordination.
// The triplets are counted through as_blocking().count(), then the number of
// candidates that reached the filter, the elapsed time and the rate are printed.
SCENARIO("serialize merge_transform pythagorian ranges", "[!hide][range][merge_transform][serialize][pythagorian][perf]"){
    const int& tripletCount = static_tripletCount;
    GIVEN("some ranges"){
        WHEN("generating pythagorian triplets"){
            using namespace std::chrono;
            typedef steady_clock clock;

            auto so = rx::serialize_event_loop();

            // candidates seen by the innermost filter
            // NOTE(review): c is a plain int captured by reference and incremented inside
            // the filter callback; if this coordination runs inner ranges on more than one
            // thread the increment is unsynchronized -- confirm, and consider std::atomic<int>.
            int c = 0;
            int n = 1; // printed as the "subscribed" count; always 1 in this scenario
            auto start = clock::now();
            auto triples =
                rxs::range(1, so)
                    .merge_transform(
                        [&c, so](int z){
                            return rxs::range(1, z, 1, so)
                                .merge_transform(
                                    [&c, so, z](int x){
                                        return rxs::range(x, z, 1, so)
                                            .filter([&c, z, x](int y){
                                                ++c;
                                                if (x*x + y*y == z*z) {
                                                    return true;}
                                                else {
                                                    return false;}})
                                            .transform([z, x](int y){return std::make_tuple(x, y, z);})
                                            // forget type to workaround lambda deduction bug on msvc 2013
                                            .as_dynamic();},
                                    [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;},
                                    so)
                                // forget type to workaround lambda deduction bug on msvc 2013
                                .as_dynamic();},
                        [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;},
                        so);
            int ct = triples
                .take(tripletCount)
                .as_blocking()
                .count();

            auto finish = clock::now();
            // Subtract the time_points before converting: truncating each one to
            // milliseconds first can be off by a millisecond, and dividing by a
            // whole-millisecond count yields inf (or a very coarse rate) for short runs.
            auto elapsed = finish - start;
            auto msElapsed = duration_cast<milliseconds>(elapsed);
            double secondsElapsed = duration_cast<duration<double>>(elapsed).count();
            double opsPerSecond = secondsElapsed > 0.0 ? c / secondsElapsed : 0.0;
            std::cout << "merge serial pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << opsPerSecond << " ops/sec" << std::endl;
        }
    }
}

// Exercises the piped rxo::flat_map form with a result selector: every int from the
// outer cold source is projected onto the same cold string source, and both sources
// complete. Checks the merged output and the subscription records of both sources.
SCENARIO("flat_map completes", "[flat_map][map][operators]"){
    GIVEN("two cold observables. one of ints. one of strings."){
        auto test_sc = rxsc::make_test();
        auto test_worker = test_sc.create_worker();
        const rxsc::test::messages<int> int_msg;
        const rxsc::test::messages<std::string> str_msg;

        // outer source: four ints followed by completion
        auto int_source = test_sc.make_cold_observable({
            int_msg.next(100, 4),
            int_msg.next(200, 2),
            int_msg.next(300, 3),
            int_msg.next(400, 1),
            int_msg.completed(500)
        });

        // inner source: four strings followed by completion
        auto str_source = test_sc.make_cold_observable({
            str_msg.next(50, "foo"),
            str_msg.next(100, "bar"),
            str_msg.next(150, "baz"),
            str_msg.next(200, "qux"),
            str_msg.completed(250)
        });

        WHEN("each int is mapped to the strings"){

            auto outcome = test_worker.start(
                [&]() {
                    return int_source
                        | rxo::flat_map(
                            // every int selects the same inner source
                            [&](int){ return str_source; },
                            // the result keeps only the inner string
                            [](int, std::string text){ return text; })
                        // type-erase the result; works around a lambda deduction bug on msvc 2013
                        | rxo::as_dynamic();
                }
            );

            THEN("the output contains strings repeated for each int"){
                auto observed = outcome.get_observer().messages();
                auto expected = rxu::to_vector({
                    str_msg.next(350, "foo"),
                    str_msg.next(400, "bar"),
                    str_msg.next(450, "baz"),
                    str_msg.next(450, "foo"),
                    str_msg.next(500, "qux"),
                    str_msg.next(500, "bar"),
                    str_msg.next(550, "baz"),
                    str_msg.next(550, "foo"),
                    str_msg.next(600, "qux"),
                    str_msg.next(600, "bar"),
                    str_msg.next(650, "baz"),
                    str_msg.next(650, "foo"),
                    str_msg.next(700, "qux"),
                    str_msg.next(700, "bar"),
                    str_msg.next(750, "baz"),
                    str_msg.next(800, "qux"),
                    str_msg.completed(850)
                });
                REQUIRE(expected == observed);
            }

            THEN("there was one subscription and one unsubscription to the ints"){
                auto observed = int_source.subscriptions();
                auto expected = rxu::to_vector({
                    int_msg.subscribe(200, 700)
                });
                REQUIRE(expected == observed);
            }

            THEN("there were four subscription and unsubscription to the strings"){
                auto observed = str_source.subscriptions();
                auto expected = rxu::to_vector({
                    str_msg.subscribe(300, 550),
                    str_msg.subscribe(400, 650),
                    str_msg.subscribe(500, 750),
                    str_msg.subscribe(600, 850)
                });
                REQUIRE(expected == observed);
            }
        }
    }
}

// Exercises merge_transform with a result selector when both the outer int source
// and the projected string source complete: once through the piped rxo:: form
// without a coordination, once through the member form with
// rx::identity_current_thread(). Both variants assert the same output and
// subscription records.
SCENARIO("merge_transform completes", "[merge_transform][transform][map][operators]"){
    GIVEN("two cold observables. one of ints. one of strings."){
        auto test_sc = rxsc::make_test();
        auto test_worker = test_sc.create_worker();
        const rxsc::test::messages<int> int_msg;
        const rxsc::test::messages<std::string> str_msg;

        // outer source: four ints followed by completion
        auto int_source = test_sc.make_cold_observable({
            int_msg.next(100, 4),
            int_msg.next(200, 2),
            int_msg.next(300, 3),
            int_msg.next(400, 1),
            int_msg.completed(500)
        });

        // inner source: four strings followed by completion
        auto str_source = test_sc.make_cold_observable({
            str_msg.next(50, "foo"),
            str_msg.next(100, "bar"),
            str_msg.next(150, "baz"),
            str_msg.next(200, "qux"),
            str_msg.completed(250)
        });

        WHEN("each int is mapped to the strings"){

            auto outcome = test_worker.start(
                [&]() {
                    return int_source
                        | rxo::merge_transform(
                            // every int selects the same inner source
                            [&](int){ return str_source; },
                            // the result keeps only the inner string
                            [](int, std::string text){ return text; })
                        // type-erase the result; works around a lambda deduction bug on msvc 2013
                        | rxo::as_dynamic();
                }
            );

            THEN("the output contains strings repeated for each int"){
                auto observed = outcome.get_observer().messages();
                auto expected = rxu::to_vector({
                    str_msg.next(350, "foo"),
                    str_msg.next(400, "bar"),
                    str_msg.next(450, "baz"),
                    str_msg.next(450, "foo"),
                    str_msg.next(500, "qux"),
                    str_msg.next(500, "bar"),
                    str_msg.next(550, "baz"),
                    str_msg.next(550, "foo"),
                    str_msg.next(600, "qux"),
                    str_msg.next(600, "bar"),
                    str_msg.next(650, "baz"),
                    str_msg.next(650, "foo"),
                    str_msg.next(700, "qux"),
                    str_msg.next(700, "bar"),
                    str_msg.next(750, "baz"),
                    str_msg.next(800, "qux"),
                    str_msg.completed(850)
                });
                REQUIRE(expected == observed);
            }

            THEN("there was one subscription and one unsubscription to the ints"){
                auto observed = int_source.subscriptions();
                auto expected = rxu::to_vector({
                    int_msg.subscribe(200, 700)
                });
                REQUIRE(expected == observed);
            }

            THEN("there were four subscription and unsubscription to the strings"){
                auto observed = str_source.subscriptions();
                auto expected = rxu::to_vector({
                    str_msg.subscribe(300, 550),
                    str_msg.subscribe(400, 650),
                    str_msg.subscribe(500, 750),
                    str_msg.subscribe(600, 850)
                });
                REQUIRE(expected == observed);
            }
        }

        WHEN("each int is mapped to the strings with coordinator"){

            auto outcome = test_worker.start(
                [&]() {
                    return int_source
                        .merge_transform(
                            // every int selects the same inner source
                            [&](int){ return str_source; },
                            // the result keeps only the inner string
                            [](int, std::string text){ return text; },
                            rx::identity_current_thread())
                        // type-erase the result; works around a lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("the output contains strings repeated for each int"){
                auto observed = outcome.get_observer().messages();
                auto expected = rxu::to_vector({
                    str_msg.next(350, "foo"),
                    str_msg.next(400, "bar"),
                    str_msg.next(450, "baz"),
                    str_msg.next(450, "foo"),
                    str_msg.next(500, "qux"),
                    str_msg.next(500, "bar"),
                    str_msg.next(550, "baz"),
                    str_msg.next(550, "foo"),
                    str_msg.next(600, "qux"),
                    str_msg.next(600, "bar"),
                    str_msg.next(650, "baz"),
                    str_msg.next(650, "foo"),
                    str_msg.next(700, "qux"),
                    str_msg.next(700, "bar"),
                    str_msg.next(750, "baz"),
                    str_msg.next(800, "qux"),
                    str_msg.completed(850)
                });
                REQUIRE(expected == observed);
            }

            THEN("there was one subscription and one unsubscription to the ints"){
                auto observed = int_source.subscriptions();
                auto expected = rxu::to_vector({
                    int_msg.subscribe(200, 700)
                });
                REQUIRE(expected == observed);
            }

            THEN("there were four subscription and unsubscription to the strings"){
                auto observed = str_source.subscriptions();
                auto expected = rxu::to_vector({
                    str_msg.subscribe(300, 550),
                    str_msg.subscribe(400, 650),
                    str_msg.subscribe(500, 750),
                    str_msg.subscribe(600, 850)
                });
                REQUIRE(expected == observed);
            }
        }
    }
}

// Exercises merge_transform when the outer int source never completes: six ints are
// emitted with no completion message, so the merged output has no completion either
// and the expected subscription records for the outer source and the last inner
// subscription both end at 1000.
SCENARIO("merge_transform source never ends", "[merge_transform][transform][map][operators]"){
    GIVEN("two cold observables. one of ints. one of strings."){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> i_on;
        const rxsc::test::messages<std::string> s_on;

        // outer source: six ints and deliberately no completed message
        auto xs = sc.make_cold_observable({
            i_on.next(100, 4),
            i_on.next(200, 2),
            i_on.next(300, 3),
            i_on.next(400, 1),
            i_on.next(500, 5),
            i_on.next(700, 0)
        });

        // inner source: four strings followed by completion
        auto ys = sc.make_cold_observable({
            s_on.next(50, "foo"),
            s_on.next(100, "bar"),
            s_on.next(150, "baz"),
            s_on.next(200, "qux"),
            s_on.completed(250)
        });

        WHEN("each int is mapped to the strings"){

            auto res = w.start(
                [&]() {
                    return xs
                        .merge_transform([&](int){return ys;}, [](int, std::string s){return s;})
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("the output contains strings repeated for each int"){
                // no completed message is expected because the outer source never completes
                auto required = rxu::to_vector({
                    s_on.next(350, "foo"),
                    s_on.next(400, "bar"),
                    s_on.next(450, "baz"),
                    s_on.next(450, "foo"),
                    s_on.next(500, "qux"),
                    s_on.next(500, "bar"),
                    s_on.next(550, "baz"),
                    s_on.next(550, "foo"),
                    s_on.next(600, "qux"),
                    s_on.next(600, "bar"),
                    s_on.next(650, "baz"),
                    s_on.next(650, "foo"),
                    s_on.next(700, "qux"),
                    s_on.next(700, "bar"),
                    s_on.next(750, "baz"),
                    s_on.next(750, "foo"),
                    s_on.next(800, "qux"),
                    s_on.next(800, "bar"),
                    s_on.next(850, "baz"),
                    s_on.next(900, "qux"),
                    s_on.next(950, "foo")
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription to the ints"){
                auto required = rxu::to_vector({
                    i_on.subscribe(200, 1000)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

            // The description previously said "four", but the outer source emits six ints
            // and six inner subscriptions are asserted below.
            THEN("there were six subscriptions and unsubscriptions to the strings"){
                auto required = rxu::to_vector({
                    s_on.subscribe(300, 550),
                    s_on.subscribe(400, 650),
                    s_on.subscribe(500, 750),
                    s_on.subscribe(600, 850),
                    s_on.subscribe(700, 950),
                    s_on.subscribe(900, 1000)
                });
                auto actual = ys.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

// Exercises merge_transform when the projected inner source ends with an error:
// the expected output stops with that error at 601, and the expected subscription
// records for the outer source and every inner subscription all end at 601.
SCENARIO("merge_transform inner error", "[merge_transform][transform][map][operators]"){
    GIVEN("two cold observables. one of ints. one of strings."){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> i_on;
        const rxsc::test::messages<std::string> s_on;

        // outer source: four ints followed by completion
        auto xs = sc.make_cold_observable({
            i_on.next(100, 4),
            i_on.next(200, 2),
            i_on.next(300, 3),
            i_on.next(400, 1),
            i_on.completed(500)
        });

        // The message previously named "filter" (copied from another operator's tests);
        // it now names the operator under test so a failure report is not misleading.
        std::runtime_error ex("merge_transform on_error from inner source");

        // inner source: four strings, then the error above instead of completion
        auto ys = sc.make_cold_observable({
            s_on.next(55, "foo"),
            s_on.next(104, "bar"),
            s_on.next(153, "baz"),
            s_on.next(202, "qux"),
            s_on.error(301, ex)
        });

        WHEN("each int is mapped to the strings"){

            auto res = w.start(
                [&]() {
                    return xs
                        .merge_transform([&](int){return ys;}, [](int, std::string s){return s;})
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("the output contains strings repeated for each int"){
                // the first inner error (inner subscription at 300 + error offset 301) ends the output
                auto required = rxu::to_vector({
                    s_on.next(355, "foo"),
                    s_on.next(404, "bar"),
                    s_on.next(453, "baz"),
                    s_on.next(455, "foo"),
                    s_on.next(502, "qux"),
                    s_on.next(504, "bar"),
                    s_on.next(553, "baz"),
                    s_on.next(555, "foo"),
                    s_on.error(601, ex)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription to the ints"){
                auto required = rxu::to_vector({
                    i_on.subscribe(200, 601)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

            THEN("there were four subscription and unsubscription to the strings"){
                auto required = rxu::to_vector({
                    s_on.subscribe(300, 601),
                    s_on.subscribe(400, 601),
                    s_on.subscribe(500, 601),
                    s_on.subscribe(600, 601)
                });
                auto actual = ys.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

// Exercises the merge_transform overload that takes only the projection lambda
// (no result selector, no coordination): every int from the outer cold source is
// projected onto the same cold string source. Checks the merged output and the
// subscription records of both sources.
SCENARIO("merge_transform, no result selector, no coordination", "[merge_transform][transform][map][operators]"){
    GIVEN("two cold observables. one of ints. one of strings."){
        auto test_sc = rxsc::make_test();
        auto test_worker = test_sc.create_worker();
        const rxsc::test::messages<int> int_msg;
        const rxsc::test::messages<std::string> str_msg;

        // outer source: four ints followed by completion
        auto int_source = test_sc.make_cold_observable({
            int_msg.next(100, 4),
            int_msg.next(200, 2),
            int_msg.next(300, 3),
            int_msg.next(400, 1),
            int_msg.completed(500)
        });

        // inner source: four strings followed by completion
        auto str_source = test_sc.make_cold_observable({
            str_msg.next(50, "foo"),
            str_msg.next(100, "bar"),
            str_msg.next(150, "baz"),
            str_msg.next(200, "qux"),
            str_msg.completed(250)
        });

        WHEN("each int is mapped to the strings"){

            auto outcome = test_worker.start(
                [&]() {
                    return int_source
                        // every int selects the same inner source
                        .merge_transform([&](int){ return str_source; })
                        // type-erase the result; works around a lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("the output contains strings repeated for each int"){
                auto observed = outcome.get_observer().messages();
                auto expected = rxu::to_vector({
                    str_msg.next(350, "foo"),
                    str_msg.next(400, "bar"),
                    str_msg.next(450, "baz"),
                    str_msg.next(450, "foo"),
                    str_msg.next(500, "qux"),
                    str_msg.next(500, "bar"),
                    str_msg.next(550, "baz"),
                    str_msg.next(550, "foo"),
                    str_msg.next(600, "qux"),
                    str_msg.next(600, "bar"),
                    str_msg.next(650, "baz"),
                    str_msg.next(650, "foo"),
                    str_msg.next(700, "qux"),
                    str_msg.next(700, "bar"),
                    str_msg.next(750, "baz"),
                    str_msg.next(800, "qux"),
                    str_msg.completed(850)
                });
                REQUIRE(expected == observed);
            }

            THEN("there was one subscription and one unsubscription to the ints"){
                auto observed = int_source.subscriptions();
                auto expected = rxu::to_vector({
                    int_msg.subscribe(200, 700)
                });
                REQUIRE(expected == observed);
            }

            THEN("there were four subscription and unsubscription to the strings"){
                auto observed = str_source.subscriptions();
                auto expected = rxu::to_vector({
                    str_msg.subscribe(300, 550),
                    str_msg.subscribe(400, 650),
                    str_msg.subscribe(500, 750),
                    str_msg.subscribe(600, 850)
                });
                REQUIRE(expected == observed);
            }
        }
    }
}

// Exercises the merge_transform overload that takes the projection lambda plus a
// coordination (rx::identity_current_thread()) but no result selector: every int
// from the outer cold source is projected onto the same cold string source. Checks
// the merged output and the subscription records of both sources.
SCENARIO("merge_transform, no result selector, with coordination", "[merge_transform][transform][map][operators]"){
    GIVEN("two cold observables. one of ints. one of strings."){
        auto test_sc = rxsc::make_test();
        auto test_worker = test_sc.create_worker();
        const rxsc::test::messages<int> int_msg;
        const rxsc::test::messages<std::string> str_msg;

        // outer source: four ints followed by completion
        auto int_source = test_sc.make_cold_observable({
            int_msg.next(100, 4),
            int_msg.next(200, 2),
            int_msg.next(300, 3),
            int_msg.next(400, 1),
            int_msg.completed(500)
        });

        // inner source: four strings followed by completion
        auto str_source = test_sc.make_cold_observable({
            str_msg.next(50, "foo"),
            str_msg.next(100, "bar"),
            str_msg.next(150, "baz"),
            str_msg.next(200, "qux"),
            str_msg.completed(250)
        });

        WHEN("each int is mapped to the strings"){

            auto outcome = test_worker.start(
                [&]() {
                    return int_source
                        .merge_transform(
                            // every int selects the same inner source
                            [&](int){ return str_source; },
                            rx::identity_current_thread())
                        // type-erase the result; works around a lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("the output contains strings repeated for each int"){
                auto observed = outcome.get_observer().messages();
                auto expected = rxu::to_vector({
                    str_msg.next(350, "foo"),
                    str_msg.next(400, "bar"),
                    str_msg.next(450, "baz"),
                    str_msg.next(450, "foo"),
                    str_msg.next(500, "qux"),
                    str_msg.next(500, "bar"),
                    str_msg.next(550, "baz"),
                    str_msg.next(550, "foo"),
                    str_msg.next(600, "qux"),
                    str_msg.next(600, "bar"),
                    str_msg.next(650, "baz"),
                    str_msg.next(650, "foo"),
                    str_msg.next(700, "qux"),
                    str_msg.next(700, "bar"),
                    str_msg.next(750, "baz"),
                    str_msg.next(800, "qux"),
                    str_msg.completed(850)
                });
                REQUIRE(expected == observed);
            }

            THEN("there was one subscription and one unsubscription to the ints"){
                auto observed = int_source.subscriptions();
                auto expected = rxu::to_vector({
                    int_msg.subscribe(200, 700)
                });
                REQUIRE(expected == observed);
            }

            THEN("there were four subscription and unsubscription to the strings"){
                auto observed = str_source.subscriptions();
                auto expected = rxu::to_vector({
                    str_msg.subscribe(300, 550),
                    str_msg.subscribe(400, 650),
                    str_msg.subscribe(500, 750),
                    str_msg.subscribe(600, 850)
                });
                REQUIRE(expected == observed);
            }
        }
    }
}