// Copyright 2015 The Weave Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "src/notification/xmpp_channel.h" #include <string> #include <base/bind.h> #include <base/strings/string_number_conversions.h> #include <weave/provider/network.h> #include <weave/provider/task_runner.h> #include "src/backoff_entry.h" #include "src/data_encoding.h" #include "src/notification/notification_delegate.h" #include "src/notification/notification_parser.h" #include "src/notification/xml_node.h" #include "src/privet/openssl_utils.h" #include "src/string_utils.h" #include "src/utils.h" namespace weave { namespace { std::string BuildXmppStartStreamCommand() { return "<stream:stream to='clouddevices.gserviceaccount.com' " "xmlns:stream='http://etherx.jabber.org/streams' " "xml:lang='*' version='1.0' xmlns='jabber:client'>"; } std::string BuildXmppAuthenticateCommand(const std::string& account, const std::string& token) { std::vector<uint8_t> credentials; credentials.push_back(0); credentials.insert(credentials.end(), account.begin(), account.end()); credentials.push_back(0); credentials.insert(credentials.end(), token.begin(), token.end()); std::string msg = "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' " "mechanism='X-OAUTH2' auth:service='oauth2' " "auth:allow-non-google-login='true' " "auth:client-uses-full-bind-result='true' " "xmlns:auth='http://www.google.com/talk/protocol/auth'>" + Base64Encode(credentials) + "</auth>"; return msg; } // Backoff policy. // Note: In order to ensure a minimum of 20 seconds between server errors, // we have a 30s +- 10s (33%) jitter initial backoff. const BackoffEntry::Policy kDefaultBackoffPolicy = { // Number of initial errors (in sequence) to ignore before applying // exponential back-off rules. 0, // Initial delay for exponential back-off in ms. 30 * 1000, // 30 seconds. // Factor by which the waiting time will be multiplied. 2, // Fuzzing percentage. ex: 10% will spread requests randomly // between 90%-100% of the calculated time. 0.33, // 33%. // Maximum amount of time we are willing to delay our request in ms. 10 * 60 * 1000, // 10 minutes. // Time to keep an entry from being discarded even when it // has no significant state, -1 to never discard. -1, // Don't use initial delay unless the last request was an error. false, }; // Used for keeping connection alive. const int kRegularPingIntervalSeconds = 60; const int kRegularPingTimeoutSeconds = 30; // Used for diagnostic when connectivity changed. const int kAgressivePingIntervalSeconds = 5; const int kAgressivePingTimeoutSeconds = 10; const int kConnectingTimeoutAfterNetChangeSeconds = 30; } // namespace XmppChannel::XmppChannel(const std::string& account, const std::string& access_token, const std::string& xmpp_endpoint, provider::TaskRunner* task_runner, provider::Network* network) : account_{account}, access_token_{access_token}, xmpp_endpoint_{xmpp_endpoint}, network_{network}, backoff_entry_{&kDefaultBackoffPolicy}, task_runner_{task_runner}, iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} { read_socket_data_.resize(4096); if (network) { network->AddConnectionChangedCallback(base::Bind( &XmppChannel::OnConnectivityChanged, weak_ptr_factory_.GetWeakPtr())); } } void XmppChannel::OnMessageRead(size_t size, ErrorPtr error) { read_pending_ = false; if (error) return Restart(); std::string msg(read_socket_data_.data(), size); VLOG(2) << "Received XMPP packet: '" << msg << "'"; if (!size) return Restart(); stream_parser_.ParseData(msg); WaitForMessage(); } void XmppChannel::OnStreamStart(const std::string& node_name, std::map<std::string, std::string> attributes) { VLOG(2) << "XMPP stream start: " << node_name; } void XmppChannel::OnStreamEnd(const std::string& node_name) { VLOG(2) << "XMPP stream ended: " << node_name; Stop(); if (IsConnected()) { // If we had a fully-established connection, restart it now. // However, if the connection has never been established yet (e.g. // authorization failed), do not restart right now. Wait till we get // new credentials. task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()), {}); } else if (delegate_) { delegate_->OnPermanentFailure(); } } void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) { // Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback // from expat XML parser and some stanza could cause the XMPP stream to be // reset and the parser to be re-initialized. We don't want to destroy the // parser while it is performing a callback invocation. task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(), base::Passed(std::move(stanza))), {}); } void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) { VLOG(2) << "XMPP stanza received: " << stanza->ToString(); switch (state_) { case XmppState::kConnected: if (stanza->name() == "stream:features") { auto children = stanza->FindChildren("mechanisms/mechanism", false); for (const auto& child : children) { if (child->text() == "X-OAUTH2") { state_ = XmppState::kAuthenticationStarted; SendMessage(BuildXmppAuthenticateCommand(account_, access_token_)); return; } } } break; case XmppState::kAuthenticationStarted: if (stanza->name() == "success") { state_ = XmppState::kStreamRestartedPostAuthentication; RestartXmppStream(); return; } else if (stanza->name() == "failure") { if (stanza->FindFirstChild("not-authorized", false)) { state_ = XmppState::kAuthenticationFailed; return; } } break; case XmppState::kStreamRestartedPostAuthentication: if (stanza->name() == "stream:features" && stanza->FindFirstChild("bind", false)) { state_ = XmppState::kBindSent; iq_stanza_handler_->SendRequest( "set", "", "", "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>", base::Bind(&XmppChannel::OnBindCompleted, task_ptr_factory_.GetWeakPtr()), base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr())); return; } break; default: if (stanza->name() == "message") { HandleMessageStanza(std::move(stanza)); return; } else if (stanza->name() == "iq") { if (!iq_stanza_handler_->HandleIqStanza(std::move(stanza))) { LOG(ERROR) << "Failed to handle IQ stanza"; CloseStream(); } return; } LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString(); return; } // Something bad happened. Close the stream and start over. LOG(ERROR) << "Error condition occurred handling stanza: " << stanza->ToString() << " in state: " << static_cast<int>(state_); CloseStream(); } void XmppChannel::CloseStream() { SendMessage("</stream:stream>"); } void XmppChannel::OnBindCompleted(std::unique_ptr<XmlNode> reply) { if (reply->GetAttributeOrEmpty("type") != "result") { CloseStream(); return; } const XmlNode* jid_node = reply->FindFirstChild("bind/jid", false); if (!jid_node) { LOG(ERROR) << "XMPP Bind response is missing JID"; CloseStream(); return; } jid_ = jid_node->text(); state_ = XmppState::kSessionStarted; iq_stanza_handler_->SendRequest( "set", "", "", "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>", base::Bind(&XmppChannel::OnSessionEstablished, task_ptr_factory_.GetWeakPtr()), base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr())); } void XmppChannel::OnSessionEstablished(std::unique_ptr<XmlNode> reply) { if (reply->GetAttributeOrEmpty("type") != "result") { CloseStream(); return; } state_ = XmppState::kSubscribeStarted; std::string body = "<subscribe xmlns='google:push'>" "<item channel='cloud_devices' from=''/></subscribe>"; iq_stanza_handler_->SendRequest( "set", "", account_, body, base::Bind(&XmppChannel::OnSubscribed, task_ptr_factory_.GetWeakPtr()), base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr())); } void XmppChannel::OnSubscribed(std::unique_ptr<XmlNode> reply) { if (reply->GetAttributeOrEmpty("type") != "result") { CloseStream(); return; } state_ = XmppState::kSubscribed; if (delegate_) delegate_->OnConnected(GetName()); } void XmppChannel::HandleMessageStanza(std::unique_ptr<XmlNode> stanza) { const XmlNode* node = stanza->FindFirstChild("push:push/push:data", true); if (!node) { LOG(WARNING) << "XMPP message stanza is missing <push:data> element"; return; } std::string data = node->text(); std::string json_data; if (!Base64Decode(data, &json_data)) { LOG(WARNING) << "Failed to decode base64-encoded message payload: " << data; return; } VLOG(2) << "XMPP push notification data: " << json_data; auto json_dict = LoadJsonDict(json_data, nullptr); if (json_dict && delegate_) ParseNotificationJson(*json_dict, delegate_, GetName()); } void XmppChannel::CreateSslSocket() { CHECK(!stream_); state_ = XmppState::kConnecting; LOG(INFO) << "Starting XMPP connection to: " << xmpp_endpoint_; std::pair<std::string, std::string> host_port = SplitAtFirst(xmpp_endpoint_, ":", true); CHECK(!host_port.first.empty()); CHECK(!host_port.second.empty()); uint32_t port = 0; CHECK(base::StringToUint(host_port.second, &port)) << xmpp_endpoint_; network_->OpenSslSocket(host_port.first, port, base::Bind(&XmppChannel::OnSslSocketReady, task_ptr_factory_.GetWeakPtr())); } void XmppChannel::OnSslSocketReady(std::unique_ptr<Stream> stream, ErrorPtr error) { if (error) { LOG(ERROR) << "TLS handshake failed. Restarting XMPP connection"; backoff_entry_.InformOfRequest(false); LOG(INFO) << "Delaying connection to XMPP server for " << backoff_entry_.GetTimeUntilRelease(); return task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&XmppChannel::CreateSslSocket, task_ptr_factory_.GetWeakPtr()), backoff_entry_.GetTimeUntilRelease()); } CHECK(XmppState::kConnecting == state_); backoff_entry_.InformOfRequest(true); stream_ = std::move(stream); state_ = XmppState::kConnected; RestartXmppStream(); ScheduleRegularPing(); } void XmppChannel::SendMessage(const std::string& message) { CHECK(stream_) << "No XMPP socket stream available"; if (write_pending_) { queued_write_data_ += message; return; } write_socket_data_ = queued_write_data_ + message; queued_write_data_.clear(); VLOG(2) << "Sending XMPP message: " << message; write_pending_ = true; stream_->Write( write_socket_data_.data(), write_socket_data_.size(), base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr())); } void XmppChannel::OnMessageSent(ErrorPtr error) { write_pending_ = false; if (error) return Restart(); if (queued_write_data_.empty()) { WaitForMessage(); } else { SendMessage(std::string{}); } } void XmppChannel::WaitForMessage() { if (read_pending_ || !stream_) return; read_pending_ = true; stream_->Read( read_socket_data_.data(), read_socket_data_.size(), base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr())); } std::string XmppChannel::GetName() const { return "xmpp"; } bool XmppChannel::IsConnected() const { return state_ == XmppState::kSubscribed; } void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) { // No extra parameters needed for XMPP. } void XmppChannel::Restart() { LOG(INFO) << "Restarting XMPP"; Stop(); Start(delegate_); } void XmppChannel::Start(NotificationDelegate* delegate) { CHECK(state_ == XmppState::kNotStarted); delegate_ = delegate; CreateSslSocket(); } void XmppChannel::Stop() { if (IsConnected() && delegate_) delegate_->OnDisconnected(); task_ptr_factory_.InvalidateWeakPtrs(); ping_ptr_factory_.InvalidateWeakPtrs(); stream_.reset(); state_ = XmppState::kNotStarted; } void XmppChannel::RestartXmppStream() { stream_parser_.Reset(); stream_->CancelPendingOperations(); read_pending_ = false; write_pending_ = false; SendMessage(BuildXmppStartStreamCommand()); } void XmppChannel::SchedulePing(base::TimeDelta interval, base::TimeDelta timeout) { VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout; ping_ptr_factory_.InvalidateWeakPtrs(); task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&XmppChannel::PingServer, ping_ptr_factory_.GetWeakPtr(), timeout), interval); } void XmppChannel::ScheduleRegularPing() { SchedulePing(base::TimeDelta::FromSeconds(kRegularPingIntervalSeconds), base::TimeDelta::FromSeconds(kRegularPingTimeoutSeconds)); } void XmppChannel::ScheduleFastPing() { SchedulePing(base::TimeDelta::FromSeconds(kAgressivePingIntervalSeconds), base::TimeDelta::FromSeconds(kAgressivePingTimeoutSeconds)); } void XmppChannel::PingServer(base::TimeDelta timeout) { VLOG(1) << "Sending XMPP ping"; if (!IsConnected()) { LOG(WARNING) << "XMPP channel is not connected"; Restart(); return; } // Send an XMPP Ping request as defined in XEP-0199 extension: // http://xmpp.org/extensions/xep-0199.html iq_stanza_handler_->SendRequestWithCustomTimeout( "get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>", timeout, base::Bind(&XmppChannel::OnPingResponse, task_ptr_factory_.GetWeakPtr(), base::Time::Now()), base::Bind(&XmppChannel::OnPingTimeout, task_ptr_factory_.GetWeakPtr(), base::Time::Now())); } void XmppChannel::OnPingResponse(base::Time sent_time, std::unique_ptr<XmlNode> reply) { VLOG(1) << "XMPP response received after " << (base::Time::Now() - sent_time); // Ping response received from server. Everything seems to be in order. // Reschedule with default intervals. ScheduleRegularPing(); } void XmppChannel::OnPingTimeout(base::Time sent_time) { LOG(WARNING) << "XMPP channel seems to be disconnected. Ping timed out after " << (base::Time::Now() - sent_time); Restart(); } void XmppChannel::OnConnectivityChanged() { if (state_ == XmppState::kNotStarted) return; if (state_ == XmppState::kConnecting && backoff_entry_.GetTimeUntilRelease() < base::TimeDelta::FromSeconds( kConnectingTimeoutAfterNetChangeSeconds)) { VLOG(1) << "Next reconnect in " << backoff_entry_.GetTimeUntilRelease(); return; } ScheduleFastPing(); } } // namespace weave