#include "../test.h"
#include <rxcpp/operators/rx-publish.hpp>
#include <rxcpp/operators/rx-connect_forever.hpp>
#include <rxcpp/operators/rx-ref_count.hpp>
#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-merge.hpp>


SCENARIO("publish range", "[!hide][range][subject][publish][subject][operators]"){
    GIVEN("a range"){
        WHEN("published"){
            auto published = rxs::range<int>(0, 10).publish();
            std::cout << "subscribe to published" << std::endl;
            published.subscribe(
            // on_next
                [](int v){std::cout << v << ", ";},
            // on_completed
                [](){std::cout << " done." << std::endl;});
            std::cout << "connect to published" << std::endl;
            published.connect();
        }
        WHEN("ref_count is used"){
            auto published = rxs::range<int>(0, 10).publish().ref_count();
            std::cout << "subscribe to ref_count" << std::endl;
            published.subscribe(
            // on_next
                [](int v){std::cout << v << ", ";},
            // on_completed
                [](){std::cout << " done." << std::endl;});
        }
        WHEN("connect_forever is used"){
            auto published = rxs::range<int>(0, 10).publish().connect_forever();
            std::cout << "subscribe to connect_forever" << std::endl;
            published.subscribe(
            // on_next
                [](int v){std::cout << v << ", ";},
            // on_completed
                [](){std::cout << " done." << std::endl;});
        }
    }
}

SCENARIO("publish ref_count", "[range][subject][publish][ref_count][operators]"){
    GIVEN("a range"){
        WHEN("ref_count is used"){
            auto published = rxs::range<int>(0, 3).publish().ref_count();

            std::vector<int> results;
            published.subscribe(
            // on_next
                [&](int v){
                    results.push_back(v);
                },
            // on_completed
                [](){});

            std::vector<int> expected_results;
            expected_results.push_back(0);
            expected_results.push_back(1);
            expected_results.push_back(2);
            expected_results.push_back(3);

            CHECK(results == expected_results);
        }
        WHEN("ref_count(other) is used"){
            auto published = rxs::range<double>(0, 10).publish();
            auto map_to_int = published.map([](double v) { return (long) v; });

            // Ensures that 'ref_count(other)' has the source value type,
            // not the publisher's value type.
            auto with_ref_count = map_to_int.ref_count(published);

            std::vector<long> results;

            with_ref_count.subscribe(
            // on_next
                [&](long v){
                    results.push_back(v);
                },
            // on_completed
                [](){});

            std::vector<long> expected_results;
            for (long i = 0; i <= 10; ++i) {
              expected_results.push_back(i);
            }
            CHECK(results == expected_results);
        }
        WHEN("ref_count(other) is used in a diamond"){
            auto source = rxs::range<double>(0, 3);

            int published_on_next_count = 0;
            // Ensure we only subscribe once to 'published' when its in a diamond.
            auto next = source.map(
                [&](double v) {
                    published_on_next_count++;
                    return v;
                }
            );
            auto published = next.publish();

            // Ensures that 'x.ref_count(other)' has the 'x' value type, not the other's value
            // type.
            auto map_to_int = published.map([](double v) { return (long) v; });

            auto left = map_to_int.map([](long v) { return v * 2; });
            auto right = map_to_int.map([](long v) { return v * 100; });

            auto merge = left.merge(right);
            auto with_ref_count = merge.ref_count(published);

            std::vector<long> results;

            with_ref_count.subscribe(
            // on_next
                [&](long v){
                    results.push_back(v);
                },
            // on_completed
                [](){});

            // Ensure we only subscribe once to 'published' when its in a diamond.
            CHECK(published_on_next_count == 4);

            std::vector<long> expected_results;
            expected_results.push_back(0);
            expected_results.push_back(0);
            expected_results.push_back(2);
            expected_results.push_back(100);
            expected_results.push_back(4);
            expected_results.push_back(200);
            expected_results.push_back(6);
            expected_results.push_back(300);

            // Ensure left,right is interleaved without being biased towards one side.
            CHECK(results == expected_results);
        }
    }
}

SCENARIO("publish basic", "[publish][multicast][subject][operators]"){
    GIVEN("a test hot observable of longs"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        auto xs = sc.make_hot_observable({
            on.next(110, 7),
            on.next(220, 3),
            on.next(280, 4),
            on.next(290, 1),
            on.next(340, 8),
            on.next(360, 5),
            on.next(370, 6),
            on.next(390, 7),
            on.next(410, 13),
            on.next(430, 2),
            on.next(450, 9),
            on.next(520, 11),
            on.next(560, 20),
            on.completed(600)
        });

        auto res = w.make_subscriber<int>();

        rx::connectable_observable<int> ys;

        WHEN("subscribed and then connected"){

            w.schedule_absolute(rxsc::test::created_time,
                [&ys, &xs](const rxsc::schedulable&){
                    ys = xs.publish().as_dynamic();
                    //ys = xs.publish_last().as_dynamic();
                });

            w.schedule_absolute(rxsc::test::subscribed_time,
                [&ys, &res](const rxsc::schedulable&){
                ys.subscribe(res);
            });

            w.schedule_absolute(rxsc::test::unsubscribed_time,
                [&res](const rxsc::schedulable&){
                    res.unsubscribe();
                });

            {
                rx::composite_subscription connection;

                w.schedule_absolute(300,
                    [connection, &ys](const rxsc::schedulable&){
                        ys.connect(connection);
                    });
                w.schedule_absolute(400,
                    [connection](const rxsc::schedulable&){
                        connection.unsubscribe();
                    });
            }

            {
                rx::composite_subscription connection;

                w.schedule_absolute(500,
                    [connection, &ys](const rxsc::schedulable&){
                        ys.connect(connection);
                    });
                w.schedule_absolute(550,
                    [connection](const rxsc::schedulable&){
                        connection.unsubscribe();
                    });
            }

            {
                rx::composite_subscription connection;

                w.schedule_absolute(650,
                    [connection, &ys](const rxsc::schedulable&){
                        ys.connect(connection);
                    });
                w.schedule_absolute(800,
                    [connection](const rxsc::schedulable&){
                        connection.unsubscribe();
                    });
            }

            w.start();

            THEN("the output only contains items sent while subscribed"){
                auto required = rxu::to_vector({
                    on.next(340, 8),
                    on.next(360, 5),
                    on.next(370, 6),
                    on.next(390, 7),
                    on.next(520, 11)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there were 3 subscription/unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(300, 400),
                    on.subscribe(500, 550),
                    on.subscribe(650, 800)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

        }
    }
}


SCENARIO("publish error", "[publish][error][multicast][subject][operators]"){
    GIVEN("a test hot observable of longs"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        std::runtime_error ex("publish on_error");

        auto xs = sc.make_hot_observable({
            on.next(110, 7),
            on.next(220, 3),
            on.next(280, 4),
            on.next(290, 1),
            on.next(340, 8),
            on.next(360, 5),
            on.next(370, 6),
            on.next(390, 7),
            on.next(410, 13),
            on.next(430, 2),
            on.next(450, 9),
            on.next(520, 11),
            on.next(560, 20),
            on.error(600, ex)
        });

        auto res = w.make_subscriber<int>();

        rx::connectable_observable<int> ys;

        WHEN("subscribed and then connected"){

            w.schedule_absolute(rxsc::test::created_time,
                [&ys, &xs](const rxsc::schedulable&){
                    ys = xs.publish().as_dynamic();
                });

            w.schedule_absolute(rxsc::test::subscribed_time,
                [&ys, &res](const rxsc::schedulable&){
                ys.subscribe(res);
            });

            w.schedule_absolute(rxsc::test::unsubscribed_time,
                [&res](const rxsc::schedulable&){
                    res.unsubscribe();
                });

            {
                rx::composite_subscription connection;

                w.schedule_absolute(300,
                    [connection, &ys](const rxsc::schedulable&){
                        ys.connect(connection);
                    });
                w.schedule_absolute(400,
                    [connection](const rxsc::schedulable&){
                        connection.unsubscribe();
                    });
            }

            {
                rx::composite_subscription connection;

                w.schedule_absolute(500,
                    [connection, &ys](const rxsc::schedulable&){
                        ys.connect(connection);
                    });
                w.schedule_absolute(800,
                    [connection](const rxsc::schedulable&){
                        connection.unsubscribe();
                    });
            }

            w.start();

            THEN("the output only contains items sent while subscribed"){
                auto required = rxu::to_vector({
                    on.next(340, 8),
                    on.next(360, 5),
                    on.next(370, 6),
                    on.next(390, 7),
                    on.next(520, 11),
                    on.next(560, 20),
                    on.error(600, ex)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there were 3 subscription/unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(300, 400),
                    on.subscribe(500, 600)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

        }
    }
}

SCENARIO("publish basic with initial value", "[publish][multicast][behavior][operators]"){
    GIVEN("a test hot observable of longs"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        auto xs = sc.make_hot_observable({
                on.next(110, 7),
                on.next(220, 3),
                on.next(280, 4),
                on.next(290, 1),
                on.next(340, 8),
                on.next(360, 5),
                on.next(370, 6),
                on.next(390, 7),
                on.next(410, 13),
                on.next(430, 2),
                on.next(450, 9),
                on.next(520, 11),
                on.next(560, 20),
                on.completed(600)
            });

        auto res = w.make_subscriber<int>();

        rx::connectable_observable<int> ys;

        WHEN("subscribed and then connected"){

            w.schedule_absolute(rxsc::test::created_time,
                [&ys, &xs](const rxsc::schedulable&){
                    ys = xs.publish(1979).as_dynamic();
                });

            w.schedule_absolute(rxsc::test::subscribed_time,
                [&ys, &res](const rxsc::schedulable&){
                ys.subscribe(res);
            });

            w.schedule_absolute(rxsc::test::unsubscribed_time,
                [&res](const rxsc::schedulable&){
                    res.unsubscribe();
                });

            {
                rx::composite_subscription connection;

                w.schedule_absolute(300,
                    [connection, &ys](const rxsc::schedulable&){
                        ys.connect(connection);
                    });
                w.schedule_absolute(400,
                    [connection](const rxsc::schedulable&){
                        connection.unsubscribe();
                    });
            }

            {
                rx::composite_subscription connection;

                w.schedule_absolute(500,
                    [connection, &ys](const rxsc::schedulable&){
                        ys.connect(connection);
                    });
                w.schedule_absolute(550,
                    [connection](const rxsc::schedulable&){
                        connection.unsubscribe();
                    });
            }

            {
                rx::composite_subscription connection;

                w.schedule_absolute(650,
                    [connection, &ys](const rxsc::schedulable&){
                        ys.connect(connection);
                    });
                w.schedule_absolute(800,
                    [connection](const rxsc::schedulable&){
                        connection.unsubscribe();
                    });
            }

            w.start();

            THEN("the output only contains items sent while subscribed"){
                auto required = rxu::to_vector({
                    on.next(200, 1979),
                    on.next(340, 8),
                    on.next(360, 5),
                    on.next(370, 6),
                    on.next(390, 7),
                    on.next(520, 11)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there were 3 subscription/unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(300, 400),
                    on.subscribe(500, 550),
                    on.subscribe(650, 800)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

        }
    }
}