// Copyright 2015 The Weave Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/notification/xmpp_channel.h"
#include <string>
#include <base/bind.h>
#include <base/strings/string_number_conversions.h>
#include <weave/provider/network.h>
#include <weave/provider/task_runner.h>
#include "src/backoff_entry.h"
#include "src/data_encoding.h"
#include "src/notification/notification_delegate.h"
#include "src/notification/notification_parser.h"
#include "src/notification/xml_node.h"
#include "src/privet/openssl_utils.h"
#include "src/string_utils.h"
#include "src/utils.h"
namespace weave {
namespace {
std::string BuildXmppStartStreamCommand() {
return "<stream:stream to='clouddevices.gserviceaccount.com' "
"xmlns:stream='http://etherx.jabber.org/streams' "
"xml:lang='*' version='1.0' xmlns='jabber:client'>";
}
std::string BuildXmppAuthenticateCommand(const std::string& account,
const std::string& token) {
std::vector<uint8_t> credentials;
credentials.push_back(0);
credentials.insert(credentials.end(), account.begin(), account.end());
credentials.push_back(0);
credentials.insert(credentials.end(), token.begin(), token.end());
std::string msg =
"<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' "
"mechanism='X-OAUTH2' auth:service='oauth2' "
"auth:allow-non-google-login='true' "
"auth:client-uses-full-bind-result='true' "
"xmlns:auth='http://www.google.com/talk/protocol/auth'>" +
Base64Encode(credentials) + "</auth>";
return msg;
}
// Backoff policy.
// Note: In order to ensure a minimum of 20 seconds between server errors,
// we have a 30s +- 10s (33%) jitter initial backoff.
const BackoffEntry::Policy kDefaultBackoffPolicy = {
// Number of initial errors (in sequence) to ignore before applying
// exponential back-off rules.
0,
// Initial delay for exponential back-off in ms.
30 * 1000, // 30 seconds.
// Factor by which the waiting time will be multiplied.
2,
// Fuzzing percentage. ex: 10% will spread requests randomly
// between 90%-100% of the calculated time.
0.33, // 33%.
// Maximum amount of time we are willing to delay our request in ms.
10 * 60 * 1000, // 10 minutes.
// Time to keep an entry from being discarded even when it
// has no significant state, -1 to never discard.
-1,
// Don't use initial delay unless the last request was an error.
false,
};
// Used for keeping connection alive.
const int kRegularPingIntervalSeconds = 60;
const int kRegularPingTimeoutSeconds = 30;
// Used for diagnostic when connectivity changed.
const int kAgressivePingIntervalSeconds = 5;
const int kAgressivePingTimeoutSeconds = 10;
const int kConnectingTimeoutAfterNetChangeSeconds = 30;
} // namespace
XmppChannel::XmppChannel(const std::string& account,
const std::string& access_token,
const std::string& xmpp_endpoint,
provider::TaskRunner* task_runner,
provider::Network* network)
: account_{account},
access_token_{access_token},
xmpp_endpoint_{xmpp_endpoint},
network_{network},
backoff_entry_{&kDefaultBackoffPolicy},
task_runner_{task_runner},
iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} {
read_socket_data_.resize(4096);
if (network) {
network->AddConnectionChangedCallback(base::Bind(
&XmppChannel::OnConnectivityChanged, weak_ptr_factory_.GetWeakPtr()));
}
}
void XmppChannel::OnMessageRead(size_t size, ErrorPtr error) {
read_pending_ = false;
if (error)
return Restart();
std::string msg(read_socket_data_.data(), size);
VLOG(2) << "Received XMPP packet: '" << msg << "'";
if (!size)
return Restart();
stream_parser_.ParseData(msg);
WaitForMessage();
}
void XmppChannel::OnStreamStart(const std::string& node_name,
std::map<std::string, std::string> attributes) {
VLOG(2) << "XMPP stream start: " << node_name;
}
void XmppChannel::OnStreamEnd(const std::string& node_name) {
VLOG(2) << "XMPP stream ended: " << node_name;
Stop();
if (IsConnected()) {
// If we had a fully-established connection, restart it now.
// However, if the connection has never been established yet (e.g.
// authorization failed), do not restart right now. Wait till we get
// new credentials.
task_runner_->PostDelayedTask(
FROM_HERE,
base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()), {});
} else if (delegate_) {
delegate_->OnPermanentFailure();
}
}
void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) {
// Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback
// from expat XML parser and some stanza could cause the XMPP stream to be
// reset and the parser to be re-initialized. We don't want to destroy the
// parser while it is performing a callback invocation.
task_runner_->PostDelayedTask(
FROM_HERE,
base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(),
base::Passed(std::move(stanza))),
{});
}
void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) {
VLOG(2) << "XMPP stanza received: " << stanza->ToString();
switch (state_) {
case XmppState::kConnected:
if (stanza->name() == "stream:features") {
auto children = stanza->FindChildren("mechanisms/mechanism", false);
for (const auto& child : children) {
if (child->text() == "X-OAUTH2") {
state_ = XmppState::kAuthenticationStarted;
SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
return;
}
}
}
break;
case XmppState::kAuthenticationStarted:
if (stanza->name() == "success") {
state_ = XmppState::kStreamRestartedPostAuthentication;
RestartXmppStream();
return;
} else if (stanza->name() == "failure") {
if (stanza->FindFirstChild("not-authorized", false)) {
state_ = XmppState::kAuthenticationFailed;
return;
}
}
break;
case XmppState::kStreamRestartedPostAuthentication:
if (stanza->name() == "stream:features" &&
stanza->FindFirstChild("bind", false)) {
state_ = XmppState::kBindSent;
iq_stanza_handler_->SendRequest(
"set", "", "", "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>",
base::Bind(&XmppChannel::OnBindCompleted,
task_ptr_factory_.GetWeakPtr()),
base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
return;
}
break;
default:
if (stanza->name() == "message") {
HandleMessageStanza(std::move(stanza));
return;
} else if (stanza->name() == "iq") {
if (!iq_stanza_handler_->HandleIqStanza(std::move(stanza))) {
LOG(ERROR) << "Failed to handle IQ stanza";
CloseStream();
}
return;
}
LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString();
return;
}
// Something bad happened. Close the stream and start over.
LOG(ERROR) << "Error condition occurred handling stanza: "
<< stanza->ToString() << " in state: " << static_cast<int>(state_);
CloseStream();
}
void XmppChannel::CloseStream() {
SendMessage("</stream:stream>");
}
void XmppChannel::OnBindCompleted(std::unique_ptr<XmlNode> reply) {
if (reply->GetAttributeOrEmpty("type") != "result") {
CloseStream();
return;
}
const XmlNode* jid_node = reply->FindFirstChild("bind/jid", false);
if (!jid_node) {
LOG(ERROR) << "XMPP Bind response is missing JID";
CloseStream();
return;
}
jid_ = jid_node->text();
state_ = XmppState::kSessionStarted;
iq_stanza_handler_->SendRequest(
"set", "", "", "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>",
base::Bind(&XmppChannel::OnSessionEstablished,
task_ptr_factory_.GetWeakPtr()),
base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
}
void XmppChannel::OnSessionEstablished(std::unique_ptr<XmlNode> reply) {
if (reply->GetAttributeOrEmpty("type") != "result") {
CloseStream();
return;
}
state_ = XmppState::kSubscribeStarted;
std::string body =
"<subscribe xmlns='google:push'>"
"<item channel='cloud_devices' from=''/></subscribe>";
iq_stanza_handler_->SendRequest(
"set", "", account_, body,
base::Bind(&XmppChannel::OnSubscribed, task_ptr_factory_.GetWeakPtr()),
base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
}
void XmppChannel::OnSubscribed(std::unique_ptr<XmlNode> reply) {
if (reply->GetAttributeOrEmpty("type") != "result") {
CloseStream();
return;
}
state_ = XmppState::kSubscribed;
if (delegate_)
delegate_->OnConnected(GetName());
}
void XmppChannel::HandleMessageStanza(std::unique_ptr<XmlNode> stanza) {
const XmlNode* node = stanza->FindFirstChild("push:push/push:data", true);
if (!node) {
LOG(WARNING) << "XMPP message stanza is missing <push:data> element";
return;
}
std::string data = node->text();
std::string json_data;
if (!Base64Decode(data, &json_data)) {
LOG(WARNING) << "Failed to decode base64-encoded message payload: " << data;
return;
}
VLOG(2) << "XMPP push notification data: " << json_data;
auto json_dict = LoadJsonDict(json_data, nullptr);
if (json_dict && delegate_)
ParseNotificationJson(*json_dict, delegate_, GetName());
}
void XmppChannel::CreateSslSocket() {
CHECK(!stream_);
state_ = XmppState::kConnecting;
LOG(INFO) << "Starting XMPP connection to: " << xmpp_endpoint_;
std::pair<std::string, std::string> host_port =
SplitAtFirst(xmpp_endpoint_, ":", true);
CHECK(!host_port.first.empty());
CHECK(!host_port.second.empty());
uint32_t port = 0;
CHECK(base::StringToUint(host_port.second, &port)) << xmpp_endpoint_;
network_->OpenSslSocket(host_port.first, port,
base::Bind(&XmppChannel::OnSslSocketReady,
task_ptr_factory_.GetWeakPtr()));
}
void XmppChannel::OnSslSocketReady(std::unique_ptr<Stream> stream,
ErrorPtr error) {
if (error) {
LOG(ERROR) << "TLS handshake failed. Restarting XMPP connection";
backoff_entry_.InformOfRequest(false);
LOG(INFO) << "Delaying connection to XMPP server for "
<< backoff_entry_.GetTimeUntilRelease();
return task_runner_->PostDelayedTask(
FROM_HERE, base::Bind(&XmppChannel::CreateSslSocket,
task_ptr_factory_.GetWeakPtr()),
backoff_entry_.GetTimeUntilRelease());
}
CHECK(XmppState::kConnecting == state_);
backoff_entry_.InformOfRequest(true);
stream_ = std::move(stream);
state_ = XmppState::kConnected;
RestartXmppStream();
ScheduleRegularPing();
}
void XmppChannel::SendMessage(const std::string& message) {
CHECK(stream_) << "No XMPP socket stream available";
if (write_pending_) {
queued_write_data_ += message;
return;
}
write_socket_data_ = queued_write_data_ + message;
queued_write_data_.clear();
VLOG(2) << "Sending XMPP message: " << message;
write_pending_ = true;
stream_->Write(
write_socket_data_.data(), write_socket_data_.size(),
base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr()));
}
void XmppChannel::OnMessageSent(ErrorPtr error) {
write_pending_ = false;
if (error)
return Restart();
if (queued_write_data_.empty()) {
WaitForMessage();
} else {
SendMessage(std::string{});
}
}
void XmppChannel::WaitForMessage() {
if (read_pending_ || !stream_)
return;
read_pending_ = true;
stream_->Read(
read_socket_data_.data(), read_socket_data_.size(),
base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr()));
}
std::string XmppChannel::GetName() const {
return "xmpp";
}
bool XmppChannel::IsConnected() const {
return state_ == XmppState::kSubscribed;
}
void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) {
// No extra parameters needed for XMPP.
}
void XmppChannel::Restart() {
LOG(INFO) << "Restarting XMPP";
Stop();
Start(delegate_);
}
void XmppChannel::Start(NotificationDelegate* delegate) {
CHECK(state_ == XmppState::kNotStarted);
delegate_ = delegate;
CreateSslSocket();
}
void XmppChannel::Stop() {
if (IsConnected() && delegate_)
delegate_->OnDisconnected();
task_ptr_factory_.InvalidateWeakPtrs();
ping_ptr_factory_.InvalidateWeakPtrs();
stream_.reset();
state_ = XmppState::kNotStarted;
}
void XmppChannel::RestartXmppStream() {
stream_parser_.Reset();
stream_->CancelPendingOperations();
read_pending_ = false;
write_pending_ = false;
SendMessage(BuildXmppStartStreamCommand());
}
void XmppChannel::SchedulePing(base::TimeDelta interval,
base::TimeDelta timeout) {
VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout;
ping_ptr_factory_.InvalidateWeakPtrs();
task_runner_->PostDelayedTask(
FROM_HERE, base::Bind(&XmppChannel::PingServer,
ping_ptr_factory_.GetWeakPtr(), timeout),
interval);
}
void XmppChannel::ScheduleRegularPing() {
SchedulePing(base::TimeDelta::FromSeconds(kRegularPingIntervalSeconds),
base::TimeDelta::FromSeconds(kRegularPingTimeoutSeconds));
}
void XmppChannel::ScheduleFastPing() {
SchedulePing(base::TimeDelta::FromSeconds(kAgressivePingIntervalSeconds),
base::TimeDelta::FromSeconds(kAgressivePingTimeoutSeconds));
}
void XmppChannel::PingServer(base::TimeDelta timeout) {
VLOG(1) << "Sending XMPP ping";
if (!IsConnected()) {
LOG(WARNING) << "XMPP channel is not connected";
Restart();
return;
}
// Send an XMPP Ping request as defined in XEP-0199 extension:
// http://xmpp.org/extensions/xep-0199.html
iq_stanza_handler_->SendRequestWithCustomTimeout(
"get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>", timeout,
base::Bind(&XmppChannel::OnPingResponse, task_ptr_factory_.GetWeakPtr(),
base::Time::Now()),
base::Bind(&XmppChannel::OnPingTimeout, task_ptr_factory_.GetWeakPtr(),
base::Time::Now()));
}
void XmppChannel::OnPingResponse(base::Time sent_time,
std::unique_ptr<XmlNode> reply) {
VLOG(1) << "XMPP response received after " << (base::Time::Now() - sent_time);
// Ping response received from server. Everything seems to be in order.
// Reschedule with default intervals.
ScheduleRegularPing();
}
void XmppChannel::OnPingTimeout(base::Time sent_time) {
LOG(WARNING) << "XMPP channel seems to be disconnected. Ping timed out after "
<< (base::Time::Now() - sent_time);
Restart();
}
void XmppChannel::OnConnectivityChanged() {
if (state_ == XmppState::kNotStarted)
return;
if (state_ == XmppState::kConnecting &&
backoff_entry_.GetTimeUntilRelease() <
base::TimeDelta::FromSeconds(
kConnectingTimeoutAfterNetChangeSeconds)) {
VLOG(1) << "Next reconnect in " << backoff_entry_.GetTimeUntilRelease();
return;
}
ScheduleFastPing();
}
} // namespace weave