普通文本  |  474行  |  16.55 KB

// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/system/data_pipe.h"

#include <string.h>

#include <algorithm>
#include <limits>

#include "base/compiler_specific.h"
#include "base/logging.h"
#include "mojo/system/constants.h"
#include "mojo/system/memory.h"
#include "mojo/system/options_validation.h"
#include "mojo/system/waiter_list.h"

namespace mojo {
namespace system {

// static
const MojoCreateDataPipeOptions DataPipe::kDefaultCreateOptions = {
    static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
    MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1u,
    static_cast<uint32_t>(kDefaultDataPipeCapacityBytes)};

// static
MojoResult DataPipe::ValidateCreateOptions(
    UserPointer<const MojoCreateDataPipeOptions> in_options,
    MojoCreateDataPipeOptions* out_options) {
  const MojoCreateDataPipeOptionsFlags kKnownFlags =
      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD;

  *out_options = kDefaultCreateOptions;
  if (in_options.IsNull())
    return MOJO_RESULT_OK;

  UserOptionsReader<MojoCreateDataPipeOptions> reader(in_options);
  if (!reader.is_valid())
    return MOJO_RESULT_INVALID_ARGUMENT;

  if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, flags, reader))
    return MOJO_RESULT_OK;
  if ((reader.options().flags & ~kKnownFlags))
    return MOJO_RESULT_UNIMPLEMENTED;
  out_options->flags = reader.options().flags;

  // Checks for fields beyond |flags|:

  if (!OPTIONS_STRUCT_HAS_MEMBER(
          MojoCreateDataPipeOptions, element_num_bytes, reader))
    return MOJO_RESULT_OK;
  if (reader.options().element_num_bytes == 0)
    return MOJO_RESULT_INVALID_ARGUMENT;
  out_options->element_num_bytes = reader.options().element_num_bytes;

  if (!OPTIONS_STRUCT_HAS_MEMBER(
          MojoCreateDataPipeOptions, capacity_num_bytes, reader) ||
      reader.options().capacity_num_bytes == 0) {
    // Round the default capacity down to a multiple of the element size (but at
    // least one element).
    out_options->capacity_num_bytes =
        std::max(static_cast<uint32_t>(kDefaultDataPipeCapacityBytes -
                                       (kDefaultDataPipeCapacityBytes %
                                        out_options->element_num_bytes)),
                 out_options->element_num_bytes);
    return MOJO_RESULT_OK;
  }
  if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0)
    return MOJO_RESULT_INVALID_ARGUMENT;
  if (reader.options().capacity_num_bytes > kMaxDataPipeCapacityBytes)
    return MOJO_RESULT_RESOURCE_EXHAUSTED;
  out_options->capacity_num_bytes = reader.options().capacity_num_bytes;

  return MOJO_RESULT_OK;
}

void DataPipe::ProducerCancelAllWaiters() {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());
  producer_waiter_list_->CancelAllWaiters();
}

void DataPipe::ProducerClose() {
  base::AutoLock locker(lock_);
  DCHECK(producer_open_);
  producer_open_ = false;
  DCHECK(has_local_producer_no_lock());
  producer_waiter_list_.reset();
  // Not a bug, except possibly in "user" code.
  DVLOG_IF(2, producer_in_two_phase_write_no_lock())
      << "Producer closed with active two-phase write";
  producer_two_phase_max_num_bytes_written_ = 0;
  ProducerCloseImplNoLock();
  AwakeConsumerWaitersForStateChangeNoLock(
      ConsumerGetHandleSignalsStateImplNoLock());
}

MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements,
                                       UserPointer<uint32_t> num_bytes,
                                       bool all_or_none) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());

  if (producer_in_two_phase_write_no_lock())
    return MOJO_RESULT_BUSY;
  if (!consumer_open_no_lock())
    return MOJO_RESULT_FAILED_PRECONDITION;

  // Returning "busy" takes priority over "invalid argument".
  uint32_t max_num_bytes_to_write = num_bytes.Get();
  if (max_num_bytes_to_write % element_num_bytes_ != 0)
    return MOJO_RESULT_INVALID_ARGUMENT;

  if (max_num_bytes_to_write == 0)
    return MOJO_RESULT_OK;  // Nothing to do.

  uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0;

  HandleSignalsState old_consumer_state =
      ConsumerGetHandleSignalsStateImplNoLock();
  MojoResult rv = ProducerWriteDataImplNoLock(
      elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write);
  HandleSignalsState new_consumer_state =
      ConsumerGetHandleSignalsStateImplNoLock();
  if (!new_consumer_state.equals(old_consumer_state))
    AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
  return rv;
}

MojoResult DataPipe::ProducerBeginWriteData(
    UserPointer<void*> buffer,
    UserPointer<uint32_t> buffer_num_bytes,
    bool all_or_none) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());

  if (producer_in_two_phase_write_no_lock())
    return MOJO_RESULT_BUSY;
  if (!consumer_open_no_lock())
    return MOJO_RESULT_FAILED_PRECONDITION;

  uint32_t min_num_bytes_to_write = 0;
  if (all_or_none) {
    min_num_bytes_to_write = buffer_num_bytes.Get();
    if (min_num_bytes_to_write % element_num_bytes_ != 0)
      return MOJO_RESULT_INVALID_ARGUMENT;
  }

  MojoResult rv = ProducerBeginWriteDataImplNoLock(
      buffer, buffer_num_bytes, min_num_bytes_to_write);
  if (rv != MOJO_RESULT_OK)
    return rv;
  // Note: No need to awake producer waiters, even though we're going from
  // writable to non-writable (since you can't wait on non-writability).
  // Similarly, though this may have discarded data (in "may discard" mode),
  // making it non-readable, there's still no need to awake consumer waiters.
  DCHECK(producer_in_two_phase_write_no_lock());
  return MOJO_RESULT_OK;
}

MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());

  if (!producer_in_two_phase_write_no_lock())
    return MOJO_RESULT_FAILED_PRECONDITION;
  // Note: Allow successful completion of the two-phase write even if the
  // consumer has been closed.

  HandleSignalsState old_consumer_state =
      ConsumerGetHandleSignalsStateImplNoLock();
  MojoResult rv;
  if (num_bytes_written > producer_two_phase_max_num_bytes_written_ ||
      num_bytes_written % element_num_bytes_ != 0) {
    rv = MOJO_RESULT_INVALID_ARGUMENT;
    producer_two_phase_max_num_bytes_written_ = 0;
  } else {
    rv = ProducerEndWriteDataImplNoLock(num_bytes_written);
  }
  // Two-phase write ended even on failure.
  DCHECK(!producer_in_two_phase_write_no_lock());
  // If we're now writable, we *became* writable (since we weren't writable
  // during the two-phase write), so awake producer waiters.
  HandleSignalsState new_producer_state =
      ProducerGetHandleSignalsStateImplNoLock();
  if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
    AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
  HandleSignalsState new_consumer_state =
      ConsumerGetHandleSignalsStateImplNoLock();
  if (!new_consumer_state.equals(old_consumer_state))
    AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
  return rv;
}

HandleSignalsState DataPipe::ProducerGetHandleSignalsState() {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());
  return ProducerGetHandleSignalsStateImplNoLock();
}

MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter,
                                       MojoHandleSignals signals,
                                       uint32_t context,
                                       HandleSignalsState* signals_state) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());

  HandleSignalsState producer_state = ProducerGetHandleSignalsStateImplNoLock();
  if (producer_state.satisfies(signals)) {
    if (signals_state)
      *signals_state = producer_state;
    return MOJO_RESULT_ALREADY_EXISTS;
  }
  if (!producer_state.can_satisfy(signals)) {
    if (signals_state)
      *signals_state = producer_state;
    return MOJO_RESULT_FAILED_PRECONDITION;
  }

  producer_waiter_list_->AddWaiter(waiter, signals, context);
  return MOJO_RESULT_OK;
}

void DataPipe::ProducerRemoveWaiter(Waiter* waiter,
                                    HandleSignalsState* signals_state) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());
  producer_waiter_list_->RemoveWaiter(waiter);
  if (signals_state)
    *signals_state = ProducerGetHandleSignalsStateImplNoLock();
}

bool DataPipe::ProducerIsBusy() const {
  base::AutoLock locker(lock_);
  return producer_in_two_phase_write_no_lock();
}

void DataPipe::ConsumerCancelAllWaiters() {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());
  consumer_waiter_list_->CancelAllWaiters();
}

void DataPipe::ConsumerClose() {
  base::AutoLock locker(lock_);
  DCHECK(consumer_open_);
  consumer_open_ = false;
  DCHECK(has_local_consumer_no_lock());
  consumer_waiter_list_.reset();
  // Not a bug, except possibly in "user" code.
  DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
      << "Consumer closed with active two-phase read";
  consumer_two_phase_max_num_bytes_read_ = 0;
  ConsumerCloseImplNoLock();
  AwakeProducerWaitersForStateChangeNoLock(
      ProducerGetHandleSignalsStateImplNoLock());
}

MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements,
                                      UserPointer<uint32_t> num_bytes,
                                      bool all_or_none) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());

  if (consumer_in_two_phase_read_no_lock())
    return MOJO_RESULT_BUSY;

  uint32_t max_num_bytes_to_read = num_bytes.Get();
  if (max_num_bytes_to_read % element_num_bytes_ != 0)
    return MOJO_RESULT_INVALID_ARGUMENT;

  if (max_num_bytes_to_read == 0)
    return MOJO_RESULT_OK;  // Nothing to do.

  uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0;

  HandleSignalsState old_producer_state =
      ProducerGetHandleSignalsStateImplNoLock();
  MojoResult rv = ConsumerReadDataImplNoLock(
      elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read);
  HandleSignalsState new_producer_state =
      ProducerGetHandleSignalsStateImplNoLock();
  if (!new_producer_state.equals(old_producer_state))
    AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
  return rv;
}

MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
                                         bool all_or_none) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());

  if (consumer_in_two_phase_read_no_lock())
    return MOJO_RESULT_BUSY;

  uint32_t max_num_bytes_to_discard = num_bytes.Get();
  if (max_num_bytes_to_discard % element_num_bytes_ != 0)
    return MOJO_RESULT_INVALID_ARGUMENT;

  if (max_num_bytes_to_discard == 0)
    return MOJO_RESULT_OK;  // Nothing to do.

  uint32_t min_num_bytes_to_discard =
      all_or_none ? max_num_bytes_to_discard : 0;

  HandleSignalsState old_producer_state =
      ProducerGetHandleSignalsStateImplNoLock();
  MojoResult rv = ConsumerDiscardDataImplNoLock(
      num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard);
  HandleSignalsState new_producer_state =
      ProducerGetHandleSignalsStateImplNoLock();
  if (!new_producer_state.equals(old_producer_state))
    AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
  return rv;
}

MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());

  if (consumer_in_two_phase_read_no_lock())
    return MOJO_RESULT_BUSY;

  // Note: Don't need to validate |*num_bytes| for query.
  return ConsumerQueryDataImplNoLock(num_bytes);
}

MojoResult DataPipe::ConsumerBeginReadData(
    UserPointer<const void*> buffer,
    UserPointer<uint32_t> buffer_num_bytes,
    bool all_or_none) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());

  if (consumer_in_two_phase_read_no_lock())
    return MOJO_RESULT_BUSY;

  uint32_t min_num_bytes_to_read = 0;
  if (all_or_none) {
    min_num_bytes_to_read = buffer_num_bytes.Get();
    if (min_num_bytes_to_read % element_num_bytes_ != 0)
      return MOJO_RESULT_INVALID_ARGUMENT;
  }

  MojoResult rv = ConsumerBeginReadDataImplNoLock(
      buffer, buffer_num_bytes, min_num_bytes_to_read);
  if (rv != MOJO_RESULT_OK)
    return rv;
  DCHECK(consumer_in_two_phase_read_no_lock());
  return MOJO_RESULT_OK;
}

MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());

  if (!consumer_in_two_phase_read_no_lock())
    return MOJO_RESULT_FAILED_PRECONDITION;

  HandleSignalsState old_producer_state =
      ProducerGetHandleSignalsStateImplNoLock();
  MojoResult rv;
  if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ ||
      num_bytes_read % element_num_bytes_ != 0) {
    rv = MOJO_RESULT_INVALID_ARGUMENT;
    consumer_two_phase_max_num_bytes_read_ = 0;
  } else {
    rv = ConsumerEndReadDataImplNoLock(num_bytes_read);
  }
  // Two-phase read ended even on failure.
  DCHECK(!consumer_in_two_phase_read_no_lock());
  // If we're now readable, we *became* readable (since we weren't readable
  // during the two-phase read), so awake consumer waiters.
  HandleSignalsState new_consumer_state =
      ConsumerGetHandleSignalsStateImplNoLock();
  if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
    AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
  HandleSignalsState new_producer_state =
      ProducerGetHandleSignalsStateImplNoLock();
  if (!new_producer_state.equals(old_producer_state))
    AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
  return rv;
}

HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());
  return ConsumerGetHandleSignalsStateImplNoLock();
}

MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
                                       MojoHandleSignals signals,
                                       uint32_t context,
                                       HandleSignalsState* signals_state) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());

  HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateImplNoLock();
  if (consumer_state.satisfies(signals)) {
    if (signals_state)
      *signals_state = consumer_state;
    return MOJO_RESULT_ALREADY_EXISTS;
  }
  if (!consumer_state.can_satisfy(signals)) {
    if (signals_state)
      *signals_state = consumer_state;
    return MOJO_RESULT_FAILED_PRECONDITION;
  }

  consumer_waiter_list_->AddWaiter(waiter, signals, context);
  return MOJO_RESULT_OK;
}

void DataPipe::ConsumerRemoveWaiter(Waiter* waiter,
                                    HandleSignalsState* signals_state) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());
  consumer_waiter_list_->RemoveWaiter(waiter);
  if (signals_state)
    *signals_state = ConsumerGetHandleSignalsStateImplNoLock();
}

bool DataPipe::ConsumerIsBusy() const {
  base::AutoLock locker(lock_);
  return consumer_in_two_phase_read_no_lock();
}

DataPipe::DataPipe(bool has_local_producer,
                   bool has_local_consumer,
                   const MojoCreateDataPipeOptions& validated_options)
    : may_discard_((validated_options.flags &
                    MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
      element_num_bytes_(validated_options.element_num_bytes),
      capacity_num_bytes_(validated_options.capacity_num_bytes),
      producer_open_(true),
      consumer_open_(true),
      producer_waiter_list_(has_local_producer ? new WaiterList() : nullptr),
      consumer_waiter_list_(has_local_consumer ? new WaiterList() : nullptr),
      producer_two_phase_max_num_bytes_written_(0),
      consumer_two_phase_max_num_bytes_read_(0) {
  // Check that the passed in options actually are validated.
  MojoCreateDataPipeOptions unused ALLOW_UNUSED = {0};
  DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused),
            MOJO_RESULT_OK);
}

DataPipe::~DataPipe() {
  DCHECK(!producer_open_);
  DCHECK(!consumer_open_);
  DCHECK(!producer_waiter_list_);
  DCHECK(!consumer_waiter_list_);
}

void DataPipe::AwakeProducerWaitersForStateChangeNoLock(
    const HandleSignalsState& new_producer_state) {
  lock_.AssertAcquired();
  if (!has_local_producer_no_lock())
    return;
  producer_waiter_list_->AwakeWaitersForStateChange(new_producer_state);
}

void DataPipe::AwakeConsumerWaitersForStateChangeNoLock(
    const HandleSignalsState& new_consumer_state) {
  lock_.AssertAcquired();
  if (!has_local_consumer_no_lock())
    return;
  consumer_waiter_list_->AwakeWaitersForStateChange(new_consumer_state);
}

}  // namespace system
}  // namespace mojo