/*
* Copyright (C) 2016 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <stdint.h>
#include <unistd.h>
#include <time.h>
#include "common/libs/time/monotonic_time.h"
/**
* This abstract class simulates a buffer that either fills or empties at
* a specified rate.
*
* The simulated buffer automatically fills or empties at a specific rate.
*
* An item is the thing contained in the simulated buffer. Items are moved
* in and out of the buffer without subdivision.
*
* An integral number of items must arrive / depart in each second.
* This number is stored in items_per_second_
*
* items_per_second * 2000000000 must fit within an int64_t. This
* works if items_per_second is represented by an int32.
*
* The base class does have the concept of capacity, but doesn't use it.
* It is included here to simplify unit testing.
*
* For actual use, see SimulatedInputBuffer and SimulatedOutputBuffer below.
*/
class SimulatedBufferBase {
public:
static inline int64_t divide_and_round_up(int64_t q, int64_t d) {
return q / d + ((q % d) != 0);
}
SimulatedBufferBase(
int32_t items_per_second,
int64_t simulated_item_capacity,
cvd::time::MonotonicTimePointFactory* clock =
cvd::time::MonotonicTimePointFactory::GetInstance()) :
clock_(clock),
current_item_num_(0),
base_item_num_(0),
simulated_item_capacity_(simulated_item_capacity),
items_per_second_(items_per_second),
initialize_(true),
paused_(false) { }
virtual ~SimulatedBufferBase() { }
int64_t GetCurrentItemNum() {
Update();
return current_item_num_;
}
const cvd::time::MonotonicTimePoint GetLastUpdatedTime() const {
return current_time_;
}
// Sleep for the given amount of time. Subclasses may override this to use
// different sleep calls.
// Sleep is best-effort. The code assumes that the acutal sleep time may be
// greater or less than the time requested.
virtual void SleepUntilTime(const cvd::time::MonotonicTimePoint& in) {
struct timespec ts;
in.ToTimespec(&ts);
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &ts, NULL);
}
// The time counter may not start at 0. Concrete classes should call this
// to allow the buffer simulation to read the current time number and
// initialize its internal state.
virtual void Init() {
if (initialize_) {
clock_->FetchCurrentTime(&base_time_);
current_time_ = base_time_;
initialize_ = false;
}
}
virtual void Update() {
if (initialize_) {
Init();
}
cvd::time::MonotonicTimePoint now;
clock_->FetchCurrentTime(&now);
// We can't call FetchCurrentTime() in the constuctor because a subclass may
// want to override it, so we initialze the times to 0. If we detect this
// case go ahead and initialize to a current timestamp.
if (paused_) {
base_time_ += now - current_time_;
current_time_ = now;
return;
}
// Avoid potential overflow by limiting the scaling to one time second.
// There is no round-off error here because the bases are adjusted for full
// seconds.
// There is no issue with int64 overflow because 2's compliment subtraction
// is immune to overflow.
// However, this does assume that kNanosecondsPerSecond * items_per_second_
// fits in an int64.
cvd::time::Seconds seconds(now - base_time_);
base_time_ += seconds;
base_item_num_ += seconds.count() * items_per_second_;
current_time_ = now;
current_item_num_ =
cvd::time::Nanoseconds(now - base_time_).count() *
items_per_second_ / cvd::time::kNanosecondsPerSecond +
base_item_num_;
}
// If set to true new items will not be created.
bool SetPaused(bool new_state) {
bool rval = paused_;
Update();
paused_ = new_state;
return rval;
}
// Calculate the TimePoint that corresponds to an item.
// Caution: This may not return a correct time for items in the past.
cvd::time::MonotonicTimePoint CalculateItemTime(int64_t item) {
int64_t seconds = (item - base_item_num_) / items_per_second_;
int64_t new_base_item_num = base_item_num_ + seconds * items_per_second_;
return base_time_ + cvd::time::Seconds(seconds) +
cvd::time::Nanoseconds(divide_and_round_up(
(item - new_base_item_num) *
cvd::time::kNanosecondsPerSecond,
items_per_second_));
}
// Sleep until the given item number is generated. If the generator is
// paused unpause it to make the sleep finite.
void SleepUntilItem(int64_t item) {
if (paused_) {
SetPaused(false);
}
cvd::time::MonotonicTimePoint desired_time =
CalculateItemTime(item);
while (1) {
Update();
if (current_item_num_ - item >= 0) {
return;
}
SleepUntilTime(desired_time);
}
}
protected:
// Source of the timepoints.
cvd::time::MonotonicTimePointFactory* clock_;
// Time when the other values in the structure were updated.
cvd::time::MonotonicTimePoint current_time_;
// Most recent time when there was no round-off error between the clock and
// items.
cvd::time::MonotonicTimePoint base_time_;
// Number of the current item.
int64_t current_item_num_;
// Most recent item number where there was no round-off error between the
// clock and items.
int64_t base_item_num_;
// Simulated_Item_Capacity of the buffer in items.
int64_t simulated_item_capacity_;
// Number of items that are created in 1s. A typical number would be 48000.
int32_t items_per_second_;
bool initialize_;
// If true then don't generate new items.
bool paused_;
};
/**
* This is a simulation of an output buffer that drains at a constant rate.
*/
class SimulatedOutputBuffer : public SimulatedBufferBase {
public:
SimulatedOutputBuffer(
int64_t item_rate,
int64_t simulated_item_capacity,
cvd::time::MonotonicTimePointFactory* clock =
cvd::time::MonotonicTimePointFactory::GetInstance()) :
SimulatedBufferBase(item_rate, simulated_item_capacity, clock) {
output_buffer_item_num_ = current_item_num_;
}
void Update() override {
SimulatedBufferBase::Update();
if ((output_buffer_item_num_ - current_item_num_) < 0) {
// We ran out of items at some point in the past. However, the
// output capactiy can't be negative.
output_buffer_item_num_ = current_item_num_;
}
}
int64_t AddToOutputBuffer(int64_t num_new_items, bool block) {
Update();
// The easy case: num_new_items fit in the bucket.
if ((output_buffer_item_num_ + num_new_items - current_item_num_) <=
simulated_item_capacity_) {
output_buffer_item_num_ += num_new_items;
return num_new_items;
}
// If we're non-blocking accept enough items to fill the output.
if (!block) {
int64_t used = current_item_num_ + simulated_item_capacity_ -
output_buffer_item_num_;
output_buffer_item_num_ = current_item_num_ + simulated_item_capacity_;
return used;
}
int64_t new_output_buffer_item_num = output_buffer_item_num_ + num_new_items;
SleepUntilItem(new_output_buffer_item_num - simulated_item_capacity_);
output_buffer_item_num_ = new_output_buffer_item_num;
return num_new_items;
}
int64_t GetNextOutputBufferItemNum() {
Update();
return output_buffer_item_num_;
}
cvd::time::MonotonicTimePoint GetNextOutputBufferItemTime() {
Update();
return CalculateItemTime(output_buffer_item_num_);
}
int64_t GetOutputBufferSize() {
Update();
return output_buffer_item_num_ - current_item_num_;
}
void Drain() {
SleepUntilItem(output_buffer_item_num_);
}
protected:
int64_t output_buffer_item_num_;
};
/**
* Simulates an input buffer that fills at a constant rate.
*/
class SimulatedInputBuffer : public SimulatedBufferBase {
public:
SimulatedInputBuffer(
int64_t item_rate,
int64_t simulated_item_capacity,
cvd::time::MonotonicTimePointFactory* clock =
cvd::time::MonotonicTimePointFactory::GetInstance()) :
SimulatedBufferBase(item_rate, simulated_item_capacity, clock) {
input_buffer_item_num_ = current_item_num_;
lost_input_items_ = 0;
}
void Update() override {
SimulatedBufferBase::Update();
if ((current_item_num_ - input_buffer_item_num_) >
simulated_item_capacity_) {
// The buffer overflowed at some point in the past. Account for the lost
// times.
int64_t new_input_buffer_item_num =
current_item_num_ - simulated_item_capacity_;
lost_input_items_ +=
new_input_buffer_item_num - input_buffer_item_num_;
input_buffer_item_num_ = new_input_buffer_item_num;
}
}
int64_t RemoveFromInputBuffer(int64_t num_items_wanted, bool block) {
Update();
if (!block) {
int64_t num_items_available = current_item_num_ - input_buffer_item_num_;
if (num_items_available < num_items_wanted) {
input_buffer_item_num_ += num_items_available;
return num_items_available;
} else {
input_buffer_item_num_ += num_items_wanted;
return num_items_wanted;
}
}
// Calculate the item number that is being claimed. Sleep until it appears.
// Advancing input_buffer_item_num_ causes a negative value to be compared
// to the capacity, effectively disabling the overflow detection code
// in Update().
input_buffer_item_num_ += num_items_wanted;
while (input_buffer_item_num_ - current_item_num_ > 0) {
SleepUntilItem(input_buffer_item_num_);
}
return num_items_wanted;
}
int64_t GetLostInputItems() {
Update();
int64_t rval = lost_input_items_;
lost_input_items_ = 0;
return rval;
}
protected:
int64_t input_buffer_item_num_;
int64_t lost_input_items_;
};