#include "../test.h"
#include "rxcpp/operators/rx-combine_latest.hpp"
#include "rxcpp/operators/rx-map.hpp"
#include "rxcpp/operators/rx-take.hpp"
#include "rxcpp/operators/rx-observe_on.hpp"
#include "rxcpp/operators/rx-publish.hpp"
#include "rxcpp/operators/rx-ref_count.hpp"
#include <sstream>
SCENARIO("observe subscription", "[!hide]"){
GIVEN("observable of ints"){
WHEN("subscribe"){
auto observers = std::make_shared<std::list<rxcpp::subscriber<int>>>();
auto observable = rxcpp::observable<>::create<int>([=](rxcpp::subscriber<int> out){
auto it = observers->insert(observers->end(), out);
it->add([=](){
observers->erase(it);
});
});
}
}
}
static const int static_subscriptions = 10000;
SCENARIO("for loop subscribes to map", "[!hide][for][just][subscribe][long][perf]"){
const int& subscriptions = static_subscriptions;
GIVEN("a for loop"){
WHEN("subscribe 100K times"){
using namespace std::chrono;
typedef steady_clock clock;
auto sc = rxsc::make_current_thread();
auto w = sc.create_worker();
int runs = 10;
auto loop = [&](const rxsc::schedulable& self) {
int c = 0;
int n = 1;
auto start = clock::now();
for (int i = 0; i < subscriptions; i++) {
rx::observable<>::just(1)
.map([](int i) {
std::stringstream serializer;
serializer << i;
return serializer.str();
})
.map([](const std::string& s) {
int i;
std::stringstream(s) >> i;
return i;
})
.subscribe([&](int){
++c;
});
}
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
std::cout << "loop subscribe map : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
if (--runs > 0) {
self();
}
};
w.schedule(loop);
}
}
}
SCENARIO("for loop subscribes to combine_latest", "[!hide][for][just][combine_latest][subscribe][long][perf]"){
const int& subscriptions = static_subscriptions;
GIVEN("a for loop"){
WHEN("subscribe 100K times"){
using namespace std::chrono;
typedef steady_clock clock;
auto sc = rxsc::make_current_thread();
auto w = sc.create_worker();
int runs = 10;
auto loop = [&](const rxsc::schedulable& self) {
int c = 0;
int n = 1;
auto start = clock::now();
for (int i = 0; i < subscriptions; i++) {
rx::observable<>::just(1)
.combine_latest([](int i, int j) {
return i + j;
}, rx::observable<>::just(2))
.subscribe([&](int){
++c;
});
}
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
std::cout << "loop subscribe combine_latest : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
if (--runs > 0) {
self();
}
};
w.schedule(loop);
}
}
}
SCENARIO("synchronized range debug", "[!hide][subscribe][range][synchronize_debug][synchronize][long][perf]"){
GIVEN("range"){
WHEN("synchronized"){
using namespace std::chrono;
typedef steady_clock clock;
auto sc = rxsc::make_current_thread();
auto w = sc.create_worker();
auto es = rx::synchronize_event_loop();
const int values = 10000;
int runs = 10;
auto loop = [&](const rxsc::schedulable& self) {
std::atomic<int> c(0);
int n = 1;
auto liftrequirecompletion = [&](rx::subscriber<int> dest){
auto completionstate = std::make_shared<std::tuple<bool, long, rx::subscriber<int>>>(false, 0, std::move(dest));
std::get<2>(*completionstate).add([=](){
if (std::get<1>(*completionstate) != values || !std::get<0>(*completionstate)) {
abort();
}
});
// VS2013 deduction issue requires dynamic (type-forgetting)
return rx::make_subscriber<int>(
std::get<2>(*completionstate),
[=](int n){
++std::get<1>(*completionstate);
std::get<2>(*completionstate).on_next(n);
},
[=](rxu::error_ptr){
abort();
},
[=](){
if (std::get<1>(*completionstate) != values) {
abort();
}
std::get<0>(*completionstate) = true;
std::get<2>(*completionstate).on_completed();
}).as_dynamic();
};
auto start = clock::now();
auto ew = es.create_coordinator().get_worker();
std::atomic<int> v(0);
auto s0 = rxs::range(1, es)
.take(values)
.lift<int>(liftrequirecompletion)
.as_dynamic()
.publish_synchronized(es)
.ref_count()
.lift<int>(liftrequirecompletion)
.subscribe(
rx::make_observer_dynamic<int>(
[&](int){
++v;
},
[&](){
++c;
}));
auto s1 = rxs::range(values + 1, es)
.take(values)
.lift<int>(liftrequirecompletion)
.as_dynamic()
.publish_synchronized(es)
.ref_count()
.lift<int>(liftrequirecompletion)
.subscribe(
rx::make_observer_dynamic<int>(
[&](int){
++v;
},
[&](){
++c;
}));
auto s2 = rxs::range((values * 2) + 1, es)
.take(values)
.lift<int>(liftrequirecompletion)
.as_dynamic()
.publish_synchronized(es)
.ref_count()
.lift<int>(liftrequirecompletion)
.subscribe(
rx::make_observer_dynamic<int>(
[&](int){
++v;
},
[&](){
++c;
}));
while(v != values * 3 || c != 3);
s0.unsubscribe();
s1.unsubscribe();
s2.unsubscribe();
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
std::cout << "range synchronized : " << n << " subscribed, " << v << " on_next calls, " << msElapsed.count() << "ms elapsed, " << v / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
if (--runs > 0) {
self();
}
};
w.schedule(loop);
}
}
}
SCENARIO("observe_on range debug", "[!hide][subscribe][range][observe_on_debug][observe_on][long][perf]"){
GIVEN("range"){
WHEN("observed on"){
using namespace std::chrono;
typedef steady_clock clock;
auto sc = rxsc::make_current_thread();
auto w = sc.create_worker();
auto es = rx::observe_on_event_loop();
const int values = 10000;
int runs = 10;
auto loop = [&](const rxsc::schedulable& self) {
std::atomic<int> c(0);
int n = 1;
auto liftrequirecompletion = [&](rx::subscriber<int> dest){
auto completionstate = std::make_shared<std::tuple<bool, long, rx::subscriber<int>>>(false, 0, std::move(dest));
std::get<2>(*completionstate).add([=](){
if (std::get<1>(*completionstate) != values || !std::get<0>(*completionstate)) {
abort();
}
});
// VS2013 deduction issue requires dynamic (type-forgetting)
return rx::make_subscriber<int>(
std::get<2>(*completionstate),
[=](int n){
++std::get<1>(*completionstate);
std::get<2>(*completionstate).on_next(n);
},
[=](rxu::error_ptr){
abort();
},
[=](){
if (std::get<1>(*completionstate) != values) {
abort();
}
std::get<0>(*completionstate) = true;
std::get<2>(*completionstate).on_completed();
}).as_dynamic();
};
auto start = clock::now();
auto ew = es.create_coordinator().get_worker();
std::atomic<int> v(0);
auto s0 = rxs::range(1, es)
.take(values)
.lift<int>(liftrequirecompletion)
.as_dynamic()
.observe_on(es)
.lift<int>(liftrequirecompletion)
.subscribe(
rx::make_observer_dynamic<int>(
[&](int){
++v;
},
[&](){
++c;
}));
auto s1 = rxs::range(values + 1, es)
.take(values)
.lift<int>(liftrequirecompletion)
.as_dynamic()
.observe_on(es)
.lift<int>(liftrequirecompletion)
.subscribe(
rx::make_observer_dynamic<int>(
[&](int){
++v;
},
[&](){
++c;
}));
auto s2 = rxs::range((values * 2) + 1, es)
.take(values)
.lift<int>(liftrequirecompletion)
.as_dynamic()
.observe_on(es)
.lift<int>(liftrequirecompletion)
.subscribe(
rx::make_observer_dynamic<int>(
[&](int){
++v;
},
[&](){
++c;
}));
while(v != values * 3 || c != 3);
s0.unsubscribe();
s1.unsubscribe();
s2.unsubscribe();
auto finish = clock::now();
auto msElapsed = duration_cast<milliseconds>(finish-start);
std::cout << "range observe_on : " << n << " subscribed, " << v << " on_next calls, " << msElapsed.count() << "ms elapsed, " << v / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
if (--runs > 0) {
self();
}
};
w.schedule(loop);
}
}
}
SCENARIO("subscription traits", "[subscription][traits]"){
GIVEN("given some subscription types"){
auto es = rx::make_subscription();
rx::composite_subscription cs;
WHEN("tested"){
THEN("is_subscription value is true for empty subscription"){
REQUIRE(rx::is_subscription<decltype(es)>::value);
}
THEN("is_subscription value is true for composite_subscription"){
REQUIRE(rx::is_subscription<decltype(cs)>::value);
}
}
}
}
SCENARIO("non-subscription traits", "[subscription][traits]"){
GIVEN("given some non-subscription types"){
auto l = [](){};
int i = 0;
void* v = nullptr;
WHEN("tested"){
THEN("is_subscription value is false for lambda"){
l();
REQUIRE(!rx::is_subscription<decltype(l)>::value);
}
THEN("is_subscription value is false for int"){
i = 0;
REQUIRE(!rx::is_subscription<decltype(i)>::value);
}
THEN("is_subscription value is false for void*"){
v = nullptr;
REQUIRE(!rx::is_subscription<decltype(v)>::value);
}
THEN("is_subscription value is false for void"){
REQUIRE(!rx::is_subscription<void>::value);
}
}
}
}
SCENARIO("subscription static", "[subscription]"){
GIVEN("given a subscription"){
int i=0;
auto s = rx::make_subscription([&i](){++i;});
WHEN("not used"){
THEN("is subscribed"){
REQUIRE(s.is_subscribed());
}
THEN("i is 0"){
REQUIRE(i == 0);
}
}
WHEN("used"){
THEN("is not subscribed when unsubscribed once"){
s.unsubscribe();
REQUIRE(!s.is_subscribed());
}
THEN("is not subscribed when unsubscribed twice"){
s.unsubscribe();
s.unsubscribe();
REQUIRE(!s.is_subscribed());
}
THEN("i is 1 when unsubscribed once"){
s.unsubscribe();
REQUIRE(i == 1);
}
THEN("i is 1 when unsubscribed twice"){
s.unsubscribe();
s.unsubscribe();
REQUIRE(i == 1);
}
}
}
}
SCENARIO("subscription empty", "[subscription]"){
GIVEN("given an empty subscription"){
auto s = rx::make_subscription();
WHEN("not used"){
THEN("is not subscribed"){
REQUIRE(!s.is_subscribed());
}
}
WHEN("used"){
THEN("is not subscribed when unsubscribed once"){
s.unsubscribe();
REQUIRE(!s.is_subscribed());
}
THEN("is not subscribed when unsubscribed twice"){
s.unsubscribe();
s.unsubscribe();
REQUIRE(!s.is_subscribed());
}
}
}
}
SCENARIO("subscription composite", "[subscription]"){
GIVEN("given a subscription"){
int i=0;
rx::composite_subscription s;
s.add(rx::make_subscription());
s.add(rx::make_subscription([&i](){++i;}));
s.add([&i](){++i;});
WHEN("not used"){
THEN("is subscribed"){
REQUIRE(s.is_subscribed());
}
THEN("i is 0"){
REQUIRE(i == 0);
}
}
WHEN("used"){
THEN("is not subscribed when unsubscribed once"){
s.unsubscribe();
REQUIRE(!s.is_subscribed());
}
THEN("is not subscribed when unsubscribed twice"){
s.unsubscribe();
s.unsubscribe();
REQUIRE(!s.is_subscribed());
}
THEN("i is 2 when unsubscribed once"){
s.unsubscribe();
REQUIRE(i == 2);
}
THEN("i is 2 when unsubscribed twice"){
s.unsubscribe();
s.unsubscribe();
REQUIRE(i == 2);
}
}
}
}