#include "../test.h"
#include "rxcpp/operators/rx-reduce.hpp"
// Verifies reduce() called with a seed, an accumulator and a result selector.
// The expected output is a single value, (seed + 0 + 1 + 2 + 3 + 4) * 5,
// delivered at the same virtual time (260) as the completion.
SCENARIO("reduce some data with seed", "[reduce][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        int seed = 42;

        // The value at 150 is not part of the expected result below; the
        // expected subscription window is 200-260.
        auto xs = sc.make_hot_observable({
            on.next(150, 1),
            on.next(210, 0),
            on.next(220, 1),
            on.next(230, 2),
            on.next(240, 3),
            on.next(250, 4),
            on.completed(260)
        });

        // The description used to read "mapped to ints that are one larger",
        // which was carried over from a map test and did not describe reduce.
        WHEN("the data is reduced with a seed and a result selector"){

            auto res = w.start(
                [&]() {
                    return xs
                        .reduce(seed,
                            // accumulator: running sum starting from the seed
                            [](int sum, int x) {
                                return sum + x;
                            },
                            // result selector: applied to the final sum
                            [](int sum) {
                                return sum * 5;
                            })
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("the output stops on completion"){
                auto required = rxu::to_vector({
                    on.next(260, (seed + 0 + 1 + 2 + 3 + 4) * 5),
                    on.completed(260)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 260)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}
// Verifies accumulate() called with a seed, an accumulator and a result
// selector. The expected output is a single value,
// (seed + 0 + 1 + 2 + 3 + 4) * 5, delivered at the same virtual time (260)
// as the completion.
SCENARIO("accumulate some data with seed", "[accumulate][reduce][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;

        int seed = 42;

        // The value at 150 is not part of the expected result below; the
        // expected subscription window is 200-260.
        auto xs = sc.make_hot_observable({
            on.next(150, 1),
            on.next(210, 0),
            on.next(220, 1),
            on.next(230, 2),
            on.next(240, 3),
            on.next(250, 4),
            on.completed(260)
        });

        // The description used to read "mapped to ints that are one larger",
        // which was carried over from a map test and did not describe
        // accumulate.
        WHEN("the data is accumulated with a seed and a result selector"){

            auto res = w.start(
                [&]() {
                    return xs
                        .accumulate(seed,
                            // accumulator: running sum starting from the seed
                            [](int sum, int x) {
                                return sum + x;
                            },
                            // result selector: applied to the final sum
                            [](int sum) {
                                return sum * 5;
                            })
                        // forget type to workaround lambda deduction bug on msvc 2013
                        .as_dynamic();
                }
            );

            THEN("the output stops on completion"){
                auto required = rxu::to_vector({
                    on.next(260, (seed + 0 + 1 + 2 + 3 + 4) * 5),
                    on.completed(260)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 260)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}
// Verifies average() over an int source. The expected output is a single
// double, 3.0 (the mean of 3, 4 and 2), delivered at the same virtual time
// (250) as the completion.
SCENARIO("average some data", "[reduce][average][operators]"){
    GIVEN("a test hot observable of ints"){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<int> on;
        // Expected output messages are built as doubles, unlike the int input.
        const rxsc::test::messages<double> d_on;

        // The value at 150 is not part of the expected result below; the
        // expected subscription window is 200-250.
        auto xs = sc.make_hot_observable({
            on.next(150, 1),
            on.next(210, 3),
            on.next(220, 4),
            on.next(230, 2),
            on.completed(250)
        });

        // The description used to read "mapped to ints that are one larger",
        // which was carried over from a map test; it now follows the wording
        // of the sum/max/min scenarios in this file.
        WHEN("average is calculated"){

            auto res = w.start(
                [&]() {
                    return xs.average();
                }
            );

            THEN("the output stops on completion"){
                auto required = rxu::to_vector({
                    d_on.next(250, 3.0),
                    d_on.completed(250)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 250)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}
// Checks sum(): the expected output is a single value, 9 (3 + 4 + 2),
// delivered at the same virtual time (250) as the completion.
SCENARIO("sum some data", "[reduce][sum][operators]"){
    GIVEN("a test hot observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;
        // Separate factory used only for building the expected output.
        const rxsc::test::messages<int> on_result;

        // The value at 150 is not part of the expected sum; the expected
        // subscription window is 200-250.
        auto source = scheduler.make_hot_observable({
            on.next(150, 1),
            on.next(210, 3),
            on.next(220, 4),
            on.next(230, 2),
            on.completed(250)
        });

        WHEN("sum is calculated"){

            auto make_sum = [&]() {
                return source.sum();
            };
            auto observed = worker.start(make_sum);

            THEN("the output contains the sum of source values"){
                auto required = rxu::to_vector({
                    on_result.next(250, 9),
                    on_result.completed(250)
                });
                auto actual = observed.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 250)
                });
                auto actual = source.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}
// Checks max(): the expected output is a single value, 4 (the largest of
// 3, 4 and 2), delivered at the same virtual time (250) as the completion.
SCENARIO("max", "[reduce][max][operators]"){
    GIVEN("a test hot observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;
        // Separate factory used only for building the expected output.
        const rxsc::test::messages<int> on_result;

        // The value at 150 is not part of the expected result; the expected
        // subscription window is 200-250.
        auto source = scheduler.make_hot_observable({
            on.next(150, 1),
            on.next(210, 3),
            on.next(220, 4),
            on.next(230, 2),
            on.completed(250)
        });

        WHEN("max is calculated"){

            auto make_max = [&]() {
                return source.max();
            };
            auto observed = worker.start(make_max);

            THEN("the output contains the max of source values"){
                auto required = rxu::to_vector({
                    on_result.next(250, 4),
                    on_result.completed(250)
                });
                auto actual = observed.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 250)
                });
                auto actual = source.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}
// This scenario cannot run when exceptions are disabled: calling max() on an
// empty stream throws an exception, which crashes the test run immediately.
//
// TODO: the internal max implementation should be rewritten so that it does not throw.
// Checks max() on a source whose only value (at 150) falls outside the
// expected subscription window of 200-250: the expected output is a single
// error at 250 and no value.
SCENARIO("max, empty", "[reduce][max][operators][!throws]"){
    GIVEN("a test hot observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;
        // Separate factory used only for building the expected output.
        const rxsc::test::messages<int> on_result;

        // Error used to build the expected error message below.
        std::runtime_error expected_error("max on_error");

        auto source = scheduler.make_hot_observable({
            on.next(150, 1),
            on.completed(250)
        });

        WHEN("max is calculated"){

            auto make_max = [&]() {
                return source.max();
            };
            auto observed = worker.start(make_max);

            THEN("the output contains only error message"){
                auto required = rxu::to_vector({
                    on_result.error(250, expected_error)
                });
                auto actual = observed.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 250)
                });
                auto actual = source.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}
// Checks max() on a source that terminates with an error at 250: the
// expected output is that error at 250 and no value.
SCENARIO("max, error", "[reduce][max][operators]"){
    GIVEN("a test hot observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;
        // Separate factory used only for building the expected output.
        const rxsc::test::messages<int> on_result;

        // The same error object is fed into the source and used for the
        // expected output.
        std::runtime_error source_error("max on_error from source");

        auto source = scheduler.make_hot_observable({
            on.next(150, 1),
            on.error(250, source_error)
        });

        WHEN("max is calculated"){

            auto make_max = [&]() {
                return source.max();
            };
            auto observed = worker.start(make_max);

            THEN("the output contains only error message"){
                auto required = rxu::to_vector({
                    on_result.error(250, source_error)
                });
                auto actual = observed.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 250)
                });
                auto actual = source.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}
// Checks min(): the expected output is a single value, 2 (the smallest of
// 3, 4 and 2), delivered at the same virtual time (250) as the completion.
SCENARIO("min", "[reduce][min][operators]"){
    GIVEN("a test hot observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;
        // Separate factory used only for building the expected output.
        const rxsc::test::messages<int> on_result;

        // The value at 150 is not part of the expected result (otherwise the
        // minimum would be 1); the expected subscription window is 200-250.
        auto source = scheduler.make_hot_observable({
            on.next(150, 1),
            on.next(210, 3),
            on.next(220, 4),
            on.next(230, 2),
            on.completed(250)
        });

        WHEN("min is calculated"){

            auto make_min = [&]() {
                return source.min();
            };
            auto observed = worker.start(make_min);

            THEN("the output contains the min of source values"){
                auto required = rxu::to_vector({
                    on_result.next(250, 2),
                    on_result.completed(250)
                });
                auto actual = observed.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 250)
                });
                auto actual = source.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}
// This scenario cannot run when exceptions are disabled: calling min() on an
// empty stream throws an exception, which crashes the test run immediately.
//
// TODO: the internal min implementation should be rewritten so that it does not throw.
// Checks min() on a source whose only value (at 150) falls outside the
// expected subscription window of 200-250: the expected output is a single
// error at 250 and no value.
SCENARIO("min, empty", "[reduce][min][operators][!throws]"){
    GIVEN("a test hot observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;
        // Separate factory used only for building the expected output.
        const rxsc::test::messages<int> on_result;

        // Error used to build the expected error message below.
        std::runtime_error expected_error("min on_error");

        auto source = scheduler.make_hot_observable({
            on.next(150, 1),
            on.completed(250)
        });

        WHEN("min is calculated"){

            auto make_min = [&]() {
                return source.min();
            };
            auto observed = worker.start(make_min);

            THEN("the output contains only error message"){
                auto required = rxu::to_vector({
                    on_result.error(250, expected_error)
                });
                auto actual = observed.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 250)
                });
                auto actual = source.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}
// Checks min() on a source that terminates with an error at 250: the
// expected output is that error at 250 and no value.
SCENARIO("min, error", "[reduce][min][operators]"){
    GIVEN("a test hot observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;
        // Separate factory used only for building the expected output.
        const rxsc::test::messages<int> on_result;

        // The same error object is fed into the source and used for the
        // expected output.
        std::runtime_error source_error("min on_error from source");

        auto source = scheduler.make_hot_observable({
            on.next(150, 1),
            on.error(250, source_error)
        });

        WHEN("min is calculated"){

            auto make_min = [&]() {
                return source.min();
            };
            auto observed = worker.start(make_min);

            THEN("the output contains only error message"){
                auto required = rxu::to_vector({
                    on_result.error(250, source_error)
                });
                auto actual = observed.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 250)
                });
                auto actual = source.subscriptions();
                REQUIRE(required == actual);
            }
        }
    }
}