// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "remoting/host/linux/audio_pipe_reader.h"

#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include "base/logging.h"
#include "base/posix/eintr_wrapper.h"
#include "base/stl_util.h"

namespace remoting {

namespace {

const int kSampleBytesPerSecond = AudioPipeReader::kSamplingRate *
                                  AudioPipeReader::kChannels *
                                  AudioPipeReader::kBytesPerSample;

// Read data from the pipe every 40ms.
const int kCapturingPeriodMs = 40;

// Size of the pipe buffer in milliseconds.
const int kPipeBufferSizeMs = kCapturingPeriodMs * 2;

// Size of the pipe buffer in bytes.
const int kPipeBufferSizeBytes = kPipeBufferSizeMs * kSampleBytesPerSecond /
    base::Time::kMillisecondsPerSecond;

#if !defined(F_SETPIPE_SZ)
// F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able
// to compile this code on machines with older kernel.
#define F_SETPIPE_SZ 1031
#endif  // defined(F_SETPIPE_SZ)

}  // namespace

// static
scoped_refptr<AudioPipeReader> AudioPipeReader::Create(
    scoped_refptr<base::SingleThreadTaskRunner> task_runner,
    const base::FilePath& pipe_path) {
  // Create a reference to the new AudioPipeReader before posting the
  // StartOnAudioThread task, otherwise it may be deleted on the audio
  // thread before we return.
  scoped_refptr<AudioPipeReader> pipe_reader =
      new AudioPipeReader(task_runner, pipe_path);
  task_runner->PostTask(
      FROM_HERE, base::Bind(&AudioPipeReader::StartOnAudioThread, pipe_reader));
  return pipe_reader;
}

AudioPipeReader::AudioPipeReader(
    scoped_refptr<base::SingleThreadTaskRunner> task_runner,
    const base::FilePath& pipe_path)
    : task_runner_(task_runner),
      pipe_path_(pipe_path),
      observers_(new ObserverListThreadSafe<StreamObserver>()) {
}

AudioPipeReader::~AudioPipeReader() {}

void AudioPipeReader::AddObserver(StreamObserver* observer) {
  observers_->AddObserver(observer);
}
void AudioPipeReader::RemoveObserver(StreamObserver* observer) {
  observers_->RemoveObserver(observer);
}

void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd) {
  DCHECK_EQ(fd, pipe_.GetPlatformFile());
  StartTimer();
}

void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd) {
  NOTREACHED();
}

void AudioPipeReader::StartOnAudioThread() {
  DCHECK(task_runner_->BelongsToCurrentThread());

  if (!file_watcher_.Watch(pipe_path_.DirName(), true,
                           base::Bind(&AudioPipeReader::OnDirectoryChanged,
                                      base::Unretained(this)))) {
    LOG(ERROR) << "Failed to watch pulseaudio directory "
               << pipe_path_.DirName().value();
  }

  TryOpenPipe();
}

void AudioPipeReader::OnDirectoryChanged(const base::FilePath& path,
                                         bool error) {
  DCHECK(task_runner_->BelongsToCurrentThread());

  if (error) {
    LOG(ERROR) << "File watcher returned an error.";
    return;
  }

  TryOpenPipe();
}

void AudioPipeReader::TryOpenPipe() {
  DCHECK(task_runner_->BelongsToCurrentThread());

  base::File new_pipe;
  new_pipe.Initialize(
      pipe_path_,
      base::File::FLAG_OPEN | base::File::FLAG_READ | base::File::FLAG_ASYNC);

  // If both |pipe_| and |new_pipe| are valid then compare inodes for the two
  // file descriptors. Don't need to do anything if inode hasn't changed.
  if (new_pipe.IsValid() && pipe_.IsValid()) {
    struct stat old_stat;
    struct stat new_stat;
    if (fstat(pipe_.GetPlatformFile(), &old_stat) == 0 &&
        fstat(new_pipe.GetPlatformFile(), &new_stat) == 0 &&
        old_stat.st_ino == new_stat.st_ino) {
      return;
    }
  }

  file_descriptor_watcher_.StopWatchingFileDescriptor();
  timer_.Stop();

  pipe_ = new_pipe.Pass();

  if (pipe_.IsValid()) {
    // Set O_NONBLOCK flag.
    if (HANDLE_EINTR(fcntl(pipe_.GetPlatformFile(), F_SETFL, O_NONBLOCK)) < 0) {
      PLOG(ERROR) << "fcntl";
      pipe_.Close();
      return;
    }

    // Set buffer size for the pipe.
    if (HANDLE_EINTR(fcntl(
            pipe_.GetPlatformFile(), F_SETPIPE_SZ, kPipeBufferSizeBytes)) < 0) {
      PLOG(ERROR) << "fcntl";
    }

    WaitForPipeReadable();
  }
}

void AudioPipeReader::StartTimer() {
  DCHECK(task_runner_->BelongsToCurrentThread());
  started_time_ = base::TimeTicks::Now();
  last_capture_position_ = 0;
  timer_.Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs),
               this, &AudioPipeReader::DoCapture);
}

void AudioPipeReader::DoCapture() {
  DCHECK(task_runner_->BelongsToCurrentThread());
  DCHECK(pipe_.IsValid());

  // Calculate how much we need read from the pipe. Pulseaudio doesn't control
  // how much data it writes to the pipe, so we need to pace the stream.
  base::TimeDelta stream_position = base::TimeTicks::Now() - started_time_;
  int64 stream_position_bytes = stream_position.InMilliseconds() *
      kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond;
  int64 bytes_to_read = stream_position_bytes - last_capture_position_;

  std::string data = left_over_bytes_;
  size_t pos = data.size();
  left_over_bytes_.clear();
  data.resize(pos + bytes_to_read);

  while (pos < data.size()) {
    int read_result =
        pipe_.ReadAtCurrentPos(string_as_array(&data) + pos, data.size() - pos);
    if (read_result > 0) {
      pos += read_result;
    } else {
      if (read_result < 0 && errno != EWOULDBLOCK && errno != EAGAIN)
        PLOG(ERROR) << "read";
      break;
    }
  }

  // Stop reading from the pipe if PulseAudio isn't writing anything.
  if (pos == 0) {
    WaitForPipeReadable();
    return;
  }

  // Save any incomplete samples we've read for later. Each packet should
  // contain integer number of samples.
  int incomplete_samples_bytes = pos % (kChannels * kBytesPerSample);
  left_over_bytes_.assign(data, pos - incomplete_samples_bytes,
                          incomplete_samples_bytes);
  data.resize(pos - incomplete_samples_bytes);

  last_capture_position_ += data.size();
  // Normally PulseAudio will keep pipe buffer full, so we should always be able
  // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make
  // sure that |stream_position_bytes| doesn't go out of sync with the current
  // stream position.
  if (stream_position_bytes - last_capture_position_ > kPipeBufferSizeBytes)
    last_capture_position_ = stream_position_bytes - kPipeBufferSizeBytes;
  DCHECK_LE(last_capture_position_, stream_position_bytes);

  // Dispatch asynchronous notification to the stream observers.
  scoped_refptr<base::RefCountedString> data_ref =
      base::RefCountedString::TakeString(&data);
  observers_->Notify(&StreamObserver::OnDataRead, data_ref);
}

void AudioPipeReader::WaitForPipeReadable() {
  timer_.Stop();
  base::MessageLoopForIO::current()->WatchFileDescriptor(
      pipe_.GetPlatformFile(), false, base::MessageLoopForIO::WATCH_READ,
      &file_descriptor_watcher_, this);
}

// static
void AudioPipeReaderTraits::Destruct(const AudioPipeReader* audio_pipe_reader) {
  audio_pipe_reader->task_runner_->DeleteSoon(FROM_HERE, audio_pipe_reader);
}

}  // namespace remoting