#include "../test.h"
#include <rxcpp/operators/rx-take.hpp>
#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-observe_on.hpp>

const int static_onnextcalls = 100000;

SCENARIO("range observed on new_thread", "[!hide][range][observe_on_debug][observe_on][long][perf]"){
    const int& onnextcalls = static_onnextcalls;
    GIVEN("a range"){
        WHEN("multicasting a million ints"){
            using namespace std::chrono;
            typedef steady_clock clock;

            auto el = rx::observe_on_new_thread();

            for (int n = 0; n < 10; n++)
            {
                std::atomic_bool disposed;
                std::atomic_bool done;
                auto c = std::make_shared<int>(0);

                rx::composite_subscription cs;
                cs.add([&](){
                    if (!done) {abort();}
                    disposed = true;
                });

                auto start = clock::now();
                rxs::range<int>(1)
                    .take(onnextcalls)
                    .observe_on(el)
                    .as_blocking()
                    .subscribe(
                        cs,
                        [c](int){
                           ++(*c);
                        },
                        [&](){
                            done = true;
                        });
                auto expected = onnextcalls;
                REQUIRE(*c == expected);
                auto finish = clock::now();
                auto msElapsed = duration_cast<milliseconds>(finish-start);
                std::cout << "range -> observe_on new_thread : " << (*c) << " on_next calls, " << msElapsed.count() << "ms elapsed, int-per-second " << *c / (msElapsed.count() / 1000.0) << std::endl;
            }
        }
    }
}

SCENARIO("observe_on", "[observe][observe_on]"){
    GIVEN("a source"){
        auto sc = rxsc::make_test();
        auto so = rx::synchronize_in_one_worker(sc);
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        auto xs = sc.make_hot_observable({
            on.next(150, 1),
            on.next(210, 2),
            on.next(240, 3),
            on.completed(300)
        });

        WHEN("subscribe_on is specified"){

            auto res = w.start(
                [so, xs]() {
                    return xs
                         .observe_on(so);
                }
            );

            THEN("the output contains items sent while subscribed"){
                auto required = rxu::to_vector({
                    on.next(211, 2),
                    on.next(241, 3),
                    on.completed(301)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription to the source"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 300)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

        }
    }
}

SCENARIO("stream observe_on", "[observe][observe_on]"){
    GIVEN("a source"){
        auto sc = rxsc::make_test();
        auto so = rx::synchronize_in_one_worker(sc);
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        auto xs = sc.make_hot_observable({
            on.next(150, 1),
            on.next(210, 2),
            on.next(240, 3),
            on.completed(300)
        });

        WHEN("observe_on is specified"){

            auto res = w.start(
                [so, xs]() {
                    return xs
                         | rxo::observe_on(so);
                }
            );

            THEN("the output contains items sent while subscribed"){
                auto required = rxu::to_vector({
                    on.next(211, 2),
                    on.next(241, 3),
                    on.completed(301)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription to the source"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 300)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

        }
    }
}

class nocompare {
public:
    int v;
};

SCENARIO("observe_on no-comparison", "[observe][observe_on]"){
    GIVEN("a source"){
        auto sc = rxsc::make_test();
        auto so = rx::observe_on_one_worker(sc);
        auto w = sc.create_worker();
        const rxsc::test::messages<nocompare> in;
        const rxsc::test::messages<int> out;

        auto xs = sc.make_hot_observable({
            in.next(150, nocompare{1}),
            in.next(210, nocompare{2}),
            in.next(240, nocompare{3}),
            in.completed(300)
        });

        WHEN("observe_on is specified"){

            auto res = w.start(
                [so, xs]() {
                    return xs
                         | rxo::observe_on(so)
                         | rxo::map([](nocompare v){ return v.v; })
                         | rxo::as_dynamic();
                }
            );

            THEN("the output contains items sent while subscribed"){
                auto required = rxu::to_vector({
                    out.next(211, 2),
                    out.next(241, 3),
                    out.completed(301)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription to the source"){
                auto required = rxu::to_vector({
                    out.subscribe(200, 300)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

        }
    }
}