// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once #if !defined(RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP) #define RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP #include "rx-includes.hpp" namespace rxcpp { namespace detail { template<class T> struct has_on_connect { struct not_void {}; template<class CT> static auto check(int) -> decltype((*(CT*)nullptr).on_connect(composite_subscription())); template<class CT> static not_void check(...); typedef decltype(check<T>(0)) detail_result; static const bool value = std::is_same<detail_result, void>::value; }; } template<class T> class dynamic_connectable_observable : public dynamic_observable<T> { struct state_type : public std::enable_shared_from_this<state_type> { typedef std::function<void(composite_subscription)> onconnect_type; onconnect_type on_connect; }; std::shared_ptr<state_type> state; template<class U> void construct(const dynamic_observable<U>& o, tag_dynamic_observable&&) { state = o.state; } template<class U> void construct(dynamic_observable<U>&& o, tag_dynamic_observable&&) { state = std::move(o.state); } template<class SO> void construct(SO&& source, rxs::tag_source&&) { auto so = std::make_shared<rxu::decay_t<SO>>(std::forward<SO>(source)); state->on_connect = [so](composite_subscription cs) mutable { so->on_connect(std::move(cs)); }; } public: typedef tag_dynamic_observable dynamic_observable_tag; dynamic_connectable_observable() { } template<class SOF> explicit dynamic_connectable_observable(SOF sof) : dynamic_observable<T>(sof) , state(std::make_shared<state_type>()) { construct(std::move(sof), typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type()); } template<class SF, class CF> dynamic_connectable_observable(SF&& sf, CF&& cf) : dynamic_observable<T>(std::forward<SF>(sf)) , state(std::make_shared<state_type>()) { state->on_connect = std::forward<CF>(cf); } using dynamic_observable<T>::on_subscribe; void on_connect(composite_subscription cs) const { state->on_connect(std::move(cs)); } }; template<class T, class Source> connectable_observable<T> make_dynamic_connectable_observable(Source&& s) { return connectable_observable<T>(dynamic_connectable_observable<T>(std::forward<Source>(s))); } /*! \brief a source of values that is shared across all subscribers and does not start until connectable_observable::connect() is called. \ingroup group-observable */ template<class T, class SourceOperator> class connectable_observable : public observable<T, SourceOperator> { typedef connectable_observable<T, SourceOperator> this_type; typedef observable<T, SourceOperator> base_type; typedef rxu::decay_t<SourceOperator> source_operator_type; static_assert(detail::has_on_connect<source_operator_type>::value, "inner must have on_connect method void(composite_subscription)"); public: typedef tag_connectable_observable observable_tag; connectable_observable() { } explicit connectable_observable(const SourceOperator& o) : base_type(o) { } explicit connectable_observable(SourceOperator&& o) : base_type(std::move(o)) { } // implicit conversion between observables of the same value_type template<class SO> connectable_observable(const connectable_observable<T, SO>& o) : base_type(o) {} // implicit conversion between observables of the same value_type template<class SO> connectable_observable(connectable_observable<T, SO>&& o) : base_type(std::move(o)) {} /// /// takes any function that will take this observable and produce a result value. /// this is intended to allow externally defined operators, that use subscribe, /// to be connected into the expression. /// template<class OperatorFactory> auto op(OperatorFactory&& of) const -> decltype(of(*(const this_type*)nullptr)) { return of(*this); static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)"); } /// /// performs type-forgetting conversion to a new composite_observable /// connectable_observable<T> as_dynamic() { return *this; } composite_subscription connect(composite_subscription cs = composite_subscription()) { base_type::source_operator.on_connect(cs); return cs; } /*! @copydoc rx-ref_count.hpp */ template<class... AN> auto ref_count(AN... an) const /// \cond SHOW_SERVICE_MEMBERS -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) /// \endcond { return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...); } /*! @copydoc rx-connect_forever.hpp */ template<class... AN> auto connect_forever(AN... an) const /// \cond SHOW_SERVICE_MEMBERS -> decltype(observable_member(connect_forever_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) /// \endcond { return observable_member(connect_forever_tag{}, *this, std::forward<AN>(an)...); } }; } // // support range() >> filter() >> subscribe() syntax // '>>' is spelled 'stream' // template<class T, class SourceOperator, class OperatorFactory> auto operator >> (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of) -> decltype(source.op(std::forward<OperatorFactory>(of))) { return source.op(std::forward<OperatorFactory>(of)); } // // support range() | filter() | subscribe() syntax // '|' is spelled 'pipe' // template<class T, class SourceOperator, class OperatorFactory> auto operator | (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of) -> decltype(source.op(std::forward<OperatorFactory>(of))) { return source.op(std::forward<OperatorFactory>(of)); } #endif