C++程序  |  462行  |  16.56 KB

#include "../test.h"
#include "rxcpp/operators/rx-combine_latest.hpp"
#include "rxcpp/operators/rx-map.hpp"
#include "rxcpp/operators/rx-take.hpp"
#include "rxcpp/operators/rx-observe_on.hpp"
#include "rxcpp/operators/rx-publish.hpp"
#include "rxcpp/operators/rx-ref_count.hpp"

#include <sstream>

SCENARIO("observe subscription", "[!hide]"){
    GIVEN("observable of ints"){
        WHEN("subscribe"){
            auto observers = std::make_shared<std::list<rxcpp::subscriber<int>>>();

            auto observable = rxcpp::observable<>::create<int>([=](rxcpp::subscriber<int> out){
                auto it = observers->insert(observers->end(), out);
                it->add([=](){
                    observers->erase(it);
                });
            });

        }
    }
}

static const int static_subscriptions = 10000;

SCENARIO("for loop subscribes to map", "[!hide][for][just][subscribe][long][perf]"){
    const int& subscriptions = static_subscriptions;
    GIVEN("a for loop"){
        WHEN("subscribe 100K times"){
            using namespace std::chrono;
            typedef steady_clock clock;

            auto sc = rxsc::make_current_thread();
            auto w = sc.create_worker();
            int runs = 10;

            auto loop = [&](const rxsc::schedulable& self) {
                int c = 0;
                int n = 1;
                auto start = clock::now();
                for (int i = 0; i < subscriptions; i++) {
                    rx::observable<>::just(1)
                        .map([](int i) {
                            std::stringstream serializer;
                            serializer << i;
                            return serializer.str();
                        })
                        .map([](const std::string& s) {
                            int i;
                            std::stringstream(s) >> i;
                            return i;
                        })
                        .subscribe([&](int){
                            ++c;
                        });
                }
                auto finish = clock::now();
                auto msElapsed = duration_cast<milliseconds>(finish-start);
                std::cout << "loop subscribe map             : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;

                if (--runs > 0) {
                    self();
                }
            };

            w.schedule(loop);
        }
    }
}

SCENARIO("for loop subscribes to combine_latest", "[!hide][for][just][combine_latest][subscribe][long][perf]"){
    const int& subscriptions = static_subscriptions;
    GIVEN("a for loop"){
        WHEN("subscribe 100K times"){
            using namespace std::chrono;
            typedef steady_clock clock;

            auto sc = rxsc::make_current_thread();
            auto w = sc.create_worker();
            int runs = 10;

            auto loop = [&](const rxsc::schedulable& self) {
                int c = 0;
                int n = 1;
                auto start = clock::now();
                for (int i = 0; i < subscriptions; i++) {
                    rx::observable<>::just(1)
                        .combine_latest([](int i, int j) {
                            return i + j;
                        }, rx::observable<>::just(2))
                        .subscribe([&](int){
                            ++c;
                        });
                }
                auto finish = clock::now();
                auto msElapsed = duration_cast<milliseconds>(finish-start);
                std::cout << "loop subscribe combine_latest  : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;

                if (--runs > 0) {
                    self();
                }
            };

            w.schedule(loop);
        }
    }
}

SCENARIO("synchronized range debug", "[!hide][subscribe][range][synchronize_debug][synchronize][long][perf]"){
    GIVEN("range"){
        WHEN("synchronized"){
            using namespace std::chrono;
            typedef steady_clock clock;

            auto sc = rxsc::make_current_thread();
            auto w = sc.create_worker();

            auto es = rx::synchronize_event_loop();

            const int values = 10000;

            int runs = 10;

            auto loop = [&](const rxsc::schedulable& self) {
                std::atomic<int> c(0);
                int n = 1;
                auto liftrequirecompletion = [&](rx::subscriber<int> dest){
                    auto completionstate = std::make_shared<std::tuple<bool, long, rx::subscriber<int>>>(false, 0, std::move(dest));
                    std::get<2>(*completionstate).add([=](){
                        if (std::get<1>(*completionstate) != values || !std::get<0>(*completionstate)) {
                            abort();
                        }
                    });
                    // VS2013 deduction issue requires dynamic (type-forgetting)
                    return rx::make_subscriber<int>(
                        std::get<2>(*completionstate),
                        [=](int n){
                            ++std::get<1>(*completionstate);
                            std::get<2>(*completionstate).on_next(n);
                        },
                        [=](rxu::error_ptr){
                            abort();
                        },
                        [=](){
                            if (std::get<1>(*completionstate) != values) {
                                abort();
                            }
                            std::get<0>(*completionstate) = true;
                            std::get<2>(*completionstate).on_completed();
                        }).as_dynamic();
                };
                auto start = clock::now();
                auto ew = es.create_coordinator().get_worker();
                std::atomic<int> v(0);
                auto s0 = rxs::range(1, es)
                    .take(values)
                    .lift<int>(liftrequirecompletion)
                    .as_dynamic()
                    .publish_synchronized(es)
                    .ref_count()
                    .lift<int>(liftrequirecompletion)
                    .subscribe(
                        rx::make_observer_dynamic<int>(
                        [&](int){
                            ++v;
                        },
                        [&](){
                            ++c;
                        }));
                auto s1 = rxs::range(values + 1, es)
                    .take(values)
                    .lift<int>(liftrequirecompletion)
                    .as_dynamic()
                    .publish_synchronized(es)
                    .ref_count()
                    .lift<int>(liftrequirecompletion)
                    .subscribe(
                        rx::make_observer_dynamic<int>(
                        [&](int){
                            ++v;
                        },
                        [&](){
                            ++c;
                        }));
                auto s2 = rxs::range((values * 2) + 1, es)
                    .take(values)
                    .lift<int>(liftrequirecompletion)
                    .as_dynamic()
                    .publish_synchronized(es)
                    .ref_count()
                    .lift<int>(liftrequirecompletion)
                    .subscribe(
                        rx::make_observer_dynamic<int>(
                        [&](int){
                            ++v;
                        },
                        [&](){
                            ++c;
                        }));
                while(v != values * 3 || c != 3);
                s0.unsubscribe();
                s1.unsubscribe();
                s2.unsubscribe();
                auto finish = clock::now();
                auto msElapsed = duration_cast<milliseconds>(finish-start);
                std::cout << "range synchronized : " << n << " subscribed, " << v << " on_next calls, " << msElapsed.count() << "ms elapsed, " << v / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;

                if (--runs > 0) {
                    self();
                }
            };

            w.schedule(loop);
        }
    }
}

SCENARIO("observe_on range debug", "[!hide][subscribe][range][observe_on_debug][observe_on][long][perf]"){
    GIVEN("range"){
        WHEN("observed on"){
            using namespace std::chrono;
            typedef steady_clock clock;

            auto sc = rxsc::make_current_thread();
            auto w = sc.create_worker();

            auto es = rx::observe_on_event_loop();

            const int values = 10000;

            int runs = 10;

            auto loop = [&](const rxsc::schedulable& self) {
                std::atomic<int> c(0);
                int n = 1;
                auto liftrequirecompletion = [&](rx::subscriber<int> dest){
                    auto completionstate = std::make_shared<std::tuple<bool, long, rx::subscriber<int>>>(false, 0, std::move(dest));
                    std::get<2>(*completionstate).add([=](){
                        if (std::get<1>(*completionstate) != values || !std::get<0>(*completionstate)) {
                            abort();
                        }
                    });
                    // VS2013 deduction issue requires dynamic (type-forgetting)
                    return rx::make_subscriber<int>(
                        std::get<2>(*completionstate),
                        [=](int n){
                            ++std::get<1>(*completionstate);
                            std::get<2>(*completionstate).on_next(n);
                        },
                        [=](rxu::error_ptr){
                            abort();
                        },
                        [=](){
                            if (std::get<1>(*completionstate) != values) {
                                abort();
                            }
                            std::get<0>(*completionstate) = true;
                            std::get<2>(*completionstate).on_completed();
                        }).as_dynamic();
                };
                auto start = clock::now();
                auto ew = es.create_coordinator().get_worker();
                std::atomic<int> v(0);
                auto s0 = rxs::range(1, es)
                    .take(values)
                    .lift<int>(liftrequirecompletion)
                    .as_dynamic()
                    .observe_on(es)
                    .lift<int>(liftrequirecompletion)
                    .subscribe(
                        rx::make_observer_dynamic<int>(
                        [&](int){
                            ++v;
                        },
                        [&](){
                            ++c;
                        }));
                auto s1 = rxs::range(values + 1, es)
                    .take(values)
                    .lift<int>(liftrequirecompletion)
                    .as_dynamic()
                    .observe_on(es)
                    .lift<int>(liftrequirecompletion)
                    .subscribe(
                        rx::make_observer_dynamic<int>(
                        [&](int){
                            ++v;
                        },
                        [&](){
                            ++c;
                        }));
                auto s2 = rxs::range((values * 2) + 1, es)
                    .take(values)
                    .lift<int>(liftrequirecompletion)
                    .as_dynamic()
                    .observe_on(es)
                    .lift<int>(liftrequirecompletion)
                    .subscribe(
                        rx::make_observer_dynamic<int>(
                        [&](int){
                            ++v;
                        },
                        [&](){
                            ++c;
                        }));
                while(v != values * 3 || c != 3);
                s0.unsubscribe();
                s1.unsubscribe();
                s2.unsubscribe();
                auto finish = clock::now();
                auto msElapsed = duration_cast<milliseconds>(finish-start);
                std::cout << "range observe_on : " << n << " subscribed, " << v << " on_next calls, " << msElapsed.count() << "ms elapsed, " << v / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;

                if (--runs > 0) {
                    self();
                }
            };

            w.schedule(loop);
        }
    }
}

SCENARIO("subscription traits", "[subscription][traits]"){
    GIVEN("given some subscription types"){
        auto es = rx::make_subscription();
        rx::composite_subscription cs;
        WHEN("tested"){
            THEN("is_subscription value is true for empty subscription"){
                REQUIRE(rx::is_subscription<decltype(es)>::value);
            }
            THEN("is_subscription value is true for composite_subscription"){
                REQUIRE(rx::is_subscription<decltype(cs)>::value);
            }
        }
    }
}

SCENARIO("non-subscription traits", "[subscription][traits]"){
    GIVEN("given some non-subscription types"){
        auto l = [](){};
        int i = 0;
        void* v = nullptr;
        WHEN("tested"){
            THEN("is_subscription value is false for lambda"){
                l();
                REQUIRE(!rx::is_subscription<decltype(l)>::value);
            }
            THEN("is_subscription value is false for int"){
                i = 0;
                REQUIRE(!rx::is_subscription<decltype(i)>::value);
            }
            THEN("is_subscription value is false for void*"){
                v = nullptr;
                REQUIRE(!rx::is_subscription<decltype(v)>::value);
            }
            THEN("is_subscription value is false for void"){
                REQUIRE(!rx::is_subscription<void>::value);
            }
        }
    }
}

SCENARIO("subscription static", "[subscription]"){
    GIVEN("given a subscription"){
        int i=0;
        auto s = rx::make_subscription([&i](){++i;});
        WHEN("not used"){
            THEN("is subscribed"){
                REQUIRE(s.is_subscribed());
            }
            THEN("i is 0"){
                REQUIRE(i == 0);
            }
        }
        WHEN("used"){
            THEN("is not subscribed when unsubscribed once"){
                s.unsubscribe();
                REQUIRE(!s.is_subscribed());
            }
            THEN("is not subscribed when unsubscribed twice"){
                s.unsubscribe();
                s.unsubscribe();
                REQUIRE(!s.is_subscribed());
            }
            THEN("i is 1 when unsubscribed once"){
                s.unsubscribe();
                REQUIRE(i == 1);
            }
            THEN("i is 1 when unsubscribed twice"){
                s.unsubscribe();
                s.unsubscribe();
                REQUIRE(i == 1);
            }
        }
    }
}

SCENARIO("subscription empty", "[subscription]"){
    GIVEN("given an empty subscription"){
        auto s = rx::make_subscription();
        WHEN("not used"){
            THEN("is not subscribed"){
                REQUIRE(!s.is_subscribed());
            }
        }
        WHEN("used"){
            THEN("is not subscribed when unsubscribed once"){
                s.unsubscribe();
                REQUIRE(!s.is_subscribed());
            }
            THEN("is not subscribed when unsubscribed twice"){
                s.unsubscribe();
                s.unsubscribe();
                REQUIRE(!s.is_subscribed());
            }
        }
    }
}

SCENARIO("subscription composite", "[subscription]"){
    GIVEN("given a subscription"){
        int i=0;
        rx::composite_subscription s;
        s.add(rx::make_subscription());
        s.add(rx::make_subscription([&i](){++i;}));
        s.add([&i](){++i;});
        WHEN("not used"){
            THEN("is subscribed"){
                REQUIRE(s.is_subscribed());
            }
            THEN("i is 0"){
                REQUIRE(i == 0);
            }
        }
        WHEN("used"){
            THEN("is not subscribed when unsubscribed once"){
                s.unsubscribe();
                REQUIRE(!s.is_subscribed());
            }
            THEN("is not subscribed when unsubscribed twice"){
                s.unsubscribe();
                s.unsubscribe();
                REQUIRE(!s.is_subscribed());
            }
            THEN("i is 2 when unsubscribed once"){
                s.unsubscribe();
                REQUIRE(i == 2);
            }
            THEN("i is 2 when unsubscribed twice"){
                s.unsubscribe();
                s.unsubscribe();
                REQUIRE(i == 2);
            }
        }
    }
}