// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

#if !defined(RXCPP_RX_GROUPED_OBSERVABLE_HPP)
#define RXCPP_RX_GROUPED_OBSERVABLE_HPP

#include "rx-includes.hpp"

namespace rxcpp {

namespace detail {

template<class K, class Source>
struct has_on_get_key_for
{
    struct not_void {};
    template<class CS>
    static auto check(int) -> decltype((*(CS*)nullptr).on_get_key());
    template<class CS>
    static not_void check(...);

    typedef decltype(check<Source>(0)) detail_result;
    static const bool value = std::is_same<detail_result, rxu::decay_t<K>>::value;
};

}

template<class K, class T>
class dynamic_grouped_observable
    : public dynamic_observable<T>
{
public:
    typedef rxu::decay_t<K> key_type;
    typedef tag_dynamic_grouped_observable dynamic_observable_tag;

private:
    struct state_type
        : public std::enable_shared_from_this<state_type>
    {
        typedef std::function<key_type()> ongetkey_type;

        ongetkey_type on_get_key;
    };
    std::shared_ptr<state_type> state;

    template<class U, class V>
    friend bool operator==(const dynamic_grouped_observable<U, V>&, const dynamic_grouped_observable<U, V>&);

    template<class U, class V>
    void construct(const dynamic_grouped_observable<U, V>& o, const tag_dynamic_grouped_observable&) {
        state = o.state;
    }

    template<class U, class V>
    void construct(dynamic_grouped_observable<U, V>&& o, const tag_dynamic_grouped_observable&) {
        state = std::move(o.state);
    }

    template<class SO>
    void construct(SO&& source, const rxs::tag_source&) {
        auto so = std::make_shared<rxu::decay_t<SO>>(std::forward<SO>(source));
        state->on_get_key = [so]() mutable {
            return so->on_get_key();
        };
    }

public:

    dynamic_grouped_observable()
    {
    }

    template<class SOF>
    explicit dynamic_grouped_observable(SOF sof)
        : dynamic_observable<T>(sof)
        , state(std::make_shared<state_type>())
    {
        construct(std::move(sof),
                  typename std::conditional<is_dynamic_grouped_observable<SOF>::value, tag_dynamic_grouped_observable, rxs::tag_source>::type());
    }

    template<class SF, class CF>
    dynamic_grouped_observable(SF&& sf, CF&& cf)
        : dynamic_observable<T>(std::forward<SF>(sf))
        , state(std::make_shared<state_type>())
    {
        state->on_connect = std::forward<CF>(cf);
    }

    using dynamic_observable<T>::on_subscribe;

    key_type on_get_key() const {
        return state->on_get_key();
    }
};

template<class K, class T>
inline bool operator==(const dynamic_grouped_observable<K, T>& lhs, const dynamic_grouped_observable<K, T>& rhs) {
    return lhs.state == rhs.state;
}
template<class K, class T>
inline bool operator!=(const dynamic_grouped_observable<K, T>& lhs, const dynamic_grouped_observable<K, T>& rhs) {
    return !(lhs == rhs);
}

template<class K, class T, class Source>
grouped_observable<K, T> make_dynamic_grouped_observable(Source&& s) {
    return grouped_observable<K, T>(dynamic_grouped_observable<K, T>(std::forward<Source>(s)));
}



/*!
    \brief a source of observables which each emit values from one category specified by the key selector.

    \ingroup group-observable

*/
template<class K, class T, class SourceOperator>
class grouped_observable
    : public observable<T, SourceOperator>
{
    typedef grouped_observable<K, T, SourceOperator> this_type;
    typedef observable<T, SourceOperator> base_type;
    typedef rxu::decay_t<SourceOperator> source_operator_type;

    static_assert(detail::has_on_get_key_for<K, source_operator_type>::value, "inner must have on_get_key method key_type()");

public:
    typedef rxu::decay_t<K> key_type;
    typedef tag_grouped_observable observable_tag;

    grouped_observable()
    {
    }

    explicit grouped_observable(const SourceOperator& o)
        : base_type(o)
    {
    }
    explicit grouped_observable(SourceOperator&& o)
        : base_type(std::move(o))
    {
    }

    // implicit conversion between observables of the same value_type
    template<class SO>
    grouped_observable(const grouped_observable<K, T, SO>& o)
        : base_type(o)
    {}
    // implicit conversion between observables of the same value_type
    template<class SO>
    grouped_observable(grouped_observable<K, T, SO>&& o)
        : base_type(std::move(o))
    {}

    ///
    /// performs type-forgetting conversion to a new grouped_observable
    ///
    grouped_observable<K, T> as_dynamic() const {
        return *this;
    }

    key_type get_key() const {
        return base_type::source_operator.on_get_key();
    }
};


}

//
// support range() >> filter() >> subscribe() syntax
// '>>' is spelled 'stream'
//
template<class K, class T, class SourceOperator, class OperatorFactory>
auto operator >> (const rxcpp::grouped_observable<K, T, SourceOperator>& source, OperatorFactory&& of)
    -> decltype(source.op(std::forward<OperatorFactory>(of))) {
    return      source.op(std::forward<OperatorFactory>(of));
}

//
// support range() | filter() | subscribe() syntax
// '|' is spelled 'pipe'
//
template<class K, class T, class SourceOperator, class OperatorFactory>
auto operator | (const rxcpp::grouped_observable<K, T, SourceOperator>& source, OperatorFactory&& of)
    -> decltype(source.op(std::forward<OperatorFactory>(of))) {
    return      source.op(std::forward<OperatorFactory>(of));
}

#endif