#include "../test.h"
#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-merge.hpp>
#include <rxcpp/operators/rx-window_toggle.hpp>

SCENARIO("window toggle, basic", "[window_toggle][operators]"){
    GIVEN("1 hot observable of ints and hot observable of opens."){
        auto sc = rxsc::make_test();
        auto so = rx::synchronize_in_one_worker(sc);
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;
        const rxsc::test::messages<std::string> o_on;

        auto xs = sc.make_hot_observable({
            on.next(90, 1),
            on.next(180, 2),
            on.next(250, 3),
            on.next(260, 4),
            on.next(310, 5),
            on.next(340, 6),
            on.next(410, 7),
            on.next(420, 8),
            on.next(470, 9),
            on.next(550, 10),
            on.completed(590)
        });

        auto ys = sc.make_hot_observable({
            on.next(255, 50),
            on.next(330, 100),
            on.next(350, 50),
            on.next(400, 90),
            on.completed(900)
        });

        WHEN("ints are split into windows"){
            using namespace std::chrono;

            int wi = 0;

            auto res = w.start(
                [&]() {
                    return xs
                        | rxo::window_toggle(ys, [&](int y){
                            return rx::observable<>::timer(milliseconds(y), so);
                        }, so)
                        | rxo::map([wi](rxcpp::observable<int> w) mutable {
                            auto ti = wi++;
                            return w
                                | rxo::map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);})
                                // forget type to workaround lambda deduction bug on msvc 2013
                                | rxo::as_dynamic();
                        })
                        | rxo::merge()
                        // forget type to workaround lambda deduction bug on msvc 2013
                        | rxo::as_dynamic();
                }
            );

            THEN("the output contains ints assigned to windows"){
                auto required = rxu::to_vector({
                    o_on.next(261, "0 4"),
                    o_on.next(341, "1 6"),
                    o_on.next(411, "1 7"),
                    o_on.next(411, "3 7"),
                    o_on.next(421, "1 8"),
                    o_on.next(421, "3 8"),
                    o_on.next(471, "3 9"),
                    o_on.completed(591)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription to the observable"){
                auto required = rxu::to_vector({
                    o_on.subscribe(200, 590)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

SCENARIO("window toggle, basic same", "[window_toggle][operators]"){
    GIVEN("1 hot observable of ints and hot observable of opens."){
        auto sc = rxsc::make_test();
        auto so = rx::synchronize_in_one_worker(sc);
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;
        const rxsc::test::messages<std::string> o_on;

        auto xs = sc.make_hot_observable({
            on.next(90, 1),
            on.next(180, 2),
            on.next(250, 3),
            on.next(260, 4),
            on.next(310, 5),
            on.next(340, 6),
            on.next(410, 7),
            on.next(420, 8),
            on.next(470, 9),
            on.next(550, 10),
            on.completed(590)
        });

        auto ys = sc.make_hot_observable({
            on.next(255, 50),
            on.next(330, 100),
            on.next(350, 50),
            on.next(400, 90),
            on.completed(900)
        });

        WHEN("ints are split into windows"){
            using namespace std::chrono;

            int wi = 0;

            auto res = w.start(
                [&]() {
                    return xs
                        .window_toggle(ys, [&](int){
                            return ys;
                        }, so)
                        .map([wi](rxcpp::observable<int> w) mutable {
                            auto ti = wi++;
                            return w
                                .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);})
                                // forget type to workaround lambda deduction bug on msvc 2013
                                .as_dynamic();
                        })
                        .merge()
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("the output contains ints assigned to windows"){
                auto required = rxu::to_vector({
                    o_on.next(261, "0 4"),
                    o_on.next(311, "0 5"),
                    o_on.next(341, "1 6"),
                    o_on.next(411, "3 7"),
                    o_on.next(421, "3 8"),
                    o_on.next(471, "3 9"),
                    o_on.next(551, "3 10"),
                    o_on.completed(591)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription to the observable"){
                auto required = rxu::to_vector({
                    o_on.subscribe(200, 590)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

SCENARIO("window toggle, error", "[window_toggle][operators]"){
    GIVEN("1 hot observable of ints and hot observable of opens."){
        auto sc = rxsc::make_test();
        auto so = rx::synchronize_in_one_worker(sc);
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;
        const rxsc::test::messages<std::string> o_on;

        std::runtime_error ex("window_toggle on_error from source");

        auto xs = sc.make_hot_observable({
            on.next(90, 1),
            on.next(180, 2),
            on.next(250, 3),
            on.next(260, 4),
            on.next(310, 5),
            on.next(340, 6),
            on.next(410, 7),
            on.error(420, ex),
            on.next(470, 9),
            on.next(550, 10),
            on.completed(590)
        });

        auto ys = sc.make_hot_observable({
            on.next(255, 50),
            on.next(330, 100),
            on.next(350, 50),
            on.next(400, 90),
            on.completed(900)
        });

        WHEN("ints are split into windows"){
            using namespace std::chrono;

            int wi = 0;

            auto res = w.start(
                [&]() {
                    return xs
                        .window_toggle(ys, [&](int){
                            return ys;
                        }, so)
                        .map([wi](rxcpp::observable<int> w) mutable {
                            auto ti = wi++;
                            return w
                                .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);})
                                // forget type to workaround lambda deduction bug on msvc 2013
                                .as_dynamic();
                        })
                        .merge()
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("the output contains ints assigned to windows"){
                auto required = rxu::to_vector({
                    o_on.next(261, "0 4"),
                    o_on.next(311, "0 5"),
                    o_on.next(341, "1 6"),
                    o_on.next(411, "3 7"),
                    o_on.error(421, ex)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription to the observable"){
                auto required = rxu::to_vector({
                    o_on.subscribe(200, 420)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}

SCENARIO("window toggle, disposed", "[window_toggle][operators]"){
    GIVEN("1 hot observable of ints and hot observable of opens."){
        auto sc = rxsc::make_test();
        auto so = rx::synchronize_in_one_worker(sc);
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;
        const rxsc::test::messages<std::string> o_on;

        auto xs = sc.make_hot_observable({
            on.next(90, 1),
            on.next(180, 2),
            on.next(250, 3),
            on.next(260, 4),
            on.next(310, 5),
            on.next(340, 6),
            on.next(410, 7),
            on.next(420, 8),
            on.next(470, 9),
            on.next(550, 10),
            on.completed(590)
        });

        auto ys = sc.make_hot_observable({
            on.next(255, 50),
            on.next(330, 100),
            on.next(350, 50),
            on.next(400, 90),
            on.completed(900)
        });

        WHEN("ints are split into windows"){
            using namespace std::chrono;

            int wi = 0;

            auto res = w.start(
                [&]() {
                    return xs
                        .window_toggle(ys, [&](int){
                            return ys;
                        }, so)
                        .map([wi](rxcpp::observable<int> w) mutable {
                            auto ti = wi++;
                            return w
                                .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);})
                                // forget type to workaround lambda deduction bug on msvc 2013
                                .as_dynamic();
                        })
                        .merge()
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                },
                420
            );

            THEN("the output contains ints assigned to windows"){
                auto required = rxu::to_vector({
                    o_on.next(261, "0 4"),
                    o_on.next(311, "0 5"),
                    o_on.next(341, "1 6"),
                    o_on.next(411, "3 7")
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription to the observable"){
                auto required = rxu::to_vector({
                    o_on.subscribe(200, 420)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}