普通文本  |  281行  |  8.66 KB

// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "device/serial/data_sender.h"

#include "base/bind.h"
#include "base/message_loop/message_loop.h"
#include "device/serial/async_waiter.h"

namespace device {

// Represents a send that is not yet fulfilled.
class DataSender::PendingSend {
 public:
  PendingSend(const base::StringPiece& data,
              const DataSentCallback& callback,
              const SendErrorCallback& error_callback,
              int32_t fatal_error_value);

  // Invoked to report that |num_bytes| of data have been sent. Subtracts the
  // number of bytes that were part of this send from |num_bytes|. Returns
  // whether this send has been completed. If this send has been completed, this
  // calls |callback_|.
  bool ReportBytesSent(uint32_t* num_bytes);

  // Invoked to report that |num_bytes| of data have been sent and then an
  // error, |error| was encountered. Subtracts the number of bytes that were
  // part of this send from |num_bytes|. If this send was not completed before
  // the error, this calls |error_callback_| to report the error. Otherwise,
  // this calls |callback_|. Returns the number of bytes sent but not acked.
  uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error);

  // Reports |fatal_error_value_| to |receive_error_callback_|.
  void DispatchFatalError();

  // Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK
  // if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent
  // or the error if one is encountered writing to |handle|.
  MojoResult SendData(mojo::DataPipeProducerHandle handle);

 private:
  // Invoked to update |bytes_acked_| and |num_bytes|.
  void ReportBytesSentInternal(uint32_t* num_bytes);

  // The data to send.
  const base::StringPiece data_;

  // The callback to report success.
  const DataSentCallback callback_;

  // The callback to report errors.
  const SendErrorCallback error_callback_;

  // The error value to report when DispatchFatalError() is called.
  const int32_t fatal_error_value_;

  // The number of bytes sent to the data pipe.
  uint32_t bytes_sent_;

  // The number of bytes acked.
  uint32_t bytes_acked_;
};

DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
                       uint32_t buffer_size,
                       int32_t fatal_error_value)
    : sink_(sink.Pass()),
      fatal_error_value_(fatal_error_value),
      shut_down_(false) {
  sink_.set_error_handler(this);
  MojoCreateDataPipeOptions options = {
      sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
  };
  options.struct_size = sizeof(options);
  mojo::ScopedDataPipeConsumerHandle remote_handle;
  MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle);
  DCHECK_EQ(MOJO_RESULT_OK, result);
  sink_->Init(remote_handle.Pass());
  sink_.set_client(this);
}

DataSender::~DataSender() {
  ShutDown();
}

bool DataSender::Send(const base::StringPiece& data,
                      const DataSentCallback& callback,
                      const SendErrorCallback& error_callback) {
  DCHECK(!callback.is_null() && !error_callback.is_null());
  if (!pending_cancel_.is_null() || shut_down_)
    return false;

  pending_sends_.push(linked_ptr<PendingSend>(
      new PendingSend(data, callback, error_callback, fatal_error_value_)));
  SendInternal();
  return true;
}

bool DataSender::Cancel(int32_t error, const CancelCallback& callback) {
  DCHECK(!callback.is_null());
  if (!pending_cancel_.is_null() || shut_down_)
    return false;
  if (pending_sends_.empty() && sends_awaiting_ack_.empty()) {
    base::MessageLoop::current()->PostTask(FROM_HERE, callback);
    return true;
  }

  pending_cancel_ = callback;
  sink_->Cancel(error);
  return true;
}

void DataSender::ReportBytesSent(uint32_t bytes_sent) {
  if (shut_down_)
    return;

  while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
         sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
    sends_awaiting_ack_.pop();
  }
  if (bytes_sent > 0 && !pending_sends_.empty()) {
    bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent);
    DCHECK(!finished);
    if (finished) {
      ShutDown();
      return;
    }
  }
  if (bytes_sent != 0) {
    ShutDown();
    return;
  }
  if (pending_sends_.empty() && sends_awaiting_ack_.empty())
    RunCancelCallback();
}

void DataSender::ReportBytesSentAndError(
    uint32_t bytes_sent,
    int32_t error,
    const mojo::Callback<void(uint32_t)>& callback) {
  if (shut_down_)
    return;

  uint32_t bytes_to_flush = 0;
  while (!sends_awaiting_ack_.empty()) {
    bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError(
        &bytes_sent, error);
    sends_awaiting_ack_.pop();
  }
  while (!pending_sends_.empty()) {
    bytes_to_flush +=
        pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
    pending_sends_.pop();
  }
  callback.Run(bytes_to_flush);
  RunCancelCallback();
}

void DataSender::OnConnectionError() {
  ShutDown();
}

void DataSender::SendInternal() {
  while (!pending_sends_.empty()) {
    MojoResult result = pending_sends_.front()->SendData(handle_.get());
    if (result == MOJO_RESULT_OK) {
      sends_awaiting_ack_.push(pending_sends_.front());
      pending_sends_.pop();
    } else if (result == MOJO_RESULT_SHOULD_WAIT) {
      waiter_.reset(new AsyncWaiter(
          handle_.get(),
          MOJO_HANDLE_SIGNAL_WRITABLE,
          base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this))));
      return;
    } else {
      ShutDown();
      return;
    }
  }
}

void DataSender::OnDoneWaiting(MojoResult result) {
  waiter_.reset();
  if (result != MOJO_RESULT_OK) {
    ShutDown();
    return;
  }
  SendInternal();
}

void DataSender::RunCancelCallback() {
  DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
  if (pending_cancel_.is_null())
    return;

  base::MessageLoop::current()->PostTask(FROM_HERE,
                                         base::Bind(pending_cancel_));
  pending_cancel_.Reset();
}

void DataSender::ShutDown() {
  waiter_.reset();
  shut_down_ = true;
  while (!pending_sends_.empty()) {
    pending_sends_.front()->DispatchFatalError();
    pending_sends_.pop();
  }
  while (!sends_awaiting_ack_.empty()) {
    sends_awaiting_ack_.front()->DispatchFatalError();
    sends_awaiting_ack_.pop();
  }
  RunCancelCallback();
}

DataSender::PendingSend::PendingSend(const base::StringPiece& data,
                                     const DataSentCallback& callback,
                                     const SendErrorCallback& error_callback,
                                     int32_t fatal_error_value)
    : data_(data),
      callback_(callback),
      error_callback_(error_callback),
      fatal_error_value_(fatal_error_value),
      bytes_sent_(0),
      bytes_acked_(0) {
}

bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) {
  ReportBytesSentInternal(num_bytes);
  if (bytes_acked_ < data_.size())
    return false;

  base::MessageLoop::current()->PostTask(FROM_HERE,
                                         base::Bind(callback_, bytes_acked_));
  return true;
}

uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes,
                                                          int32_t error) {
  ReportBytesSentInternal(num_bytes);
  if (*num_bytes > 0) {
    base::MessageLoop::current()->PostTask(FROM_HERE,
                                           base::Bind(callback_, bytes_acked_));
    return 0;
  }
  base::MessageLoop::current()->PostTask(
      FROM_HERE, base::Bind(error_callback_, bytes_acked_, error));
  return bytes_sent_ - bytes_acked_;
}

void DataSender::PendingSend::DispatchFatalError() {
  base::MessageLoop::current()->PostTask(
      FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
}

MojoResult DataSender::PendingSend::SendData(
    mojo::DataPipeProducerHandle handle) {
  uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_;
  MojoResult result = mojo::WriteDataRaw(handle,
                                         data_.data() + bytes_sent_,
                                         &bytes_to_send,
                                         MOJO_WRITE_DATA_FLAG_NONE);
  if (result != MOJO_RESULT_OK)
    return result;

  bytes_sent_ += bytes_to_send;
  return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
}

void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {
  bytes_acked_ += *num_bytes;
  if (bytes_acked_ > bytes_sent_) {
    *num_bytes = bytes_acked_ - bytes_sent_;
    bytes_acked_ = bytes_sent_;
  } else {
    *num_bytes = 0;
  }
}

}  // namespace device