// Plain text  |  307 lines  |  9.19 KB

// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "device/serial/data_sink_receiver.h"

#include <limits>

#include "base/bind.h"
#include "device/serial/async_waiter.h"

namespace device {

// Represents a flush of data that has not been completed.
//
// A PendingFlush starts out without knowing how many bytes it has to discard.
// SetNumBytesToFlush() supplies that count; afterwards Flush() is called until
// the whole amount has been discarded from the data pipe.
class DataSinkReceiver::PendingFlush {
 public:
  PendingFlush();

  // Initializes this PendingFlush with |num_bytes|, the number of bytes to
  // flush. May be called only once per PendingFlush.
  void SetNumBytesToFlush(uint32_t num_bytes);

  // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns
  // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than
  // |bytes_to_flush_| bytes were flushed or the error if one is encountered
  // discarding data from |handle|. SetNumBytesToFlush() must have been called
  // first.
  MojoResult Flush(mojo::DataPipeConsumerHandle handle);

  // Whether this PendingFlush has received the number of bytes to flush.
  // Const-qualified because it only reads state, so it is also usable through
  // const references and pointers.
  bool received_flush() const { return received_flush_; }

 private:
  // Whether this PendingFlush has received the number of bytes to flush.
  bool received_flush_;

  // The remaining number of bytes to flush.
  uint32_t bytes_to_flush_;
};

// A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
// a DataSinkReceiver.
//
// The buffer reports back to its receiver exactly once: through Done(),
// through DoneWithError(), or, if neither was called, from the destructor
// (which reports zero bytes read).
class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
 public:
  // Creates a view of |buffer_size| bytes starting at |buffer| that reports its
  // completion to |receiver|.
  Buffer(scoped_refptr<DataSinkReceiver> receiver,
         const char* buffer,
         uint32_t buffer_size);
  // Reports zero bytes read to the receiver if no completion was reported yet.
  virtual ~Buffer();

  // Marks this buffer as cancelled and records |error| as the cancellation
  // error. The error is reported by Done() or by the destructor;
  // DoneWithError() reports its own error instead.
  void Cancel(int32_t error);

  // ReadOnlyBuffer overrides.
  virtual const char* GetData() OVERRIDE;
  virtual uint32_t GetSize() OVERRIDE;
  virtual void Done(uint32_t bytes_read) OVERRIDE;
  virtual void DoneWithError(uint32_t bytes_read, int32_t error) OVERRIDE;

 private:
  // The DataSinkReceiver whose data pipe we are providing a view.
  // Reset to NULL once a completion has been reported.
  scoped_refptr<DataSinkReceiver> receiver_;

  // Start of the viewed data; NULL once a completion has been reported.
  const char* buffer_;
  // Number of bytes available at |buffer_|; 0 once a completion has been
  // reported.
  uint32_t buffer_size_;

  // Whether this receive has been cancelled.
  bool cancelled_;

  // If |cancelled_|, contains the cancellation error to report.
  int32_t cancellation_error_;
};

// Stores the three callbacks and starts with no buffer in use and not shut
// down. No data pipe is attached here; that happens in Init().
//   |ready_callback|  - run from OnDoneWaiting() with a buffer of readable data.
//   |cancel_callback| - run from Cancel() while a buffer is in use; may be null.
//   |error_callback|  - run from DispatchFatalError(); may be null.
DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback,
                                   const CancelCallback& cancel_callback,
                                   const ErrorCallback& error_callback)
    : ready_callback_(ready_callback),
      cancel_callback_(cancel_callback),
      error_callback_(error_callback),
      buffer_in_use_(NULL),
      shut_down_(false),
      weak_factory_(this) {
}

// Marks this receiver as shut down and drops any outstanding wait on the data
// pipe.
void DataSinkReceiver::ShutDown() {
  shut_down_ = true;
  // Resetting |waiter_| when no wait is outstanding has no effect, so no
  // explicit null check is required.
  waiter_.reset();
}

// Nothing is released explicitly here; the body is intentionally empty.
DataSinkReceiver::~DataSinkReceiver() {
}

// Takes ownership of the data pipe |handle| and starts waiting for data on it.
// Calling Init() when a valid handle is already held is treated as a fatal
// error.
void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) {
  if (!handle_.is_valid()) {
    handle_ = handle.Pass();
    StartWaiting();
    return;
  }

  // A data pipe has already been attached.
  DispatchFatalError();
}

void DataSinkReceiver::Cancel(int32_t error) {
  // If we have sent a ReportBytesSentAndError but have not received the
  // response, that ReportBytesSentAndError message will appear to the
  // DataSinkClient to be caused by this Cancel message. In that case, we ignore
  // the cancel.
  if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush())
    return;

  // If there is a buffer is in use, mark the buffer as cancelled and notify the
  // client by calling |cancel_callback_|. The sink implementation may or may
  // not take the cancellation into account when deciding what error (if any) to
  // return. If the sink returns an error, we ignore the cancellation error.
  // Otherwise, if the sink does not report an error, we override that with the
  // cancellation error. Once a cancellation has been received, the next report
  // sent to the client will always contain an error; the error returned by the
  // sink or the cancellation error if the sink does not return an error.
  if (buffer_in_use_) {
    buffer_in_use_->Cancel(error);
    if (!cancel_callback_.is_null())
      cancel_callback_.Run(error);
    return;
  }
  // If there is no buffer in use, immediately report the error and cancel the
  // waiting for the data pipe if one exists. This transitions straight into the
  // state after the sink has returned an error.
  waiter_.reset();
  ReportBytesSentAndError(0, error);
}

// A connection error is treated as fatal: it shuts the receiver down and runs
// |error_callback_| via DispatchFatalError().
void DataSinkReceiver::OnConnectionError() {
  DispatchFatalError();
}

// Begins an asynchronous wait for |handle_| to become readable, with
// OnDoneWaiting() as the completion callback. Must not be called while a wait
// is already outstanding or after the receiver has been shut down.
void DataSinkReceiver::StartWaiting() {
  DCHECK(!waiter_ && !shut_down_);
  waiter_.reset(
      new AsyncWaiter(handle_.get(),
                      MOJO_HANDLE_SIGNAL_READABLE,
                      base::Bind(&DataSinkReceiver::OnDoneWaiting, this)));
}

// Completion callback for the wait started by StartWaiting(). |result| is the
// outcome of the wait; anything other than MOJO_RESULT_OK is fatal.
//
// If flushes are queued, the oldest one gets to discard data from the pipe and
// the receiver goes back to waiting. Otherwise the readable data is exposed to
// the sink through a new Buffer passed to |ready_callback_|.
void DataSinkReceiver::OnDoneWaiting(MojoResult result) {
  DCHECK(waiter_ && !shut_down_);
  waiter_.reset();
  if (result != MOJO_RESULT_OK) {
    DispatchFatalError();
    return;
  }
  // If there are any queued flushes (from ReportBytesSentAndError()), let them
  // flush data from the data pipe.
  if (!pending_flushes_.empty()) {
    // Deliberately named differently from the |result| parameter so that the
    // flush outcome does not shadow the wait outcome.
    const MojoResult flush_result =
        pending_flushes_.front()->Flush(handle_.get());
    if (flush_result == MOJO_RESULT_OK) {
      // The flush discarded everything it had to; retire it.
      pending_flushes_.pop();
    } else if (flush_result != MOJO_RESULT_SHOULD_WAIT) {
      DispatchFatalError();
      return;
    }
    // Either more data must arrive to finish this flush, or the next flush or
    // read has to be serviced: wait for the pipe again.
    StartWaiting();
    return;
  }
  const void* data = NULL;
  // Ask for as much data as is available; BeginReadDataRaw() updates
  // |num_bytes| in place.
  uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
  result = mojo::BeginReadDataRaw(
      handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
  if (result != MOJO_RESULT_OK) {
    DispatchFatalError();
    return;
  }
  // |buffer_in_use_| is a non-owning pointer; ownership of the Buffer goes to
  // the sink through the scoped_ptr handed to |ready_callback_|.
  buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes);
  ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_));
}

// Called when the sink has consumed |bytes_read| bytes without error. Ends the
// read on the data pipe, reports the byte count to the client and resumes
// waiting for more data. Does nothing further if ending the read fails or the
// receiver has been shut down.
void DataSinkReceiver::Done(uint32_t bytes_read) {
  if (DoneInternal(bytes_read)) {
    client()->ReportBytesSent(bytes_read);
    StartWaiting();
  }
}

// Called when the sink has consumed |bytes_read| bytes and then hit |error|.
// Ends the read on the data pipe and reports both values to the client. Does
// nothing further if ending the read fails or the receiver has been shut down.
void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) {
  if (DoneInternal(bytes_read))
    ReportBytesSentAndError(bytes_read, error);
}

bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) {
  if (shut_down_)
    return false;

  DCHECK(buffer_in_use_);
  buffer_in_use_ = NULL;
  MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read);
  if (result != MOJO_RESULT_OK) {
    DispatchFatalError();
    return false;
  }
  return true;
}

void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read,
                                               int32_t error) {
  // When we encounter an error, we must discard the data from any sends already
  // in the data pipe before we can resume dispatching data to the sink. We add
  // a pending flush here. The response containing the number of bytes to flush
  // is handled in SetNumBytesToFlush(). The actual flush is handled in
  // OnDoneWaiting().
  pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush()));
  client()->ReportBytesSentAndError(
      bytes_read,
      error,
      base::Bind(&DataSinkReceiver::SetNumBytesToFlush,
                 weak_factory_.GetWeakPtr()));
}

// Response handler for ReportBytesSentAndError(): records |bytes_to_flush| on
// the most recently queued flush and makes sure the receiver is waiting on the
// data pipe so the flush can be carried out in OnDoneWaiting().
void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) {
  DCHECK(!pending_flushes_.empty());
  DCHECK(!pending_flushes_.back()->received_flush());
  pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush);

  // A wait may already be outstanding; only start one if there is none.
  if (waiter_)
    return;
  StartWaiting();
}

// Shuts the receiver down and runs |error_callback_| (if set). Only the first
// call has any effect; once shut down, further calls are ignored.
void DataSinkReceiver::DispatchFatalError() {
  if (!shut_down_) {
    ShutDown();
    if (!error_callback_.is_null())
      error_callback_.Run();
  }
}

// Creates a non-cancelled view of |buffer_size| bytes at |buffer| that reports
// its completion to |receiver|.
DataSinkReceiver::Buffer::Buffer(scoped_refptr<DataSinkReceiver> receiver,
                                 const char* buffer,
                                 uint32_t buffer_size)
    : receiver_(receiver),
      buffer_(buffer),
      buffer_size_(buffer_size),
      cancelled_(false),
      cancellation_error_(0) {
}

// If the buffer is destroyed without Done() or DoneWithError() having been
// called (|receiver_| is still set), report zero bytes read to the receiver:
// with the cancellation error if the buffer was cancelled, without an error
// otherwise.
DataSinkReceiver::Buffer::~Buffer() {
  if (receiver_.get()) {
    if (!cancelled_)
      receiver_->Done(0);
    else
      receiver_->DoneWithError(0, cancellation_error_);
  }
}

// Records that this receive was cancelled with |error|. Nothing is reported to
// the receiver here; the stored error is used later by Done() or by the
// destructor.
void DataSinkReceiver::Buffer::Cancel(int32_t error) {
  cancellation_error_ = error;
  cancelled_ = true;
}

// Returns the start of the viewed data, or NULL once Done() or DoneWithError()
// has been called.
const char* DataSinkReceiver::Buffer::GetData() {
  return buffer_;
}

// Returns the number of bytes available at GetData(), or 0 once Done() or
// DoneWithError() has been called.
uint32_t DataSinkReceiver::Buffer::GetSize() {
  return buffer_size_;
}

// Reports that |bytes_read| bytes were consumed successfully. If the receive
// was cancelled in the meantime, the cancellation error is reported alongside
// the byte count instead. Afterwards the buffer is detached from its receiver
// and no longer exposes any data.
void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) {
  if (!cancelled_)
    receiver_->Done(bytes_read);
  else
    receiver_->DoneWithError(bytes_read, cancellation_error_);

  // Detach so that the destructor does not report a second time.
  receiver_ = NULL;
  buffer_ = NULL;
  buffer_size_ = 0;
}

// Reports that |bytes_read| bytes were consumed before the sink hit |error|.
// The sink's own error is always forwarded, even if the receive was cancelled.
// Afterwards the buffer is detached from its receiver and no longer exposes any
// data.
void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read,
                                             int32_t error) {
  receiver_->DoneWithError(bytes_read, error);
  // Detach so that the destructor does not report a second time.
  receiver_ = NULL;
  buffer_ = NULL;
  buffer_size_ = 0;
}

// Creates a flush that has not yet been told how many bytes to discard.
DataSinkReceiver::PendingFlush::PendingFlush()
    : received_flush_(false), bytes_to_flush_(0) {
}

// Records |num_bytes| as the amount of data this flush has to discard and
// marks the flush as ready. Must not be called more than once.
void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) {
  DCHECK(!received_flush_);
  bytes_to_flush_ = num_bytes;
  received_flush_ = true;
}

// Discards up to |bytes_to_flush_| bytes from |handle| and subtracts the number
// actually discarded from the remaining count. Returns MOJO_RESULT_OK once
// nothing is left to flush, MOJO_RESULT_SHOULD_WAIT if some bytes remain, or
// the error returned by the discarding read.
MojoResult DataSinkReceiver::PendingFlush::Flush(
    mojo::DataPipeConsumerHandle handle) {
  DCHECK(received_flush_);

  // In: the number of bytes we would like to discard. Out: the number of bytes
  // that were actually discarded.
  uint32_t num_bytes = bytes_to_flush_;
  const MojoResult discard_result =
      mojo::ReadDataRaw(handle, NULL, &num_bytes, MOJO_READ_DATA_FLAG_DISCARD);
  if (discard_result != MOJO_RESULT_OK)
    return discard_result;

  DCHECK(num_bytes <= bytes_to_flush_);
  bytes_to_flush_ -= num_bytes;
  if (bytes_to_flush_ != 0)
    return MOJO_RESULT_SHOULD_WAIT;
  return MOJO_RESULT_OK;
}

}  // namespace device