/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/ipc/client_impl.h"

#include <fcntl.h>
#include <inttypes.h>
#include <unistd.h>

#include <utility>

#include "perfetto/base/task_runner.h"
#include "perfetto/base/utils.h"
#include "perfetto/ipc/service_descriptor.h"
#include "perfetto/ipc/service_proxy.h"

// TODO(primiano): Add ThreadChecker everywhere.

// TODO(primiano): Add timeouts.

namespace perfetto {
namespace ipc {

// static
// Factory for the IPC client: builds a ClientImpl for |socket_name| that uses
// |task_runner|, and returns it to the caller typed as the Client interface.
std::unique_ptr<Client> Client::CreateInstance(const char* socket_name,
                                               base::TaskRunner* task_runner) {
  return std::unique_ptr<Client>(new ClientImpl(socket_name, task_runner));
}

// Stores |task_runner| and asks base::UnixSocket to connect to |socket_name|.
// |this| is handed to Connect() together with the task runner (presumably as
// the socket event listener, given the OnConnect()/OnDisconnect()/
// OnDataAvailable() methods of this class -- confirm against UnixSocket).
// BindService() calls issued while the socket does not yet report
// is_connected() are queued and replayed from OnConnect().
ClientImpl::ClientImpl(const char* socket_name, base::TaskRunner* task_runner)
    : task_runner_(task_runner), weak_ptr_factory_(this) {
  // Check that the protobuf headers and the linked library versions agree.
  GOOGLE_PROTOBUF_VERIFY_VERSION;
  sock_ = base::UnixSocket::Connect(socket_name, this, task_runner);
}

// Tears the client down by running the regular disconnect path, so that every
// ServiceProxy still present in |service_bindings_| gets an OnDisconnect()
// task posted on |task_runner_|.
ClientImpl::~ClientImpl() {
  // Ensure we are not destroyed in the middle of invoking a reply, i.e. from
  // within the ServiceProxy::EndInvoke() call made by OnInvokeMethodReply().
  PERFETTO_DCHECK(!invoking_method_reply_);
  OnDisconnect(
      nullptr);  // The base::UnixSocket* ptr is not used in OnDisconnect().
}

// Asks the host to bind the service described by |service_proxy|.
//  - A null (already destroyed) proxy is ignored.
//  - If the socket is not connected, the proxy is parked in
//    |queued_bindings_|; OnConnect() drains that list.
//  - Otherwise a BindService frame is sent. On success the request is recorded
//    in |queued_requests_| so the reply can be matched in OnFrameReceived();
//    on failure the proxy is told right away via OnConnect(false).
void ClientImpl::BindService(base::WeakPtr<ServiceProxy> service_proxy) {
  if (!service_proxy)
    return;

  if (!sock_->is_connected()) {
    queued_bindings_.emplace_back(service_proxy);
    return;
  }

  // The request id is consumed even if sending the frame fails below.
  const RequestID bind_request_id = ++last_request_id_;

  Frame bind_frame;
  bind_frame.set_request_id(bind_request_id);
  Frame::BindService* bind_msg = bind_frame.mutable_msg_bind_service();
  const char* const service_name = service_proxy->GetDescriptor().service_name;
  bind_msg->set_service_name(service_name);

  if (SendFrame(bind_frame)) {
    QueuedRequest pending;
    pending.type = Frame::kMsgBindService;
    pending.request_id = bind_request_id;
    pending.service_proxy = service_proxy;
    queued_requests_.emplace(bind_request_id, std::move(pending));
    return;
  }

  PERFETTO_DLOG("BindService(%s) failed", service_name);
  service_proxy->OnConnect(false /* success */);
}

// Forgets the binding registered under |service_id|, if there is one.
// Unknown ids are ignored.
void ClientImpl::UnbindService(ServiceID service_id) {
  const auto binding = service_bindings_.find(service_id);
  if (binding != service_bindings_.end())
    service_bindings_.erase(binding);
}

// Sends an InvokeMethod frame for |remote_method_id| of |service_id| with the
// serialized |method_args| as payload, passing |fd| along to SendFrame().
//
// Returns the id assigned to the request, or 0 when:
//  - |method_args| could not be serialized or the frame could not be sent, or
//  - |drop_reply| is true, in which case no reply is tracked.
// In the remaining case the request (with |method_name| and |service_proxy|)
// is stored in |queued_requests_| so that OnFrameReceived() can route the
// reply back to the proxy.
RequestID ClientImpl::BeginInvoke(ServiceID service_id,
                                  const std::string& method_name,
                                  MethodID remote_method_id,
                                  const ProtoMessage& method_args,
                                  bool drop_reply,
                                  base::WeakPtr<ServiceProxy> service_proxy,
                                  int fd) {
  // Note: the request id is consumed even if the invocation fails below.
  RequestID request_id = ++last_request_id_;
  Frame frame;
  frame.set_request_id(request_id);
  Frame::InvokeMethod* req = frame.mutable_msg_invoke_method();
  req->set_service_id(service_id);
  req->set_method_id(remote_method_id);
  req->set_drop_reply(drop_reply);

  // Serialize the arguments straight into the frame's |args_proto| field. This
  // avoids building the payload in a temporary string and then copying it a
  // second time into the frame.
  const bool did_serialize =
      method_args.SerializeToString(req->mutable_args_proto());
  if (!did_serialize || !SendFrame(frame, fd)) {
    PERFETTO_DLOG("BeginInvoke() failed while sending the frame");
    return 0;
  }
  if (drop_reply)
    return 0;
  QueuedRequest qr;
  qr.type = Frame::kMsgInvokeMethod;
  qr.request_id = request_id;
  qr.method_name = method_name;
  qr.service_proxy = std::move(service_proxy);
  queued_requests_.emplace(request_id, std::move(qr));
  return request_id;
}

// Encodes |frame| with BufferedFrameDeserializer::Serialize() and writes the
// resulting bytes, together with |fd|, to the socket.
// Returns the result of UnixSocket::Send(). A failed send while the socket
// still reports being connected is treated as fatal (PERFETTO_CHECK).
bool ClientImpl::SendFrame(const Frame& frame, int fd) {
  const std::string wire_bytes = BufferedFrameDeserializer::Serialize(frame);

  // TODO(primiano): this should do non-blocking I/O. But then what if the
  // socket buffer is full? We might want to either drop the request or throttle
  // the send and PostTask the reply later? Right now Send() is made blocking
  // as a workaround. Propagate backpressure to the caller instead.
  const auto mode = base::UnixSocket::BlockingMode::kBlocking;
  bool res = sock_->Send(wire_bytes.data(), wire_bytes.size(), fd, mode);

  // The only tolerated failure is the one caused by a disconnected socket.
  PERFETTO_CHECK(res || !sock_->is_connected());
  return res;
}

// Socket connection callback. Drains the BindService() calls that were queued
// before the connection attempt completed:
//  - |connected| == true: each queued proxy is re-submitted to BindService().
//  - |connected| == false: each proxy that is still alive is notified with
//    OnConnect(false).
void ClientImpl::OnConnect(base::UnixSocket*, bool connected) {
  // Detach the pending list from |this| before running any callback. Both
  // BindService() (which appends to |queued_bindings_| when the socket is not
  // connected) and ServiceProxy::OnConnect() (which runs code outside of this
  // class that may call back into the client) can end up mutating
  // |queued_bindings_|; doing that while range-iterating the member itself
  // would invalidate the loop iterators.
  auto pending_bindings = std::move(queued_bindings_);
  queued_bindings_.clear();  // Leave the moved-from member in a known state.

  for (base::WeakPtr<ServiceProxy>& service_proxy : pending_bindings) {
    if (connected) {
      BindService(service_proxy);
    } else if (service_proxy) {
      service_proxy->OnConnect(false /* success */);
    }
  }
  // Deliberately do not touch members of |this| past this point: anything
  // queued re-entrantly by the callbacks above must stay queued.
}

// Socket disconnection callback (also invoked by the destructor with a null
// socket pointer, which is unused here). For every bound service a task is
// posted on |task_runner_| that calls ServiceProxy::OnDisconnect() if the
// proxy is still alive when the task runs. Afterwards all bindings and all
// not-yet-submitted BindService() requests are dropped.
// NOTE(review): |queued_requests_| is left untouched here, so requests that
// are in flight at disconnection time receive no notification from this
// method.
void ClientImpl::OnDisconnect(base::UnixSocket*) {
  for (const auto& binding : service_bindings_) {
    // Copy the weak pointer so that the posted task owns its own handle.
    base::WeakPtr<ServiceProxy> weak_proxy = binding.second;
    task_runner_->PostTask([weak_proxy] {
      if (weak_proxy)
        weak_proxy->OnDisconnect();
    });
  }
  queued_bindings_.clear();
  service_bindings_.clear();
}

// Socket data callback. Works in two phases:
//  1. Keep reading from the socket into the frame deserializer until
//     Receive() returns 0. A file descriptor received along the way is marked
//     close-on-exec and stashed in |received_fd_| (see TakeReceivedFD()).
//     If the deserializer rejects the data, the socket is shut down and no
//     frame is dispatched.
//  2. Dispatch every complete frame decoded so far to OnFrameReceived().
void ClientImpl::OnDataAvailable(base::UnixSocket*) {
  for (;;) {
    auto rx_buf = frame_deserializer_.BeginReceive();
    base::ScopedFile incoming_fd;
    const size_t rsize =
        sock_->Receive(rx_buf.data, rx_buf.size, &incoming_fd);

    if (incoming_fd) {
      // Only one pending descriptor is expected at any time.
      PERFETTO_DCHECK(!received_fd_);
      int res = fcntl(*incoming_fd, F_SETFD, FD_CLOEXEC);
      PERFETTO_DCHECK(res == 0);
      received_fd_ = std::move(incoming_fd);
    }

    if (!frame_deserializer_.EndReceive(rsize)) {
      // The endpoint tried to send a frame that is way too large.
      // TODO(fmayer): check this.
      sock_->Shutdown(true);  // In turn will trigger an OnDisconnect().
      return;
    }

    if (rsize == 0)
      break;
  }

  for (;;) {
    std::unique_ptr<Frame> next_frame = frame_deserializer_.PopNextFrame();
    if (!next_frame)
      break;
    OnFrameReceived(*next_frame);
  }
}

void ClientImpl::OnFrameReceived(const Frame& frame) {
  auto queued_requests_it = queued_requests_.find(frame.request_id());
  if (queued_requests_it == queued_requests_.end()) {
    PERFETTO_DLOG("OnFrameReceived(): got invalid request_id=%" PRIu64,
                  static_cast<uint64_t>(frame.request_id()));
    return;
  }
  QueuedRequest req = std::move(queued_requests_it->second);
  queued_requests_.erase(queued_requests_it);

  if (req.type == Frame::kMsgBindService &&
      frame.msg_case() == Frame::kMsgBindServiceReply) {
    return OnBindServiceReply(std::move(req), frame.msg_bind_service_reply());
  }
  if (req.type == Frame::kMsgInvokeMethod &&
      frame.msg_case() == Frame::kMsgInvokeMethodReply) {
    return OnInvokeMethodReply(std::move(req), frame.msg_invoke_method_reply());
  }
  if (frame.msg_case() == Frame::kMsgRequestError) {
    PERFETTO_DLOG("Host error: %s", frame.msg_request_error().error().c_str());
    return;
  }

  PERFETTO_DLOG(
      "OnFrameReceived() request msg_type=%d, received msg_type=%d in reply to "
      "request_id=%" PRIu64,
      req.type, frame.msg_case(), static_cast<uint64_t>(frame.request_id()));
}

// Completes a BindService() request using the host's |reply|.
// Nothing happens if the proxy died in the meantime. The proxy is notified
// with OnConnect(false) when the host reports a failure or when the returned
// service id is already bound to another live proxy. Otherwise the
// name -> remote method id table is built (invalid entries are logged and
// skipped), the binding is initialized and recorded in |service_bindings_|,
// and the proxy is notified with OnConnect(true).
void ClientImpl::OnBindServiceReply(QueuedRequest req,
                                    const Frame::BindServiceReply& reply) {
  base::WeakPtr<ServiceProxy>& proxy = req.service_proxy;
  if (!proxy)
    return;

  const char* const name = proxy->GetDescriptor().service_name;
  if (!reply.success()) {
    PERFETTO_DLOG("BindService(): unknown service_name=\"%s\"", name);
    proxy->OnConnect(false /* success */);
    return;
  }

  // Refuse to overwrite a binding that still points to a live proxy.
  const auto existing = service_bindings_.find(reply.service_id());
  const bool id_in_use =
      existing != service_bindings_.end() && existing->second.get();
  if (id_in_use) {
    PERFETTO_DLOG(
        "BindService(): Trying to bind service \"%s\" but another service "
        "named \"%s\" is already bound with the same ID.",
        name, existing->second->GetDescriptor().service_name);
    proxy->OnConnect(false /* success */);
    return;
  }

  // Method [name] -> [remote_id] table handed over to the proxy.
  std::map<std::string, MethodID> remote_ids;
  for (const auto& entry : reply.methods()) {
    const bool is_valid = !entry.name().empty() && entry.id() > 0;
    if (is_valid) {
      remote_ids[entry.name()] = entry.id();
    } else {
      PERFETTO_DLOG("OnBindServiceReply(): invalid method \"%s\" -> %" PRIu64,
                    entry.name().c_str(), static_cast<uint64_t>(entry.id()));
    }
  }

  proxy->InitializeBinding(weak_ptr_factory_.GetWeakPtr(), reply.service_id(),
                           std::move(remote_ids));
  service_bindings_[reply.service_id()] = proxy;
  proxy->OnConnect(true /* success */);
}

// Delivers the host's |reply| for a method invocation to the originating
// proxy through ServiceProxy::EndInvoke(). Nothing happens if the proxy died.
// On a successful reply the payload is decoded with the reply decoder of the
// descriptor method whose name matches the request; if the reply failed or no
// method matches, EndInvoke() receives a null message. When the reply
// announces more data to come, the request is put back into
// |queued_requests_| so later replies can still be routed.
void ClientImpl::OnInvokeMethodReply(QueuedRequest req,
                                     const Frame::InvokeMethodReply& reply) {
  base::WeakPtr<ServiceProxy> proxy = req.service_proxy;
  if (!proxy)
    return;

  std::unique_ptr<ProtoMessage> decoded;
  if (reply.success()) {
    // Linear scan over the descriptor's methods. If this becomes a hotspot,
    // optimize by maintaining a dedicated hashtable.
    for (const auto& candidate : proxy->GetDescriptor().methods) {
      if (req.method_name != candidate.name)
        continue;
      decoded = candidate.reply_proto_decoder(reply.reply_proto());
      break;
    }
  }

  const RequestID request_id = req.request_id;
  const bool more_replies_expected = reply.has_more();

  // The flag lets the destructor assert that the client is not destroyed from
  // within the EndInvoke() call.
  invoking_method_reply_ = true;
  proxy->EndInvoke(request_id, std::move(decoded), more_replies_expected);
  invoking_method_reply_ = false;

  if (!more_replies_expected)
    return;

  // Streaming method: keep the request alive for the replies still to come.
  queued_requests_.emplace(request_id, std::move(req));
}

// Out-of-line definition of the defaulted QueuedRequest constructor; the
// fields are filled in by the callers (see BindService() and BeginInvoke()).
ClientImpl::QueuedRequest::QueuedRequest() = default;

// Hands over the file descriptor stashed by OnDataAvailable(), moving it out
// of |received_fd_|. The result is whatever |received_fd_| held, which may be
// an invalid ScopedFile if no descriptor was received.
base::ScopedFile ClientImpl::TakeReceivedFD() {
  base::ScopedFile taken_fd(std::move(received_fd_));
  return taken_fd;
}

}  // namespace ipc
}  // namespace perfetto