// Copyright 2014 The Chromium OS Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include <brillo/http/http_transport_curl.h> #include <limits> #include <base/bind.h> #include <base/logging.h> #include <base/message_loop/message_loop.h> #include <brillo/http/http_connection_curl.h> #include <brillo/http/http_request.h> #include <brillo/strings/string_utils.h> namespace { const char kCACertificatePath[] = #ifdef __ANDROID__ "/system/etc/security/cacerts_google"; #else "/usr/share/brillo-ca-certificates"; #endif } // namespace namespace brillo { namespace http { namespace curl { // This is a class that stores connection data on particular CURL socket // and provides file descriptor watcher to monitor read and/or write operations // on the socket's file descriptor. class Transport::SocketPollData : public base::MessagePumpForIO::FdWatcher { public: SocketPollData(const std::shared_ptr<CurlInterface>& curl_interface, CURLM* curl_multi_handle, Transport* transport, curl_socket_t socket_fd) : curl_interface_(curl_interface), curl_multi_handle_(curl_multi_handle), transport_(transport), socket_fd_(socket_fd), file_descriptor_watcher_(FROM_HERE) {} // Returns the pointer for the socket-specific file descriptor watcher. base::MessagePumpForIO::FdWatchController* GetWatcher() { return &file_descriptor_watcher_; } private: // Overrides from base::MessagePumpForIO::Watcher. void OnFileCanReadWithoutBlocking(int fd) override { OnSocketReady(fd, CURL_CSELECT_IN); } void OnFileCanWriteWithoutBlocking(int fd) override { OnSocketReady(fd, CURL_CSELECT_OUT); } // Data on the socket is available to be read from or written to. // Notify CURL of the action it needs to take on the socket file descriptor. void OnSocketReady(int fd, int action) { CHECK_EQ(socket_fd_, fd) << "Unexpected socket file descriptor"; int still_running_count = 0; CURLMcode code = curl_interface_->MultiSocketAction( curl_multi_handle_, socket_fd_, action, &still_running_count); CHECK_NE(CURLM_CALL_MULTI_PERFORM, code) << "CURL should no longer return CURLM_CALL_MULTI_PERFORM here"; if (code == CURLM_OK) transport_->ProcessAsyncCurlMessages(); } // The CURL interface to use. std::shared_ptr<CurlInterface> curl_interface_; // CURL multi-handle associated with the transport. CURLM* curl_multi_handle_; // Transport object itself. Transport* transport_; // The socket file descriptor for the connection. curl_socket_t socket_fd_; // File descriptor watcher to notify us of asynchronous I/O on the FD. base::MessagePumpForIO::FdWatchController file_descriptor_watcher_; DISALLOW_COPY_AND_ASSIGN(SocketPollData); }; // The request data associated with an asynchronous operation on a particular // connection. struct Transport::AsyncRequestData { // Success/error callbacks to be invoked at the end of the request. SuccessCallback success_callback; ErrorCallback error_callback; // We store a connection here to make sure the object is alive for // as long as asynchronous operation is running. std::shared_ptr<Connection> connection; // The ID of this request. RequestID request_id; }; Transport::Transport(const std::shared_ptr<CurlInterface>& curl_interface) : curl_interface_{curl_interface} { VLOG(2) << "curl::Transport created"; } Transport::Transport(const std::shared_ptr<CurlInterface>& curl_interface, const std::string& proxy) : curl_interface_{curl_interface}, proxy_{proxy} { VLOG(2) << "curl::Transport created with proxy " << proxy; } Transport::~Transport() { ShutDownAsyncCurl(); VLOG(2) << "curl::Transport destroyed"; } std::shared_ptr<http::Connection> Transport::CreateConnection( const std::string& url, const std::string& method, const HeaderList& headers, const std::string& user_agent, const std::string& referer, brillo::ErrorPtr* error) { std::shared_ptr<http::Connection> connection; CURL* curl_handle = curl_interface_->EasyInit(); if (!curl_handle) { LOG(ERROR) << "Failed to initialize CURL"; brillo::Error::AddTo(error, FROM_HERE, http::kErrorDomain, "curl_init_failed", "Failed to initialize CURL"); return connection; } VLOG(1) << "Sending a " << method << " request to " << url; CURLcode code = curl_interface_->EasySetOptStr(curl_handle, CURLOPT_URL, url); if (code == CURLE_OK) { code = curl_interface_->EasySetOptStr(curl_handle, CURLOPT_CAPATH, kCACertificatePath); } if (code == CURLE_OK) { code = curl_interface_->EasySetOptInt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1); } if (code == CURLE_OK) { code = curl_interface_->EasySetOptInt(curl_handle, CURLOPT_SSL_VERIFYHOST, 2); } if (code == CURLE_OK && !user_agent.empty()) { code = curl_interface_->EasySetOptStr( curl_handle, CURLOPT_USERAGENT, user_agent); } if (code == CURLE_OK && !referer.empty()) { code = curl_interface_->EasySetOptStr(curl_handle, CURLOPT_REFERER, referer); } if (code == CURLE_OK && !proxy_.empty()) { code = curl_interface_->EasySetOptStr(curl_handle, CURLOPT_PROXY, proxy_); } if (code == CURLE_OK) { int64_t timeout_ms = connection_timeout_.InMillisecondsRoundedUp(); if (timeout_ms > 0 && timeout_ms <= std::numeric_limits<int>::max()) { code = curl_interface_->EasySetOptInt( curl_handle, CURLOPT_TIMEOUT_MS, static_cast<int>(timeout_ms)); } } if (code == CURLE_OK && !ip_address_.empty()) { code = curl_interface_->EasySetOptStr( curl_handle, CURLOPT_INTERFACE, ip_address_.c_str()); } // Setup HTTP request method and optional request body. if (code == CURLE_OK) { if (method == request_type::kGet) { code = curl_interface_->EasySetOptInt(curl_handle, CURLOPT_HTTPGET, 1); } else if (method == request_type::kHead) { code = curl_interface_->EasySetOptInt(curl_handle, CURLOPT_NOBODY, 1); } else if (method == request_type::kPut) { code = curl_interface_->EasySetOptInt(curl_handle, CURLOPT_UPLOAD, 1); } else { // POST and custom request methods code = curl_interface_->EasySetOptInt(curl_handle, CURLOPT_POST, 1); if (code == CURLE_OK) { code = curl_interface_->EasySetOptPtr( curl_handle, CURLOPT_POSTFIELDS, nullptr); } if (code == CURLE_OK && method != request_type::kPost) { code = curl_interface_->EasySetOptStr( curl_handle, CURLOPT_CUSTOMREQUEST, method); } } } if (code != CURLE_OK) { AddEasyCurlError(error, FROM_HERE, code, curl_interface_.get()); curl_interface_->EasyCleanup(curl_handle); return connection; } connection = std::make_shared<http::curl::Connection>( curl_handle, method, curl_interface_, shared_from_this()); if (!connection->SendHeaders(headers, error)) { connection.reset(); } return connection; } void Transport::RunCallbackAsync(const base::Location& from_here, const base::Closure& callback) { base::MessageLoopForIO::current()->task_runner()->PostTask( from_here, callback); } RequestID Transport::StartAsyncTransfer(http::Connection* connection, const SuccessCallback& success_callback, const ErrorCallback& error_callback) { brillo::ErrorPtr error; if (!SetupAsyncCurl(&error)) { RunCallbackAsync( FROM_HERE, base::Bind(error_callback, 0, base::Owned(error.release()))); return 0; } RequestID request_id = ++last_request_id_; auto curl_connection = static_cast<http::curl::Connection*>(connection); std::unique_ptr<AsyncRequestData> request_data{new AsyncRequestData}; // Add the request data to |async_requests_| before adding the CURL handle // in case CURL feels like calling the socket callback synchronously which // will need the data to be in |async_requests_| map already. request_data->success_callback = success_callback; request_data->error_callback = error_callback; request_data->connection = std::static_pointer_cast<Connection>(curl_connection->shared_from_this()); request_data->request_id = request_id; async_requests_.emplace(curl_connection, std::move(request_data)); request_id_map_.emplace(request_id, curl_connection); // Add the connection's CURL handle to the multi-handle. CURLMcode code = curl_interface_->MultiAddHandle( curl_multi_handle_, curl_connection->curl_handle_); if (code != CURLM_OK) { brillo::ErrorPtr error; AddMultiCurlError(&error, FROM_HERE, code, curl_interface_.get()); RunCallbackAsync( FROM_HERE, base::Bind(error_callback, 0, base::Owned(error.release()))); async_requests_.erase(curl_connection); request_id_map_.erase(request_id); return 0; } VLOG(1) << "Started asynchronous HTTP request with ID " << request_id; return request_id; } bool Transport::CancelRequest(RequestID request_id) { auto p = request_id_map_.find(request_id); if (p == request_id_map_.end()) { // The request must have been completed already... // This is not necessarily an error condition, so fail gracefully. LOG(WARNING) << "HTTP request #" << request_id << " not found"; return false; } LOG(INFO) << "Canceling HTTP request #" << request_id; CleanAsyncConnection(p->second); return true; } void Transport::SetDefaultTimeout(base::TimeDelta timeout) { connection_timeout_ = timeout; } void Transport::SetLocalIpAddress(const std::string& ip_address) { ip_address_ = "host!" + ip_address; } void Transport::AddEasyCurlError(brillo::ErrorPtr* error, const base::Location& location, CURLcode code, CurlInterface* curl_interface) { brillo::Error::AddTo(error, location, "curl_easy_error", brillo::string_utils::ToString(code), curl_interface->EasyStrError(code)); } void Transport::AddMultiCurlError(brillo::ErrorPtr* error, const base::Location& location, CURLMcode code, CurlInterface* curl_interface) { brillo::Error::AddTo(error, location, "curl_multi_error", brillo::string_utils::ToString(code), curl_interface->MultiStrError(code)); } bool Transport::SetupAsyncCurl(brillo::ErrorPtr* error) { if (curl_multi_handle_) return true; curl_multi_handle_ = curl_interface_->MultiInit(); if (!curl_multi_handle_) { LOG(ERROR) << "Failed to initialize CURL"; brillo::Error::AddTo(error, FROM_HERE, http::kErrorDomain, "curl_init_failed", "Failed to initialize CURL"); return false; } CURLMcode code = curl_interface_->MultiSetSocketCallback( curl_multi_handle_, &Transport::MultiSocketCallback, this); if (code == CURLM_OK) { code = curl_interface_->MultiSetTimerCallback( curl_multi_handle_, &Transport::MultiTimerCallback, this); } if (code != CURLM_OK) { AddMultiCurlError(error, FROM_HERE, code, curl_interface_.get()); return false; } return true; } void Transport::ShutDownAsyncCurl() { if (!curl_multi_handle_) return; LOG_IF(WARNING, !poll_data_map_.empty()) << "There are pending requests at the time of transport's shutdown"; // Make sure we are not leaking any memory here. for (const auto& pair : poll_data_map_) delete pair.second; poll_data_map_.clear(); curl_interface_->MultiCleanup(curl_multi_handle_); curl_multi_handle_ = nullptr; } int Transport::MultiSocketCallback(CURL* easy, curl_socket_t s, int what, void* userp, void* socketp) { auto transport = static_cast<Transport*>(userp); CHECK(transport) << "Transport must be set for this callback"; auto poll_data = static_cast<SocketPollData*>(socketp); if (!poll_data) { // We haven't attached polling data to this socket yet. Let's do this now. poll_data = new SocketPollData{transport->curl_interface_, transport->curl_multi_handle_, transport, s}; transport->poll_data_map_.emplace(std::make_pair(easy, s), poll_data); transport->curl_interface_->MultiAssign( transport->curl_multi_handle_, s, poll_data); } if (what == CURL_POLL_NONE) { return 0; } else if (what == CURL_POLL_REMOVE) { // Remove the attached data from the socket. transport->curl_interface_->MultiAssign( transport->curl_multi_handle_, s, nullptr); transport->poll_data_map_.erase(std::make_pair(easy, s)); // Make sure we stop watching the socket file descriptor now, before // we schedule the SocketPollData for deletion. poll_data->GetWatcher()->StopWatchingFileDescriptor(); // This method can be called indirectly from SocketPollData::OnSocketReady, // so delay destruction of SocketPollData object till the next loop cycle. base::MessageLoopForIO::current()->task_runner()->DeleteSoon(FROM_HERE, poll_data); return 0; } base::MessagePumpForIO::Mode watch_mode = base::MessagePumpForIO::WATCH_READ; switch (what) { case CURL_POLL_IN: watch_mode = base::MessagePumpForIO::WATCH_READ; break; case CURL_POLL_OUT: watch_mode = base::MessagePumpForIO::WATCH_WRITE; break; case CURL_POLL_INOUT: watch_mode = base::MessagePumpForIO::WATCH_READ_WRITE; break; default: LOG(FATAL) << "Unknown CURL socket action: " << what; break; } // WatchFileDescriptor() can be called with the same controller object // (watcher) to amend the watch mode, however this has cumulative effect. // For example, if we were watching a file descriptor for READ operations // and now call it to watch for WRITE, it will end up watching for both // READ and WRITE. This is not what we want here, so stop watching the // file descriptor on previous controller before starting with a different // mode. if (!poll_data->GetWatcher()->StopWatchingFileDescriptor()) LOG(WARNING) << "Failed to stop watching the previous socket descriptor"; CHECK(base::MessageLoopForIO::current()->WatchFileDescriptor( s, true, watch_mode, poll_data->GetWatcher(), poll_data)) << "Failed to watch the CURL socket."; return 0; } // CURL actually uses "long" types in callback signatures, so we must comply. int Transport::MultiTimerCallback(CURLM* /* multi */, long timeout_ms, // NOLINT(runtime/int) void* userp) { auto transport = static_cast<Transport*>(userp); // Cancel any previous timer callbacks. transport->weak_ptr_factory_for_timer_.InvalidateWeakPtrs(); if (timeout_ms >= 0) { base::MessageLoopForIO::current()->task_runner()->PostDelayedTask( FROM_HERE, base::Bind(&Transport::OnTimer, transport->weak_ptr_factory_for_timer_.GetWeakPtr()), base::TimeDelta::FromMilliseconds(timeout_ms)); } return 0; } void Transport::OnTimer() { if (curl_multi_handle_) { int still_running_count = 0; curl_interface_->MultiSocketAction( curl_multi_handle_, CURL_SOCKET_TIMEOUT, 0, &still_running_count); ProcessAsyncCurlMessages(); } } void Transport::ProcessAsyncCurlMessages() { CURLMsg* msg = nullptr; int msgs_left = 0; while ((msg = curl_interface_->MultiInfoRead(curl_multi_handle_, &msgs_left))) { if (msg->msg == CURLMSG_DONE) { // Async I/O complete for a connection. Invoke the user callbacks. Connection* connection = nullptr; CHECK_EQ(CURLE_OK, curl_interface_->EasyGetInfoPtr( msg->easy_handle, CURLINFO_PRIVATE, reinterpret_cast<void**>(&connection))); CHECK(connection != nullptr); OnTransferComplete(connection, msg->data.result); } } } void Transport::OnTransferComplete(Connection* connection, CURLcode code) { auto p = async_requests_.find(connection); CHECK(p != async_requests_.end()) << "Unknown connection"; AsyncRequestData* request_data = p->second.get(); VLOG(1) << "HTTP request # " << request_data->request_id << " has completed " << (code == CURLE_OK ? "successfully" : "with an error"); if (code != CURLE_OK) { brillo::ErrorPtr error; AddEasyCurlError(&error, FROM_HERE, code, curl_interface_.get()); RunCallbackAsync(FROM_HERE, base::Bind(request_data->error_callback, p->second->request_id, base::Owned(error.release()))); } else { if (connection->GetResponseStatusCode() != status_code::Ok) { LOG(INFO) << "Response: " << connection->GetResponseStatusCode() << " (" << connection->GetResponseStatusText() << ")"; } brillo::ErrorPtr error; // Rewind the response data stream to the beginning so the clients can // read the data back. const auto& stream = request_data->connection->response_data_stream_; if (stream && stream->CanSeek() && !stream->SetPosition(0, &error)) { RunCallbackAsync(FROM_HERE, base::Bind(request_data->error_callback, p->second->request_id, base::Owned(error.release()))); } else { std::unique_ptr<Response> resp{new Response{request_data->connection}}; RunCallbackAsync(FROM_HERE, base::Bind(request_data->success_callback, p->second->request_id, base::Passed(&resp))); } } // In case of an error on CURL side, we would have dispatched the error // callback and we need to clean up the current connection, however the // error callback has no reference to the connection itself and // |async_requests_| is the only reference to the shared pointer that // maintains the lifetime of |connection| and possibly even this Transport // object instance. As a result, if we call CleanAsyncConnection() directly, // there is a chance that this object might be deleted. // Instead, schedule an asynchronous task to clean up the connection. RunCallbackAsync(FROM_HERE, base::Bind(&Transport::CleanAsyncConnection, weak_ptr_factory_.GetWeakPtr(), connection)); } void Transport::CleanAsyncConnection(Connection* connection) { auto p = async_requests_.find(connection); CHECK(p != async_requests_.end()) << "Unknown connection"; // Remove the request data from the map first, since this might be the only // reference to the Connection class and even possibly to this Transport. auto request_data = std::move(p->second); // Remove associated request ID. request_id_map_.erase(request_data->request_id); // Remove the connection's CURL handle from multi-handle. curl_interface_->MultiRemoveHandle(curl_multi_handle_, connection->curl_handle_); // Remove all the socket data associated with this connection. auto iter = poll_data_map_.begin(); while (iter != poll_data_map_.end()) { if (iter->first.first == connection->curl_handle_) iter = poll_data_map_.erase(iter); else ++iter; } // Remove pending asynchronous request data. // This must be last since there is a chance of this object being // destroyed as the result. See the comment in Transport::OnTransferComplete. async_requests_.erase(p); } } // namespace curl } // namespace http } // namespace brillo