// C++ source  |  315 lines  |  10.16 KB

/*
 * Copyright (C) 2016 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include <stdint.h>
#include <unistd.h>
#include <time.h>

#include "common/libs/time/monotonic_time.h"

/**
 * This abstract class simulates a buffer that either fills or empties at
 * a specified rate.
 *
 * The simulated buffer automatically fills or empties at a specific rate.
 *
 * An item is the thing contained in the simulated buffer. Items are moved
 * in and out of the buffer without subdivision.
 *
 * An integral number of items must arrive / depart in each second.
 * This number is stored in items_per_second_
 *
 * items_per_second * 2000000000 must fit within an int64_t. This
 * works if items_per_second is represented by an int32.
 *
 * The base class does have the concept of capacity, but doesn't use it.
 * It is included here to simplify unit testing.
 *
 * For actual use, see SimulatedInputBuffer and SimulatedOutputBuffer below.
 */
class SimulatedBufferBase {
 public:
  static inline int64_t divide_and_round_up(int64_t q, int64_t d) {
    return q / d + ((q % d) != 0);
  }

  SimulatedBufferBase(
      int32_t items_per_second,
      int64_t simulated_item_capacity,
      cvd::time::MonotonicTimePointFactory* clock =
        cvd::time::MonotonicTimePointFactory::GetInstance()) :
    clock_(clock),
    current_item_num_(0),
    base_item_num_(0),
    simulated_item_capacity_(simulated_item_capacity),
    items_per_second_(items_per_second),
    initialize_(true),
    paused_(false) { }

  virtual ~SimulatedBufferBase() { }

  int64_t GetCurrentItemNum() {
    Update();
    return current_item_num_;
  }

  const cvd::time::MonotonicTimePoint GetLastUpdatedTime() const {
    return current_time_;
  }

  // Sleep for the given amount of time. Subclasses may override this to use
  // different sleep calls.
  // Sleep is best-effort. The code assumes that the acutal sleep time may be
  // greater or less than the time requested.
  virtual void SleepUntilTime(const cvd::time::MonotonicTimePoint& in) {
    struct timespec ts;
    in.ToTimespec(&ts);
    clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &ts, NULL);
  }

  // The time counter may not start at 0. Concrete classes should call this
  // to allow the buffer simulation to read the current time number and
  // initialize its internal state.
  virtual void Init() {
    if (initialize_) {
      clock_->FetchCurrentTime(&base_time_);
      current_time_ = base_time_;
      initialize_ = false;
    }
  }

  virtual void Update() {
    if (initialize_) {
      Init();
    }
    cvd::time::MonotonicTimePoint now;
    clock_->FetchCurrentTime(&now);
    // We can't call FetchCurrentTime() in the constuctor because a subclass may
    // want to override it, so we initialze the times to 0. If we detect this
    // case go ahead and initialize to a current timestamp.
    if (paused_) {
      base_time_ += now - current_time_;
      current_time_ = now;
      return;
    }
    // Avoid potential overflow by limiting the scaling to one time second.
    // There is no round-off error here because the bases are adjusted for full
    // seconds.
    // There is no issue with int64 overflow because 2's compliment subtraction
    // is immune to overflow.
    // However, this does assume that kNanosecondsPerSecond * items_per_second_
    // fits in an int64.
    cvd::time::Seconds seconds(now - base_time_);
    base_time_ += seconds;
    base_item_num_ += seconds.count() * items_per_second_;
    current_time_ = now;
    current_item_num_ =
        cvd::time::Nanoseconds(now - base_time_).count() *
        items_per_second_ / cvd::time::kNanosecondsPerSecond +
        base_item_num_;
  }

  // If set to true new items will not be created.
  bool SetPaused(bool new_state) {
    bool rval = paused_;
    Update();
    paused_ = new_state;
    return rval;
  }

  // Calculate the TimePoint that corresponds to an item.
  // Caution: This may not return a correct time for items in the past.
  cvd::time::MonotonicTimePoint CalculateItemTime(int64_t item) {
    int64_t seconds = (item - base_item_num_) / items_per_second_;
    int64_t new_base_item_num = base_item_num_ + seconds * items_per_second_;
    return base_time_ + cvd::time::Seconds(seconds) +
      cvd::time::Nanoseconds(divide_and_round_up(
          (item - new_base_item_num) *
          cvd::time::kNanosecondsPerSecond,
          items_per_second_));
  }

  // Sleep until the given item number is generated. If the generator is
  // paused unpause it to make the sleep finite.
  void SleepUntilItem(int64_t item) {
    if (paused_) {
      SetPaused(false);
    }
    cvd::time::MonotonicTimePoint desired_time =
        CalculateItemTime(item);
    while (1) {
      Update();
      if (current_item_num_ - item >= 0) {
        return;
      }
      SleepUntilTime(desired_time);
    }
  }

 protected:
  // Source of the timepoints.
  cvd::time::MonotonicTimePointFactory* clock_;
  // Time when the other values in the structure were updated.
  cvd::time::MonotonicTimePoint current_time_;
  // Most recent time when there was no round-off error between the clock and
  // items.
  cvd::time::MonotonicTimePoint base_time_;
  // Number of the current item.
  int64_t current_item_num_;
  // Most recent item number where there was no round-off error between the
  // clock and items.
  int64_t base_item_num_;
  // Simulated_Item_Capacity of the buffer in items.
  int64_t simulated_item_capacity_;
  // Number of items that are created in 1s. A typical number would be 48000.
  int32_t items_per_second_;
  bool initialize_;
  // If true then don't generate new items.
  bool paused_;
};

/**
 * Simulated output buffer: callers append items and the simulation consumes
 * them at a constant rate.
 *
 * output_buffer_item_num_ is the item number one past the last item queued;
 * current_item_num_ (from the base class) is how far the drain has advanced.
 * Their difference is the number of items still waiting in the buffer.
 */
class SimulatedOutputBuffer : public SimulatedBufferBase {
 public:
  SimulatedOutputBuffer(
      int64_t item_rate,
      int64_t simulated_item_capacity,
      cvd::time::MonotonicTimePointFactory* clock =
        cvd::time::MonotonicTimePointFactory::GetInstance())
      : SimulatedBufferBase(item_rate, simulated_item_capacity, clock),
        output_buffer_item_num_(current_item_num_) { }

  // Advances the base simulation, then clamps the queue so that its fill
  // level never goes negative.
  void Update() override {
    SimulatedBufferBase::Update();
    const int64_t pending = output_buffer_item_num_ - current_item_num_;
    if (pending < 0) {
      // The drain overtook the queued items at some point in the past; an
      // underrun leaves the buffer empty rather than negatively filled.
      output_buffer_item_num_ = current_item_num_;
    }
  }

  // Queues up to num_new_items items and returns how many were accepted.
  // When the items do not all fit:
  //   - block == true:  sleeps until enough has drained, then accepts all.
  //   - block == false: accepts only what fills the buffer to capacity.
  int64_t AddToOutputBuffer(int64_t num_new_items, bool block) {
    Update();
    const int64_t fill_after_add =
        output_buffer_item_num_ + num_new_items - current_item_num_;
    const bool fits = fill_after_add <= simulated_item_capacity_;

    if (fits || block) {
      const int64_t target_item_num = output_buffer_item_num_ + num_new_items;
      if (!fits) {
        // Wait for the drain to reach the point where the whole request fits.
        SleepUntilItem(target_item_num - simulated_item_capacity_);
      }
      output_buffer_item_num_ = target_item_num;
      return num_new_items;
    }

    // Non-blocking and too large: top the buffer up to exactly full.
    const int64_t full_item_num = current_item_num_ + simulated_item_capacity_;
    const int64_t accepted = full_item_num - output_buffer_item_num_;
    output_buffer_item_num_ = full_item_num;
    return accepted;
  }

  // Item number that the next queued item will receive.
  int64_t GetNextOutputBufferItemNum() {
    Update();
    return output_buffer_item_num_;
  }

  // Time point corresponding to the next queued item's number.
  cvd::time::MonotonicTimePoint GetNextOutputBufferItemTime() {
    Update();
    return CalculateItemTime(output_buffer_item_num_);
  }

  // Number of items queued but not yet drained.
  int64_t GetOutputBufferSize() {
    Update();
    return output_buffer_item_num_ - current_item_num_;
  }

  // Sleeps until every queued item has been drained.
  void Drain() {
    SleepUntilItem(output_buffer_item_num_);
  }

 protected:
  // One past the number of the last item placed in the buffer.
  int64_t output_buffer_item_num_;
};

/**
 * Simulated input buffer: the simulation produces items at a constant rate
 * and callers remove them.
 *
 * input_buffer_item_num_ is the number of the next item the caller will
 * read; current_item_num_ (from the base class) is how far production has
 * advanced. Items produced beyond the capacity before being read are counted
 * as lost.
 */
class SimulatedInputBuffer : public SimulatedBufferBase {
 public:
  SimulatedInputBuffer(
      int64_t item_rate,
      int64_t simulated_item_capacity,
      cvd::time::MonotonicTimePointFactory* clock =
        cvd::time::MonotonicTimePointFactory::GetInstance())
      : SimulatedBufferBase(item_rate, simulated_item_capacity, clock),
        input_buffer_item_num_(current_item_num_),
        lost_input_items_(0) { }

  // Advances the base simulation, then discards (and counts) any items that
  // no longer fit in the buffer.
  void Update() override {
    SimulatedBufferBase::Update();
    const int64_t unread = current_item_num_ - input_buffer_item_num_;
    if (unread <= simulated_item_capacity_) {
      return;
    }
    // The buffer overflowed at some point in the past. Move the read position
    // up to the oldest item still held and record how many were skipped.
    const int64_t oldest_kept_item_num =
        current_item_num_ - simulated_item_capacity_;
    lost_input_items_ += oldest_kept_item_num - input_buffer_item_num_;
    input_buffer_item_num_ = oldest_kept_item_num;
  }

  // Removes items from the buffer and returns how many were removed.
  //   - block == true:  sleeps until num_items_wanted items exist, then
  //                     returns num_items_wanted.
  //   - block == false: returns immediately with at most what is available.
  int64_t RemoveFromInputBuffer(int64_t num_items_wanted, bool block) {
    Update();
    if (block) {
      // Claim the items up front and sleep until production catches up.
      // Moving input_buffer_item_num_ ahead of current_item_num_ makes the
      // difference tested in Update() negative, which effectively disables
      // the overflow detection there while we wait.
      input_buffer_item_num_ += num_items_wanted;
      while (input_buffer_item_num_ - current_item_num_ > 0) {
        SleepUntilItem(input_buffer_item_num_);
      }
      return num_items_wanted;
    }

    const int64_t available = current_item_num_ - input_buffer_item_num_;
    const int64_t granted =
        (available < num_items_wanted) ? available : num_items_wanted;
    input_buffer_item_num_ += granted;
    return granted;
  }

  // Returns the number of items lost to overflow since the previous call and
  // resets the counter to zero.
  int64_t GetLostInputItems() {
    Update();
    const int64_t lost = lost_input_items_;
    lost_input_items_ = 0;
    return lost;
  }

 protected:
  // Number of the next item to be handed to the caller.
  int64_t input_buffer_item_num_;
  // Items dropped due to overflow since the last GetLostInputItems() call.
  int64_t lost_input_items_;
};