// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once #if !defined(RXCPP_RX_SUBSCRIPTION_HPP) #define RXCPP_RX_SUBSCRIPTION_HPP #include "rx-includes.hpp" namespace rxcpp { namespace detail { template<class F> struct is_unsubscribe_function { struct not_void {}; template<class CF> static auto check(int) -> decltype((*(CF*)nullptr)()); template<class CF> static not_void check(...); static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value; }; } struct tag_subscription {}; struct subscription_base {typedef tag_subscription subscription_tag;}; template<class T> class is_subscription { template<class C> static typename C::subscription_tag* check(int); template<class C> static void check(...); public: static const bool value = std::is_convertible<decltype(check<rxu::decay_t<T>>(0)), tag_subscription*>::value; }; template<class Unsubscribe> class static_subscription { typedef rxu::decay_t<Unsubscribe> unsubscribe_call_type; unsubscribe_call_type unsubscribe_call; static_subscription() { } public: static_subscription(const static_subscription& o) : unsubscribe_call(o.unsubscribe_call) { } static_subscription(static_subscription&& o) : unsubscribe_call(std::move(o.unsubscribe_call)) { } static_subscription(unsubscribe_call_type s) : unsubscribe_call(std::move(s)) { } void unsubscribe() const { unsubscribe_call(); } }; class subscription : public subscription_base { class base_subscription_state : public std::enable_shared_from_this<base_subscription_state> { base_subscription_state(); public: explicit base_subscription_state(bool initial) : issubscribed(initial) { } virtual ~base_subscription_state() {} virtual void unsubscribe() { } std::atomic<bool> issubscribed; }; public: typedef std::weak_ptr<base_subscription_state> weak_state_type; private: template<class I> struct subscription_state : public base_subscription_state { typedef rxu::decay_t<I> inner_t; subscription_state(inner_t i) : base_subscription_state(true) , inner(std::move(i)) { } virtual void unsubscribe() { if (issubscribed.exchange(false)) { trace_activity().unsubscribe_enter(*this); inner.unsubscribe(); trace_activity().unsubscribe_return(*this); } } inner_t inner; }; protected: std::shared_ptr<base_subscription_state> state; friend bool operator<(const subscription&, const subscription&); friend bool operator==(const subscription&, const subscription&); private: subscription(weak_state_type w) : state(w.lock()) { if (!state) { std::terminate(); } } explicit subscription(std::shared_ptr<base_subscription_state> s) : state(std::move(s)) { if (!state) { std::terminate(); } } public: subscription() : state(std::make_shared<base_subscription_state>(false)) { if (!state) { std::terminate(); } } template<class U> explicit subscription(U u, typename std::enable_if<!is_subscription<U>::value, void**>::type = nullptr) : state(std::make_shared<subscription_state<U>>(std::move(u))) { if (!state) { std::terminate(); } } template<class U> explicit subscription(U u, typename std::enable_if<!std::is_same<subscription, U>::value && is_subscription<U>::value, void**>::type = nullptr) // intentionally slice : state(std::move((*static_cast<subscription*>(&u)).state)) { if (!state) { std::terminate(); } } subscription(const subscription& o) : state(o.state) { if (!state) { std::terminate(); } } subscription(subscription&& o) : state(std::move(o.state)) { if (!state) { std::terminate(); } } subscription& operator=(subscription o) { state = std::move(o.state); return *this; } bool is_subscribed() const { if (!state) { std::terminate(); } return state->issubscribed; } void unsubscribe() const { if (!state) { std::terminate(); } auto keepAlive = state; state->unsubscribe(); } weak_state_type get_weak() { return state; } // Atomically promote weak subscription to strong. // Calls std::terminate if w has already expired. static subscription lock(weak_state_type w) { return subscription(w); } // Atomically try to promote weak subscription to strong. // Returns an empty maybe<> if w has already expired. static rxu::maybe<subscription> maybe_lock(weak_state_type w) { auto strong_subscription = w.lock(); if (!strong_subscription) { return rxu::detail::maybe<subscription>{}; } else { return rxu::detail::maybe<subscription>{subscription{std::move(strong_subscription)}}; } } }; inline bool operator<(const subscription& lhs, const subscription& rhs) { return lhs.state < rhs.state; } inline bool operator==(const subscription& lhs, const subscription& rhs) { return lhs.state == rhs.state; } inline bool operator!=(const subscription& lhs, const subscription& rhs) { return !(lhs == rhs); } inline auto make_subscription() -> subscription { return subscription(); } template<class I> auto make_subscription(I&& i) -> typename std::enable_if<!is_subscription<I>::value && !detail::is_unsubscribe_function<I>::value, subscription>::type { return subscription(std::forward<I>(i)); } template<class Unsubscribe> auto make_subscription(Unsubscribe&& u) -> typename std::enable_if<detail::is_unsubscribe_function<Unsubscribe>::value, subscription>::type { return subscription(static_subscription<Unsubscribe>(std::forward<Unsubscribe>(u))); } class composite_subscription; namespace detail { struct tag_composite_subscription_empty {}; class composite_subscription_inner { private: typedef subscription::weak_state_type weak_subscription; struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state> { // invariant: cannot access this data without the lock held. std::set<subscription> subscriptions; // double checked locking: // issubscribed must be loaded again after each lock acquisition. // invariant: // never call subscription::unsubscribe with lock held. std::mutex lock; // invariant: transitions from 'true' to 'false' exactly once, at any time. std::atomic<bool> issubscribed; ~composite_subscription_state() { std::unique_lock<decltype(lock)> guard(lock); subscriptions.clear(); } composite_subscription_state() : issubscribed(true) { } composite_subscription_state(tag_composite_subscription_empty) : issubscribed(false) { } // Atomically add 's' to the set of subscriptions. // // If unsubscribe() has already occurred, this immediately // calls s.unsubscribe(). // // cs.unsubscribe() [must] happens-before s.unsubscribe() // // Due to the un-atomic nature of calling 's.unsubscribe()', // it is possible to observe the unintuitive // add(s)=>s.unsubscribe() prior // to any of the unsubscribe()=>sN.unsubscribe(). inline weak_subscription add(subscription s) { if (!issubscribed) { // load.acq [seq_cst] s.unsubscribe(); } else if (s.is_subscribed()) { std::unique_lock<decltype(lock)> guard(lock); if (!issubscribed) { // load.acq [seq_cst] // unsubscribe was called concurrently. guard.unlock(); // invariant: do not call unsubscribe with lock held. s.unsubscribe(); } else { subscriptions.insert(s); } } return s.get_weak(); } // Atomically remove 'w' from the set of subscriptions. // // This does nothing if 'w' was already previously removed, // or refers to an expired value. inline void remove(weak_subscription w) { if (issubscribed) { // load.acq [seq_cst] rxu::maybe<subscription> maybe_subscription = subscription::maybe_lock(w); if (maybe_subscription.empty()) { // Do nothing if the subscription has already expired. return; } std::unique_lock<decltype(lock)> guard(lock); // invariant: subscriptions must be accessed under the lock. if (issubscribed) { // load.acq [seq_cst] subscription& s = maybe_subscription.get(); subscriptions.erase(std::move(s)); } // else unsubscribe() was called concurrently; this becomes a no-op. } } // Atomically clear all subscriptions that were observably added // (and not subsequently observably removed). // // Un-atomically call unsubscribe on those subscriptions. // // forall subscriptions in {add(s1),add(s2),...} // - {remove(s3), remove(s4), ...}: // cs.unsubscribe() || cs.clear() happens before s.unsubscribe() // // cs.unsubscribe() observed-before cs.clear ==> do nothing. inline void clear() { if (issubscribed) { // load.acq [seq_cst] std::unique_lock<decltype(lock)> guard(lock); if (!issubscribed) { // load.acq [seq_cst] // unsubscribe was called concurrently. return; } std::set<subscription> v(std::move(subscriptions)); // invariant: do not call unsubscribe with lock held. guard.unlock(); std::for_each(v.begin(), v.end(), [](const subscription& s) { s.unsubscribe(); }); } } // Atomically clear all subscriptions that were observably added // (and not subsequently observably removed). // // Un-atomically call unsubscribe on those subscriptions. // // Switches to an 'unsubscribed' state, all subsequent // adds are immediately unsubscribed. // // cs.unsubscribe() [must] happens-before // cs.add(s) ==> s.unsubscribe() // // forall subscriptions in {add(s1),add(s2),...} // - {remove(s3), remove(s4), ...}: // cs.unsubscribe() || cs.clear() happens before s.unsubscribe() inline void unsubscribe() { if (issubscribed.exchange(false)) { // cas.acq_rel [seq_cst] std::unique_lock<decltype(lock)> guard(lock); // is_subscribed can only transition to 'false' once, // does not need an extra atomic access here. std::set<subscription> v(std::move(subscriptions)); // invariant: do not call unsubscribe with lock held. guard.unlock(); std::for_each(v.begin(), v.end(), [](const subscription& s) { s.unsubscribe(); }); } } }; public: typedef std::shared_ptr<composite_subscription_state> shared_state_type; protected: mutable shared_state_type state; public: composite_subscription_inner() : state(std::make_shared<composite_subscription_state>()) { } composite_subscription_inner(tag_composite_subscription_empty et) : state(std::make_shared<composite_subscription_state>(et)) { } composite_subscription_inner(const composite_subscription_inner& o) : state(o.state) { if (!state) { std::terminate(); } } composite_subscription_inner(composite_subscription_inner&& o) : state(std::move(o.state)) { if (!state) { std::terminate(); } } composite_subscription_inner& operator=(composite_subscription_inner o) { state = std::move(o.state); if (!state) { std::terminate(); } return *this; } inline weak_subscription add(subscription s) const { if (!state) { std::terminate(); } return state->add(std::move(s)); } inline void remove(weak_subscription w) const { if (!state) { std::terminate(); } state->remove(std::move(w)); } inline void clear() const { if (!state) { std::terminate(); } state->clear(); } inline void unsubscribe() { if (!state) { std::terminate(); } state->unsubscribe(); } }; inline composite_subscription shared_empty(); } /*! \brief controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe. \ingroup group-core */ class composite_subscription : protected detail::composite_subscription_inner , public subscription { typedef detail::composite_subscription_inner inner_type; public: typedef subscription::weak_state_type weak_subscription; composite_subscription(detail::tag_composite_subscription_empty et) : inner_type(et) , subscription() // use empty base { } public: composite_subscription() : inner_type() , subscription(*static_cast<const inner_type*>(this)) { } composite_subscription(const composite_subscription& o) : inner_type(o) , subscription(static_cast<const subscription&>(o)) { } composite_subscription(composite_subscription&& o) : inner_type(std::move(o)) , subscription(std::move(static_cast<subscription&>(o))) { } composite_subscription& operator=(composite_subscription o) { inner_type::operator=(std::move(o)); subscription::operator=(std::move(*static_cast<subscription*>(&o))); return *this; } static inline composite_subscription empty() { return detail::shared_empty(); } using subscription::is_subscribed; using subscription::unsubscribe; using inner_type::clear; inline weak_subscription add(subscription s) const { if (s == static_cast<const subscription&>(*this)) { // do not nest the same subscription std::terminate(); //return s.get_weak(); } auto that = this->subscription::state.get(); trace_activity().subscription_add_enter(*that, s); auto w = inner_type::add(std::move(s)); trace_activity().subscription_add_return(*that); return w; } template<class F> auto add(F f) const -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type { return add(make_subscription(std::move(f))); } inline void remove(weak_subscription w) const { auto that = this->subscription::state.get(); trace_activity().subscription_remove_enter(*that, w); inner_type::remove(w); trace_activity().subscription_remove_return(*that); } }; inline bool operator<(const composite_subscription& lhs, const composite_subscription& rhs) { return static_cast<const subscription&>(lhs) < static_cast<const subscription&>(rhs); } inline bool operator==(const composite_subscription& lhs, const composite_subscription& rhs) { return static_cast<const subscription&>(lhs) == static_cast<const subscription&>(rhs); } inline bool operator!=(const composite_subscription& lhs, const composite_subscription& rhs) { return !(lhs == rhs); } namespace detail { inline composite_subscription shared_empty() { static composite_subscription shared_empty = composite_subscription(tag_composite_subscription_empty()); return shared_empty; } } template<class T> class resource : public subscription_base { public: typedef typename composite_subscription::weak_subscription weak_subscription; resource() : lifetime(composite_subscription()) , value(std::make_shared<rxu::detail::maybe<T>>()) { } explicit resource(T t, composite_subscription cs = composite_subscription()) : lifetime(std::move(cs)) , value(std::make_shared<rxu::detail::maybe<T>>(rxu::detail::maybe<T>(std::move(t)))) { auto localValue = value; lifetime.add( [localValue](){ localValue->reset(); } ); } T& get() { return value.get()->get(); } composite_subscription& get_subscription() { return lifetime; } bool is_subscribed() const { return lifetime.is_subscribed(); } weak_subscription add(subscription s) const { return lifetime.add(std::move(s)); } template<class F> auto add(F f) const -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type { return lifetime.add(make_subscription(std::move(f))); } void remove(weak_subscription w) const { return lifetime.remove(std::move(w)); } void clear() const { return lifetime.clear(); } void unsubscribe() const { return lifetime.unsubscribe(); } protected: composite_subscription lifetime; std::shared_ptr<rxu::detail::maybe<T>> value; }; } #endif