#include "../test.h"

namespace detail {

template<class Predicate>
struct liftfilter
{
    typedef typename std::decay<Predicate>::type test_type;
    test_type test;

    liftfilter(test_type t)
        : test(t)
    {
    }

    template<class Subscriber>
    struct filter_observer : public rx::observer_base<typename std::decay<Subscriber>::type::value_type>
    {
        typedef filter_observer<Subscriber> this_type;
        typedef rx::observer_base<typename std::decay<Subscriber>::type::value_type> base_type;
        typedef typename base_type::value_type value_type;
        typedef typename std::decay<Subscriber>::type dest_type;
        typedef rx::observer<value_type, this_type> observer_type;
        dest_type dest;
        test_type test;

        filter_observer(dest_type d, test_type t)
            : dest(d)
            , test(t)
        {
        }
        void on_next(typename dest_type::value_type v) const {
            bool filtered = false;
            RXCPP_TRY {
               filtered = !test(v);
            } RXCPP_CATCH(...) {
                dest.on_error(rxu::current_exception());
                return;
            }
            if (!filtered) {
                dest.on_next(v);
            }
        }
        void on_error(rxu::error_ptr e) const {
            dest.on_error(e);
        }
        void on_completed() const {
            dest.on_completed();
        }

        static rx::subscriber<value_type, observer_type> make(const dest_type& d, const test_type& t) {
            return rx::make_subscriber<value_type>(d, observer_type(this_type(d, t)));
        }
    };

    template<class Subscriber>
    auto operator()(const Subscriber& dest) const
        -> decltype(filter_observer<Subscriber>::make(dest, test)) {
        return      filter_observer<Subscriber>::make(dest, test);
    }
};

}

namespace {

template<class Predicate>
auto liftfilter(Predicate&& p)
    ->      detail::liftfilter<typename std::decay<Predicate>::type> {
    return  detail::liftfilter<typename std::decay<Predicate>::type>(std::forward<Predicate>(p));
}

bool IsPrime(int x)
{
    if (x < 2) return false;
    for (int i = 2; i <= x/2; ++i)
    {
        if (x % i == 0)
            return false;
    }
    return true;
}

}

SCENARIO("lift liftfilter stops on disposal", "[where][filter][lift][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        long invoked = 0;

        auto xs = sc.make_hot_observable({
            on.next(110, 1),
            on.next(180, 2),
            on.next(230, 3),
            on.next(270, 4),
            on.next(340, 5),
            on.next(380, 6),
            on.next(390, 7),
            on.next(450, 8),
            on.next(470, 9),
            on.next(560, 10),
            on.next(580, 11),
            on.completed(600)
        });

        WHEN("filtered to ints that are primes"){

            auto res = w.start(
                [&xs, &invoked]() {
                    return xs
                        .lift<int>(liftfilter([&invoked](int x) {
                            invoked++;
                            return IsPrime(x);
                        }))
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                },
                400
            );

            THEN("the output only contains primes that arrived before disposal"){
                auto required = rxu::to_vector({
                    on.next(230, 3),
                    on.next(340, 5),
                    on.next(390, 7)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 400)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

            THEN("where was called until disposed"){
                REQUIRE(5 == invoked);
            }
        }
    }
}

SCENARIO("stream lift liftfilter stops on disposal", "[where][filter][lift][stream][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        long invoked = 0;

        auto xs = sc.make_hot_observable({
            on.next(110, 1),
            on.next(180, 2),
            on.next(230, 3),
            on.next(270, 4),
            on.next(340, 5),
            on.next(380, 6),
            on.next(390, 7),
            on.next(450, 8),
            on.next(470, 9),
            on.next(560, 10),
            on.next(580, 11),
            on.completed(600)
        });

        WHEN("filtered to ints that are primes"){

            auto res = w.start(
                [&xs, &invoked]() {
                    return xs
                        >> rxo::lift<int>(liftfilter([&invoked](int x) {
                            invoked++;
                            return IsPrime(x);
                        }))
                        // forget type to workaround lambda deduction bug on msvc 2013
                        >> rxo::as_dynamic();
                },
                400
            );

            THEN("the output only contains primes that arrived before disposal"){
                auto required = rxu::to_vector({
                    on.next(230, 3),
                    on.next(340, 5),
                    on.next(390, 7)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 400)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

            THEN("where was called until disposed"){
                REQUIRE(5 == invoked);
            }
        }
    }
}

SCENARIO("lift lambda filter stops on disposal", "[where][filter][lift][lambda][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        long invoked = 0;

        auto xs = sc.make_hot_observable({
            on.next(110, 1),
            on.next(180, 2),
            on.next(230, 3),
            on.next(270, 4),
            on.next(340, 5),
            on.next(380, 6),
            on.next(390, 7),
            on.next(450, 8),
            on.next(470, 9),
            on.next(560, 10),
            on.next(580, 11),
            on.completed(600)
        });

        WHEN("filtered to ints that are primes"){

            auto res = w.start(
                [&xs, &invoked]() {
                    auto predicate = [&](int x){
                        invoked++;
                        return IsPrime(x);
                    };
                    return xs
                        .lift<int>([=](rx::subscriber<int> dest){
                            // VS2013 deduction issue requires dynamic (type-forgetting)
                            return rx::make_subscriber<int>(
                                dest,
                                rx::make_observer_dynamic<int>(
                                    [=](int n){
                                        bool pass = false;
                                        RXCPP_TRY {pass = predicate(n);} RXCPP_CATCH(...){dest.on_error(rxu::current_exception());};
                                        if (pass) {dest.on_next(n);}
                                    },
                                    [=](rxu::error_ptr e){dest.on_error(e);},
                                    [=](){dest.on_completed();}));
                        })
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                },
                400
            );

            THEN("the output only contains primes that arrived before disposal"){
                auto required = rxu::to_vector({
                    on.next(230, 3),
                    on.next(340, 5),
                    on.next(390, 7)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 400)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

            THEN("where was called until disposed"){
                REQUIRE(5 == invoked);
            }
        }
    }
}