// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/system/data_pipe.h"
#include <string.h>
#include <algorithm>
#include <limits>
#include "base/logging.h"
#include "mojo/system/constants.h"
#include "mojo/system/memory.h"
#include "mojo/system/waiter_list.h"
namespace mojo {
namespace system {
// Forwards a "cancel all waiters" request to the producer-side waiter list.
// Holds |lock_| for the whole call; a local producer is required (DCHECKed).
void DataPipe::ProducerCancelAllWaiters() {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());

  producer_waiter_list_->CancelAllWaiters();
}
// Closes the local producer end: releases the producer waiter list and then
// hands off to the subclass hook ProducerCloseImplNoLock(). Runs entirely
// under |lock_|; a local producer is required (DCHECKed).
void DataPipe::ProducerClose() {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());

  // The waiter list is dropped before the implementation hook is invoked;
  // keep this ordering.
  producer_waiter_list_.reset();
  ProducerCloseImplNoLock();
}
// One-shot write, built from the begin/end write implementation hooks.
//
// |elements| is the source of the data. On entry |*num_elements| is the number
// of elements the caller wants to write; on success it is overwritten with the
// number actually copied. |flags| is passed through to the begin-write hook.
//
// Returns MOJO_RESULT_BUSY while a two-phase write is outstanding, otherwise
// the first non-OK result from the begin/end hooks, or MOJO_RESULT_OK.
// |*num_elements| is left untouched on any failure.
MojoResult DataPipe::ProducerWriteData(const void* elements,
                                       uint32_t* num_elements,
                                       MojoWriteDataFlags flags) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());

  if (producer_in_two_phase_write_)
    return MOJO_RESULT_BUSY;

  // TODO(vtl): This implementation may write less than requested, even if room
  // is available. Fix this. (Probably make a subclass-specific impl.)
  void* destination = NULL;
  // In/out for the hook: seeded with the caller's request.
  uint32_t available_num_elements = *num_elements;
  MojoResult result = ProducerBeginWriteDataImplNoLock(
      &destination, &available_num_elements, flags);
  if (result != MOJO_RESULT_OK)
    return result;

  // Never copy more than the caller supplied nor more than the hook offered.
  const uint32_t num_elements_copied =
      std::min(*num_elements, available_num_elements);
  memcpy(destination, elements, num_elements_copied * element_size_);

  result = ProducerEndWriteDataImplNoLock(num_elements_copied);
  if (result == MOJO_RESULT_OK)
    *num_elements = num_elements_copied;
  return result;
}
// Starts a two-phase write. All three arguments are forwarded unchanged to
// ProducerBeginWriteDataImplNoLock().
//
// Returns MOJO_RESULT_BUSY if a two-phase write is already in progress;
// otherwise returns the hook's result. The pipe is marked as being in a
// two-phase write only when the hook reports MOJO_RESULT_OK.
MojoResult DataPipe::ProducerBeginWriteData(void** buffer,
                                            uint32_t* buffer_num_elements,
                                            MojoWriteDataFlags flags) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());

  if (producer_in_two_phase_write_)
    return MOJO_RESULT_BUSY;

  const MojoResult result =
      ProducerBeginWriteDataImplNoLock(buffer, buffer_num_elements, flags);
  if (result == MOJO_RESULT_OK)
    producer_in_two_phase_write_ = true;
  return result;
}
// Finishes a two-phase write, reporting |num_elements_written| to
// ProducerEndWriteDataImplNoLock().
//
// Returns MOJO_RESULT_FAILED_PRECONDITION if no two-phase write is in
// progress; otherwise returns whatever the hook returns.
MojoResult DataPipe::ProducerEndWriteData(uint32_t num_elements_written) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());

  if (!producer_in_two_phase_write_)
    return MOJO_RESULT_FAILED_PRECONDITION;

  const MojoResult result =
      ProducerEndWriteDataImplNoLock(num_elements_written);
  // The two-phase write is over regardless of whether the hook succeeded.
  producer_in_two_phase_write_ = false;
  return result;
}
// Registers |waiter| on the producer side for |flags|, to be woken with
// |wake_result|.
//
// Returns MOJO_RESULT_ALREADY_EXISTS if any requested flag is already
// satisfied, MOJO_RESULT_FAILED_PRECONDITION if none of the requested flags is
// satisfiable, and MOJO_RESULT_OK once the waiter has been added.
MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter,
                                       MojoWaitFlags flags,
                                       MojoResult wake_result) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());

  // Nothing to wait for: at least one requested condition already holds.
  if ((flags & ProducerSatisfiedFlagsNoLock()) != 0)
    return MOJO_RESULT_ALREADY_EXISTS;

  // Waiting would be pointless: no requested condition can become true.
  if ((flags & ProducerSatisfiableFlagsNoLock()) == 0)
    return MOJO_RESULT_FAILED_PRECONDITION;

  producer_waiter_list_->AddWaiter(waiter, flags, wake_result);
  return MOJO_RESULT_OK;
}
// Forwards removal of |waiter| to the producer-side waiter list. Holds
// |lock_| for the whole call; a local producer is required (DCHECKed).
void DataPipe::ProducerRemoveWaiter(Waiter* waiter) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_producer_no_lock());

  producer_waiter_list_->RemoveWaiter(waiter);
}
// Forwards a "cancel all waiters" request to the consumer-side waiter list.
// Holds |lock_| for the whole call; a local consumer is required (DCHECKed).
void DataPipe::ConsumerCancelAllWaiters() {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());

  consumer_waiter_list_->CancelAllWaiters();
}
// Closes the local consumer end: releases the consumer waiter list and then
// hands off to the subclass hook ConsumerCloseImplNoLock(). Runs entirely
// under |lock_|; a local consumer is required (DCHECKed).
void DataPipe::ConsumerClose() {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());

  // The waiter list is dropped before the implementation hook is invoked;
  // keep this ordering.
  consumer_waiter_list_.reset();
  ConsumerCloseImplNoLock();
}
// One-shot read, built from the begin/end read implementation hooks.
//
// |elements| receives the data. On entry |*num_elements| is the number of
// elements the caller wants; on a successful plain read it is overwritten with
// the number actually copied. |flags| selects the mode:
//   - MOJO_READ_DATA_FLAG_DISCARD: delegates to ConsumerDiscardDataNoLock(),
//     passing along whether MOJO_READ_DATA_FLAG_ALL_OR_NONE is set. Checked
//     first, so it takes priority over QUERY.
//   - MOJO_READ_DATA_FLAG_QUERY: delegates to ConsumerQueryDataNoLock().
//   - otherwise: copies data out via the begin/end read hooks.
//
// Returns MOJO_RESULT_BUSY while a two-phase read is outstanding; otherwise
// the result of the delegate, or the first non-OK result from the begin/end
// hooks, or MOJO_RESULT_OK. On a failed plain read |*num_elements| is left
// untouched.
MojoResult DataPipe::ConsumerReadData(void* elements,
                                      uint32_t* num_elements,
                                      MojoReadDataFlags flags) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());

  if (consumer_in_two_phase_read_)
    return MOJO_RESULT_BUSY;

  if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
    return ConsumerDiscardDataNoLock(num_elements,
                                     (flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE));
  }
  if ((flags & MOJO_READ_DATA_FLAG_QUERY))
    return ConsumerQueryDataNoLock(num_elements);

  // TODO(vtl): This implementation may read less than requested, even if data
  // is available. Fix this. (Probably make a subclass-specific impl.)
  const void* buffer = NULL;
  // Seed the in/out count with the caller's request, exactly as
  // ProducerWriteData() does. Starting from 0 would hide the requested amount
  // from the implementation, so an all-or-none request could never be checked
  // against it.
  // NOTE(review): presumably the subclass consults this value when |flags|
  // contains MOJO_READ_DATA_FLAG_ALL_OR_NONE -- confirm against the impl.
  uint32_t buffer_num_elements = *num_elements;
  MojoResult rv = ConsumerBeginReadDataImplNoLock(&buffer,
                                                  &buffer_num_elements,
                                                  flags);
  if (rv != MOJO_RESULT_OK)
    return rv;

  // Never copy more than the caller asked for nor more than the hook exposed.
  uint32_t num_elements_to_read = std::min(*num_elements, buffer_num_elements);
  memcpy(elements, buffer, num_elements_to_read * element_size_);

  rv = ConsumerEndReadDataImplNoLock(num_elements_to_read);
  if (rv != MOJO_RESULT_OK)
    return rv;

  *num_elements = num_elements_to_read;
  return MOJO_RESULT_OK;
}
// Starts a two-phase read. All three arguments are forwarded unchanged to
// ConsumerBeginReadDataImplNoLock().
//
// Returns MOJO_RESULT_BUSY if a two-phase read is already in progress;
// otherwise returns the hook's result. The pipe is marked as being in a
// two-phase read only when the hook reports MOJO_RESULT_OK.
MojoResult DataPipe::ConsumerBeginReadData(const void** buffer,
                                           uint32_t* buffer_num_elements,
                                           MojoReadDataFlags flags) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());

  if (consumer_in_two_phase_read_)
    return MOJO_RESULT_BUSY;

  const MojoResult result =
      ConsumerBeginReadDataImplNoLock(buffer, buffer_num_elements, flags);
  if (result == MOJO_RESULT_OK)
    consumer_in_two_phase_read_ = true;
  return result;
}
// Finishes a two-phase read, reporting |num_elements_read| to
// ConsumerEndReadDataImplNoLock().
//
// Returns MOJO_RESULT_FAILED_PRECONDITION if no two-phase read is in
// progress; otherwise returns whatever the hook returns.
MojoResult DataPipe::ConsumerEndReadData(uint32_t num_elements_read) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());

  if (!consumer_in_two_phase_read_)
    return MOJO_RESULT_FAILED_PRECONDITION;

  const MojoResult result = ConsumerEndReadDataImplNoLock(num_elements_read);
  // The two-phase read is over regardless of whether the hook succeeded.
  consumer_in_two_phase_read_ = false;
  return result;
}
// Registers |waiter| on the consumer side for |flags|, to be woken with
// |wake_result|.
//
// Returns MOJO_RESULT_ALREADY_EXISTS if any requested flag is already
// satisfied, MOJO_RESULT_FAILED_PRECONDITION if none of the requested flags is
// satisfiable, and MOJO_RESULT_OK once the waiter has been added.
MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
                                       MojoWaitFlags flags,
                                       MojoResult wake_result) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());

  // Nothing to wait for: at least one requested condition already holds.
  if ((flags & ConsumerSatisfiedFlagsNoLock()) != 0)
    return MOJO_RESULT_ALREADY_EXISTS;

  // Waiting would be pointless: no requested condition can become true.
  if ((flags & ConsumerSatisfiableFlagsNoLock()) == 0)
    return MOJO_RESULT_FAILED_PRECONDITION;

  consumer_waiter_list_->AddWaiter(waiter, flags, wake_result);
  return MOJO_RESULT_OK;
}
// Forwards removal of |waiter| to the consumer-side waiter list. Holds
// |lock_| for the whole call; a local consumer is required (DCHECKed).
void DataPipe::ConsumerRemoveWaiter(Waiter* waiter) {
  base::AutoLock locker(lock_);
  DCHECK(has_local_consumer_no_lock());

  consumer_waiter_list_->RemoveWaiter(waiter);
}
// Constructs a data pipe. A waiter list is allocated for each end that is
// local (|has_local_producer| / |has_local_consumer|); a non-local end gets a
// NULL list. At least one end must be local (DCHECKed). Both two-phase flags
// start out false and |element_size_| starts at 0; Init() assigns the element
// size, capacity and discard mode.
DataPipe::DataPipe(bool has_local_producer, bool has_local_consumer)
    : element_size_(0),
      producer_waiter_list_(has_local_producer ? new WaiterList() : NULL),
      consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL),
      producer_in_two_phase_write_(false),
      consumer_in_two_phase_read_(false) {
  DCHECK(has_local_producer || has_local_consumer);

  // Give the other Init()-assigned members a defined value as well, so that
  // nothing holds an indeterminate value if it is read before (or without a
  // successful) Init(). These are assigned in the body rather than in the
  // initializer list so as not to depend on the member declaration order.
  may_discard_ = false;
  capacity_num_elements_ = 0;
}
// Destructor. By the time the pipe is destroyed neither a local producer nor a
// local consumer may remain (both DCHECKed).
DataPipe::~DataPipe() {
  DCHECK(!has_local_producer_no_lock());
  DCHECK(!has_local_consumer_no_lock());
}
// Validates and stores the pipe's configuration.
//
// |may_discard| is stored as given. |element_size| is the size of one element
// in bytes and must be nonzero. |capacity_num_elements| is the capacity in
// elements; 0 selects a default derived from kDefaultDataPipeCapacityBytes.
//
// Returns MOJO_RESULT_INVALID_ARGUMENT if |element_size| is 0 or if the total
// byte size would not fit in a uint32_t, MOJO_RESULT_RESOURCE_EXHAUSTED if the
// total byte size exceeds kMaxDataPipeCapacityBytes, and MOJO_RESULT_OK
// otherwise. No member is modified unless the result is MOJO_RESULT_OK.
MojoResult DataPipe::Init(bool may_discard,
                          size_t element_size,
                          size_t capacity_num_elements) {
  // No need to lock: This method is not thread-safe.

  if (element_size == 0)
    return MOJO_RESULT_INVALID_ARGUMENT;

  size_t effective_capacity = capacity_num_elements;
  if (effective_capacity == 0) {
    // Fall back to the default capacity, rounded down to a whole number of
    // elements but never below a single element.
    const size_t default_num_elements =
        kDefaultDataPipeCapacityBytes / element_size;
    effective_capacity = std::max(static_cast<size_t>(1), default_num_elements);
  }

  // Division-based check so that the multiplication below cannot exceed the
  // range of a uint32_t.
  const size_t max_representable_elements =
      std::numeric_limits<uint32_t>::max() / element_size;
  if (effective_capacity > max_representable_elements)
    return MOJO_RESULT_INVALID_ARGUMENT;

  const size_t capacity_num_bytes = effective_capacity * element_size;
  if (capacity_num_bytes > kMaxDataPipeCapacityBytes)
    return MOJO_RESULT_RESOURCE_EXHAUSTED;

  may_discard_ = may_discard;
  element_size_ = element_size;
  capacity_num_elements_ = effective_capacity;
  return MOJO_RESULT_OK;
}
// Passes the producer's current satisfied/satisfiable flags to the producer
// waiter list via AwakeWaitersForStateChange(). Does nothing when there is no
// local producer. |lock_| must already be held by the caller (asserted).
void DataPipe::AwakeProducerWaitersForStateChangeNoLock() {
  lock_.AssertAcquired();

  if (has_local_producer_no_lock()) {
    producer_waiter_list_->AwakeWaitersForStateChange(
        ProducerSatisfiedFlagsNoLock(), ProducerSatisfiableFlagsNoLock());
  }
}
// Passes the consumer's current satisfied/satisfiable flags to the consumer
// waiter list via AwakeWaitersForStateChange(). Does nothing when there is no
// local consumer. |lock_| must already be held by the caller (asserted).
void DataPipe::AwakeConsumerWaitersForStateChangeNoLock() {
  lock_.AssertAcquired();

  if (has_local_consumer_no_lock()) {
    consumer_waiter_list_->AwakeWaitersForStateChange(
        ConsumerSatisfiedFlagsNoLock(), ConsumerSatisfiableFlagsNoLock());
  }
}
} // namespace system
} // namespace mojo