// (File-viewer residue, not part of the source: plain text | 275 lines | 8.85 KB)

// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/system/data_pipe.h"

#include <string.h>

#include <algorithm>
#include <limits>

#include "base/logging.h"
#include "mojo/system/constants.h"
#include "mojo/system/memory.h"
#include "mojo/system/waiter_list.h"

namespace mojo {
namespace system {

// Forwards to CancelAllWaiters() on the producer-side waiter list. Holds
// |lock_| for the whole call and DCHECKs that a local producer exists.
void DataPipe::ProducerCancelAllWaiters() {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());
  producer_waiter_list_->CancelAllWaiters();
}

// Closes the producer end: under |lock_|, releases the producer-side waiter
// list and then calls ProducerCloseImplNoLock(). DCHECKs that a local producer
// exists on entry.
void DataPipe::ProducerClose() {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());
  // The waiter list is dropped before the implementation hook runs.
  // NOTE(review): presumably this is what makes has_local_producer_no_lock()
  // report false afterwards (the destructor DCHECKs that) -- confirm in the
  // header.
  producer_waiter_list_.reset();
  ProducerCloseImplNoLock();
}

// One-shot write built on top of the begin/end write implementation hooks.
//
// |elements|: source bytes; |*num_elements| elements of |element_size_| bytes
//     each are requested.
// |num_elements|: in: number of elements requested; out (only on
//     MOJO_RESULT_OK): number of elements actually copied.
// |flags|: passed through unchanged to ProducerBeginWriteDataImplNoLock().
//
// Returns MOJO_RESULT_BUSY while a two-phase write is in progress; otherwise
// the first non-OK result from the begin/end hooks, or MOJO_RESULT_OK.
MojoResult DataPipe::ProducerWriteData(const void* elements,
                                       uint32_t* num_elements,
                                       MojoWriteDataFlags flags) {
  base::AutoLock auto_lock(lock_);
  DCHECK(has_local_producer_no_lock());

  // A one-shot write is refused while a two-phase write is outstanding.
  if (producer_in_two_phase_write_)
    return MOJO_RESULT_BUSY;

  // TODO(vtl): This implementation may write less than requested, even if room
  // is available. Fix this. (Probably make a subclass-specific impl.)
  void* destination = NULL;
  uint32_t available_elements = *num_elements;
  const MojoResult begin_result = ProducerBeginWriteDataImplNoLock(
      &destination, &available_elements, flags);
  if (begin_result != MOJO_RESULT_OK)
    return begin_result;

  // Copy no more than was asked for and no more than the buffer handed back.
  const uint32_t elements_copied =
      std::min(*num_elements, available_elements);
  memcpy(destination, elements, elements_copied * element_size_);

  // |*num_elements| is only updated when the write was committed successfully.
  const MojoResult end_result =
      ProducerEndWriteDataImplNoLock(elements_copied);
  if (end_result == MOJO_RESULT_OK)
    *num_elements = elements_copied;
  return end_result;
}

// Starts a two-phase write. All three arguments are handed unchanged to
// ProducerBeginWriteDataImplNoLock().
//
// Returns MOJO_RESULT_BUSY if a two-phase write is already in progress;
// otherwise the implementation's result. The pipe is marked as being in a
// two-phase write only when that result is MOJO_RESULT_OK.
MojoResult DataPipe::ProducerBeginWriteData(void** buffer,
                                            uint32_t* buffer_num_elements,
                                            MojoWriteDataFlags flags) {
  base::AutoLock auto_lock(lock_);
  DCHECK(has_local_producer_no_lock());

  if (producer_in_two_phase_write_)
    return MOJO_RESULT_BUSY;

  const MojoResult result =
      ProducerBeginWriteDataImplNoLock(buffer, buffer_num_elements, flags);
  if (result == MOJO_RESULT_OK)
    producer_in_two_phase_write_ = true;
  return result;
}

// Finishes a two-phase write, reporting |num_elements_written| to
// ProducerEndWriteDataImplNoLock().
//
// Returns MOJO_RESULT_FAILED_PRECONDITION if no two-phase write is in
// progress; otherwise whatever the implementation returns.
MojoResult DataPipe::ProducerEndWriteData(uint32_t num_elements_written) {
  base::AutoLock auto_lock(lock_);
  DCHECK(has_local_producer_no_lock());

  if (!producer_in_two_phase_write_)
    return MOJO_RESULT_FAILED_PRECONDITION;

  const MojoResult result =
      ProducerEndWriteDataImplNoLock(num_elements_written);
  // The two-phase write is over whether or not the implementation succeeded.
  producer_in_two_phase_write_ = false;
  return result;
}

// Registers |waiter| on the producer-side waiter list for |flags|, to be woken
// with |wake_result|.
//
// Returns MOJO_RESULT_ALREADY_EXISTS if any requested flag is already in the
// producer's satisfied set, MOJO_RESULT_FAILED_PRECONDITION if none of the
// requested flags is in the satisfiable set, and MOJO_RESULT_OK once the
// waiter has been added.
MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter,
                                       MojoWaitFlags flags,
                                       MojoResult wake_result) {
  base::AutoLock auto_lock(lock_);
  DCHECK(has_local_producer_no_lock());

  const bool already_satisfied =
      (flags & ProducerSatisfiedFlagsNoLock()) != 0;
  if (already_satisfied)
    return MOJO_RESULT_ALREADY_EXISTS;

  // Only consulted when nothing requested is satisfied yet.
  const bool can_be_satisfied =
      (flags & ProducerSatisfiableFlagsNoLock()) != 0;
  if (!can_be_satisfied)
    return MOJO_RESULT_FAILED_PRECONDITION;

  producer_waiter_list_->AddWaiter(waiter, flags, wake_result);
  return MOJO_RESULT_OK;
}

// Forwards |waiter| to RemoveWaiter() on the producer-side waiter list. Holds
// |lock_| for the whole call and DCHECKs that a local producer exists.
void DataPipe::ProducerRemoveWaiter(Waiter* waiter) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());
  producer_waiter_list_->RemoveWaiter(waiter);
}

// Forwards to CancelAllWaiters() on the consumer-side waiter list. Holds
// |lock_| for the whole call and DCHECKs that a local consumer exists.
void DataPipe::ConsumerCancelAllWaiters() {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());
  consumer_waiter_list_->CancelAllWaiters();
}

// Closes the consumer end: under |lock_|, releases the consumer-side waiter
// list and then calls ConsumerCloseImplNoLock(). DCHECKs that a local consumer
// exists on entry.
void DataPipe::ConsumerClose() {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());
  // The waiter list is dropped before the implementation hook runs.
  // NOTE(review): presumably this is what makes has_local_consumer_no_lock()
  // report false afterwards (the destructor DCHECKs that) -- confirm in the
  // header.
  consumer_waiter_list_.reset();
  ConsumerCloseImplNoLock();
}

// One-shot read. Depending on |flags| this discards data, queries the pipe, or
// copies elements out through the begin/end read implementation hooks.
//
// |elements|: destination for the copied bytes (not touched on the discard
//     and query paths).
// |num_elements|: in: number of elements requested; out (on the copy path,
//     only on MOJO_RESULT_OK): number of elements actually copied. On the
//     discard and query paths it is handed to the respective helper.
// |flags|: selects the mode; also passed through to
//     ConsumerBeginReadDataImplNoLock() on the copy path.
//
// Returns MOJO_RESULT_BUSY while a two-phase read is in progress; otherwise
// the result of the selected helper/hooks.
MojoResult DataPipe::ConsumerReadData(void* elements,
                                      uint32_t* num_elements,
                                      MojoReadDataFlags flags) {
  base::AutoLock auto_lock(lock_);
  DCHECK(has_local_consumer_no_lock());

  // A one-shot read is refused while a two-phase read is outstanding.
  if (consumer_in_two_phase_read_)
    return MOJO_RESULT_BUSY;

  // Discard is tested before query, so it wins if both flags are set.
  if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
    return ConsumerDiscardDataNoLock(
        num_elements, (flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE));
  }
  if ((flags & MOJO_READ_DATA_FLAG_QUERY))
    return ConsumerQueryDataNoLock(num_elements);

  // TODO(vtl): This implementation may read less than requested, even if data
  // is available. Fix this. (Probably make a subclass-specific impl.)
  const void* source = NULL;
  uint32_t available_elements = 0;
  const MojoResult begin_result = ConsumerBeginReadDataImplNoLock(
      &source, &available_elements, flags);
  if (begin_result != MOJO_RESULT_OK)
    return begin_result;

  // Copy no more than was asked for and no more than the buffer handed back.
  const uint32_t elements_copied =
      std::min(*num_elements, available_elements);
  memcpy(elements, source, elements_copied * element_size_);

  // |*num_elements| is only updated when the read was committed successfully.
  const MojoResult end_result = ConsumerEndReadDataImplNoLock(elements_copied);
  if (end_result == MOJO_RESULT_OK)
    *num_elements = elements_copied;
  return end_result;
}

// Starts a two-phase read. All three arguments are handed unchanged to
// ConsumerBeginReadDataImplNoLock().
//
// Returns MOJO_RESULT_BUSY if a two-phase read is already in progress;
// otherwise the implementation's result. The pipe is marked as being in a
// two-phase read only when that result is MOJO_RESULT_OK.
MojoResult DataPipe::ConsumerBeginReadData(const void** buffer,
                                           uint32_t* buffer_num_elements,
                                           MojoReadDataFlags flags) {
  base::AutoLock auto_lock(lock_);
  DCHECK(has_local_consumer_no_lock());

  if (consumer_in_two_phase_read_)
    return MOJO_RESULT_BUSY;

  const MojoResult result =
      ConsumerBeginReadDataImplNoLock(buffer, buffer_num_elements, flags);
  if (result == MOJO_RESULT_OK)
    consumer_in_two_phase_read_ = true;
  return result;
}

// Finishes a two-phase read, reporting |num_elements_read| to
// ConsumerEndReadDataImplNoLock().
//
// Returns MOJO_RESULT_FAILED_PRECONDITION if no two-phase read is in
// progress; otherwise whatever the implementation returns.
MojoResult DataPipe::ConsumerEndReadData(uint32_t num_elements_read) {
  base::AutoLock auto_lock(lock_);
  DCHECK(has_local_consumer_no_lock());

  if (!consumer_in_two_phase_read_)
    return MOJO_RESULT_FAILED_PRECONDITION;

  const MojoResult result = ConsumerEndReadDataImplNoLock(num_elements_read);
  // The two-phase read is over whether or not the implementation succeeded.
  consumer_in_two_phase_read_ = false;
  return result;
}

// Registers |waiter| on the consumer-side waiter list for |flags|, to be woken
// with |wake_result|.
//
// Returns MOJO_RESULT_ALREADY_EXISTS if any requested flag is already in the
// consumer's satisfied set, MOJO_RESULT_FAILED_PRECONDITION if none of the
// requested flags is in the satisfiable set, and MOJO_RESULT_OK once the
// waiter has been added.
MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
                                       MojoWaitFlags flags,
                                       MojoResult wake_result) {
  base::AutoLock auto_lock(lock_);
  DCHECK(has_local_consumer_no_lock());

  const bool already_satisfied =
      (flags & ConsumerSatisfiedFlagsNoLock()) != 0;
  if (already_satisfied)
    return MOJO_RESULT_ALREADY_EXISTS;

  // Only consulted when nothing requested is satisfied yet.
  const bool can_be_satisfied =
      (flags & ConsumerSatisfiableFlagsNoLock()) != 0;
  if (!can_be_satisfied)
    return MOJO_RESULT_FAILED_PRECONDITION;

  consumer_waiter_list_->AddWaiter(waiter, flags, wake_result);
  return MOJO_RESULT_OK;
}

// Forwards |waiter| to RemoveWaiter() on the consumer-side waiter list. Holds
// |lock_| for the whole call and DCHECKs that a local consumer exists.
void DataPipe::ConsumerRemoveWaiter(Waiter* waiter) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());
  consumer_waiter_list_->RemoveWaiter(waiter);
}

// Constructs a pipe with a waiter list for each end that is local:
// |has_local_producer| / |has_local_consumer| decide whether the respective
// list is allocated or left NULL. At least one end must be local (DCHECKed).
//
// |element_size_| starts at 0 and both two-phase flags start false.
// NOTE(review): |may_discard_| and |capacity_num_elements_| are not set in
// this initializer list; they are assigned in Init(), so Init() presumably
// must be called before the pipe is used -- confirm against the header.
DataPipe::DataPipe(bool has_local_producer, bool has_local_consumer)
    : element_size_(0),
      producer_waiter_list_(has_local_producer ? new WaiterList() : NULL),
      consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL),
      producer_in_two_phase_write_(false),
      consumer_in_two_phase_read_(false) {
  DCHECK(has_local_producer || has_local_consumer);
}

// Destructor. DCHECKs that neither a local producer nor a local consumer is
// still present; note that these checks are made without taking |lock_|.
// NOTE(review): presumably both ends must have been closed (ProducerClose() /
// ConsumerClose()) before destruction -- confirm against the header.
DataPipe::~DataPipe() {
  DCHECK(!has_local_producer_no_lock());
  DCHECK(!has_local_consumer_no_lock());
}

// Validates and stores the pipe's configuration.
//
// |may_discard|: stored as-is in |may_discard_|.
// |element_size|: size of one element in bytes; must be nonzero.
// |capacity_num_elements|: capacity in elements; 0 selects a default derived
//     from |kDefaultDataPipeCapacityBytes|.
//
// Returns MOJO_RESULT_INVALID_ARGUMENT if |element_size| is 0 or if the
// capacity in bytes would not fit in a uint32_t,
// MOJO_RESULT_RESOURCE_EXHAUSTED if the capacity in bytes exceeds
// |kMaxDataPipeCapacityBytes|, and MOJO_RESULT_OK otherwise. Members are only
// written on success.
MojoResult DataPipe::Init(bool may_discard,
                          size_t element_size,
                          size_t capacity_num_elements) {
  // No need to lock: This method is not thread-safe.

  if (element_size == 0)
    return MOJO_RESULT_INVALID_ARGUMENT;

  size_t capacity = capacity_num_elements;
  if (capacity == 0) {
    // Default capacity: as many whole elements as fit in the default byte
    // budget, but never fewer than one.
    const size_t default_capacity =
        kDefaultDataPipeCapacityBytes / element_size;
    capacity = default_capacity > 0 ? default_capacity : 1;
  }

  // Dividing rather than multiplying keeps this check itself from
  // overflowing.
  const size_t max_capacity_for_uint32 =
      std::numeric_limits<uint32_t>::max() / element_size;
  if (capacity > max_capacity_for_uint32)
    return MOJO_RESULT_INVALID_ARGUMENT;

  const size_t capacity_bytes = capacity * element_size;
  if (capacity_bytes > kMaxDataPipeCapacityBytes)
    return MOJO_RESULT_RESOURCE_EXHAUSTED;

  may_discard_ = may_discard;
  element_size_ = element_size;
  capacity_num_elements_ = capacity;
  return MOJO_RESULT_OK;
}

// Passes the producer's current satisfied and satisfiable flags to
// AwakeWaitersForStateChange() on the producer-side waiter list. Does nothing
// when there is no local producer. The caller must already hold |lock_|
// (asserted).
void DataPipe::AwakeProducerWaitersForStateChangeNoLock() {
  lock_.AssertAcquired();
  if (has_local_producer_no_lock()) {
    producer_waiter_list_->AwakeWaitersForStateChange(
        ProducerSatisfiedFlagsNoLock(), ProducerSatisfiableFlagsNoLock());
  }
}

// Passes the consumer's current satisfied and satisfiable flags to
// AwakeWaitersForStateChange() on the consumer-side waiter list. Does nothing
// when there is no local consumer. The caller must already hold |lock_|
// (asserted).
void DataPipe::AwakeConsumerWaitersForStateChangeNoLock() {
  lock_.AssertAcquired();
  if (has_local_consumer_no_lock()) {
    consumer_waiter_list_->AwakeWaitersForStateChange(
        ConsumerSatisfiedFlagsNoLock(), ConsumerSatisfiableFlagsNoLock());
  }
}

}  // namespace system
}  // namespace mojo