#include "../test.h"
#include <rxcpp/operators/rx-reduce.hpp>
#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-subscribe_on.hpp>
#include <rxcpp/operators/rx-observe_on.hpp>

#include <sstream>

// Number of subscriptions each perf scenario below performs per timed run
// (the scenarios bind this through a local `subscriptions` reference).
static const int static_subscriptions = 50000;

// Perf scenario (tagged [!hide][long][perf]): repeatedly subscribes to a tiny
// just(1) -> map -> map pipeline that is moved onto the event loop with both
// subscribe_on and observe_on, and reports the elapsed time and throughput of
// each timed run on stdout.
SCENARIO("for loop subscribes to map with subscribe_on and observe_on", "[!hide][for][just][subscribe][subscribe_on][observe_on][long][perf]"){
    const int& subscriptions = static_subscriptions;
    GIVEN("a for loop"){
        WHEN("subscribe 50K times"){
            using namespace std::chrono;
            using clock = steady_clock;

            const int runs = 10;

            for (int run = 0; run < runs; ++run) {

                // Accumulates the value returned by each blocking count();
                // the REQUIRE below expects exactly one item per subscription.
                int c = 0;
                auto start = clock::now();
                for (int i = 0; i < subscriptions; ++i) {
                    c += rx::observable<>::just(1)
                        // Round-trip the value int -> string -> int so each
                        // subscription does some work in both map stages.
                        .map([](int value) {
                            std::stringstream serializer;
                            serializer << value;
                            return serializer.str();
                        })
                        .map([](const std::string& s) {
                            int parsed = 0;
                            std::stringstream(s) >> parsed;
                            return parsed;
                        })
                        .subscribe_on(rx::observe_on_event_loop())
                        .observe_on(rx::observe_on_event_loop())
                        .as_blocking()
                        .count();
                }
                auto finish = clock::now();
                auto msElapsed = duration_cast<milliseconds>(finish-start);
                REQUIRE(subscriptions == c);
                // Report the number of subscriptions actually performed by the
                // loop above rather than a hard-coded 1.
                std::cout << "loop subscribe map subscribe_on observe_on : " << subscriptions << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
            }
        }
    }
}

// Perf scenario (tagged [!hide][long][perf]): repeatedly subscribes to a tiny
// just(1) -> map -> map pipeline whose subscription is moved onto the event
// loop with subscribe_on only, and reports the elapsed time and throughput of
// each timed run on stdout.
SCENARIO("for loop subscribes to map with subscribe_on", "[!hide][subscribe_on_only][for][just][subscribe][subscribe_on][long][perf]"){
    const int& subscriptions = static_subscriptions;
    GIVEN("a for loop"){
        WHEN("subscribe 50K times"){
            using namespace std::chrono;
            using clock = steady_clock;

            const int runs = 10;

            for (int run = 0; run < runs; ++run) {

                // Accumulates the value returned by each blocking count();
                // the REQUIRE below expects exactly one item per subscription.
                int c = 0;
                auto start = clock::now();

                for (int i = 0; i < subscriptions; ++i) {
                    c += rx::observable<>::just(1)
                        // Round-trip the value int -> string -> int so each
                        // subscription does some work in both map stages.
                        .map([](int value) {
                            std::stringstream serializer;
                            serializer << value;
                            return serializer.str();
                        })
                        .map([](const std::string& s) {
                            int parsed = 0;
                            std::stringstream(s) >> parsed;
                            return parsed;
                        })
                        .subscribe_on(rx::observe_on_event_loop())
                        .as_blocking()
                        .count();
                }
                auto finish = clock::now();
                auto msElapsed = duration_cast<milliseconds>(finish-start);
                REQUIRE(subscriptions == c);
                // Report the number of subscriptions actually performed by the
                // loop above rather than a hard-coded 1.
                std::cout << "loop subscribe map subscribe_on : " << subscriptions << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
            }
        }
    }
}

// Checks the member-function form of subscribe_on against a hot source that
// is driven by the test scheduler.
SCENARIO("subscribe_on", "[subscribe][subscribe_on]"){
    GIVEN("a source"){
        auto scheduler = rxsc::make_test();
        auto coordination = rx::synchronize_in_one_worker(scheduler);
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> msg;

        // Hot source: values at ticks 150, 210 and 240, completion at 300.
        auto source = scheduler.make_hot_observable({
            msg.next(150, 1),
            msg.next(210, 2),
            msg.next(240, 3),
            msg.completed(300)
        });

        WHEN("subscribe_on is specified"){

            auto result = worker.start(
                [coordination, source]() {
                    return source.subscribe_on(coordination);
                }
            );

            THEN("the output contains items sent while subscribed"){
                auto actual = result.get_observer().messages();
                // The value emitted at tick 150 is not expected in the output
                // (presumably it precedes the subscription made by start()).
                auto required = rxu::to_vector({
                    msg.next(210, 2),
                    msg.next(240, 3),
                    msg.completed(300)
                });
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription to the source"){
                auto actual = source.subscriptions();
                // A single subscription is expected, spanning ticks 201 to 300.
                auto required = rxu::to_vector({
                    msg.subscribe(201, 300)
                });
                REQUIRE(required == actual);
            }

        }
    }
}

// Checks the pipe-operator form (source | rxo::subscribe_on) against a hot
// source that is driven by the test scheduler.
SCENARIO("stream subscribe_on", "[subscribe][subscribe_on]"){
    GIVEN("a source"){
        auto scheduler = rxsc::make_test();
        auto coordination = rx::synchronize_in_one_worker(scheduler);
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> msg;

        // Hot source: values at ticks 150, 210 and 240, completion at 300.
        auto source = scheduler.make_hot_observable({
            msg.next(150, 1),
            msg.next(210, 2),
            msg.next(240, 3),
            msg.completed(300)
        });

        WHEN("subscribe_on is specified"){

            auto result = worker.start(
                [coordination, source]() {
                    return source | rxo::subscribe_on(coordination);
                }
            );

            THEN("the output contains items sent while subscribed"){
                auto actual = result.get_observer().messages();
                // The value emitted at tick 150 is not expected in the output
                // (presumably it precedes the subscription made by start()).
                auto required = rxu::to_vector({
                    msg.next(210, 2),
                    msg.next(240, 3),
                    msg.completed(300)
                });
                REQUIRE(required == actual);
            }

            THEN("there was 1 subscription/unsubscription to the source"){
                auto actual = source.subscriptions();
                // A single subscription is expected, spanning ticks 201 to 300.
                auto required = rxu::to_vector({
                    msg.subscribe(201, 300)
                });
                REQUIRE(required == actual);
            }

        }
    }
}