普通文本  |  878行  |  32.26 KB

// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chrome/browser/sync/engine/syncer_thread.h"

#include <algorithm>

#include "base/rand_util.h"
#include "chrome/browser/sync/engine/syncer.h"

using base::TimeDelta;
using base::TimeTicks;

namespace browser_sync {

using sessions::SyncSession;
using sessions::SyncSessionSnapshot;
using sessions::SyncSourceInfo;
using syncable::ModelTypePayloadMap;
using syncable::ModelTypeBitSet;
using sync_pb::GetUpdatesCallerInfo;

SyncerThread::DelayProvider::DelayProvider() {}
SyncerThread::DelayProvider::~DelayProvider() {}

SyncerThread::WaitInterval::WaitInterval() {}
SyncerThread::WaitInterval::~WaitInterval() {}

SyncerThread::SyncSessionJob::SyncSessionJob() {}
SyncerThread::SyncSessionJob::~SyncSessionJob() {}

SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
    base::TimeTicks start,
    linked_ptr<sessions::SyncSession> session, bool is_canary_job,
    const tracked_objects::Location& nudge_location) : purpose(purpose),
        scheduled_start(start),
        session(session),
        is_canary_job(is_canary_job),
        nudge_location(nudge_location) {
}

TimeDelta SyncerThread::DelayProvider::GetDelay(
    const base::TimeDelta& last_delay) {
  return SyncerThread::GetRecommendedDelay(last_delay);
}

GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
    NudgeSource source) {
  switch (source) {
    case NUDGE_SOURCE_NOTIFICATION:
      return GetUpdatesCallerInfo::NOTIFICATION;
    case NUDGE_SOURCE_LOCAL:
      return GetUpdatesCallerInfo::LOCAL;
    case NUDGE_SOURCE_CONTINUATION:
      return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
    case NUDGE_SOURCE_UNKNOWN:
      return GetUpdatesCallerInfo::UNKNOWN;
    default:
      NOTREACHED();
      return GetUpdatesCallerInfo::UNKNOWN;
  }
}

SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
    : mode(mode), had_nudge(false), length(length) { }

SyncerThread::SyncerThread(sessions::SyncSessionContext* context,
                           Syncer* syncer)
    : thread_("SyncEngine_SyncerThread"),
      syncer_short_poll_interval_seconds_(
          TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
      syncer_long_poll_interval_seconds_(
          TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
      mode_(NORMAL_MODE),
      server_connection_ok_(false),
      delay_provider_(new DelayProvider()),
      syncer_(syncer),
      session_context_(context) {
}

SyncerThread::~SyncerThread() {
  DCHECK(!thread_.IsRunning());
}

void SyncerThread::CheckServerConnectionManagerStatus(
    HttpResponse::ServerConnectionCode code) {

  VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
          << "Old mode: " << server_connection_ok_ << " Code: " << code;
  // Note, be careful when adding cases here because if the SyncerThread
  // thinks there is no valid connection as determined by this method, it
  // will drop out of *all* forward progress sync loops (it won't poll and it
  // will queue up Talk notifications but not actually call SyncShare) until
  // some external action causes a ServerConnectionManager to broadcast that
  // a valid connection has been re-established.
  if (HttpResponse::CONNECTION_UNAVAILABLE == code ||
      HttpResponse::SYNC_AUTH_ERROR == code) {
    server_connection_ok_ = false;
    VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
            << " new mode:" << server_connection_ok_;
  } else if (HttpResponse::SERVER_CONNECTION_OK == code) {
    server_connection_ok_ = true;
    VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
            << " new mode:" << server_connection_ok_;
    DoCanaryJob();
  }
}

void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) {
  VLOG(1) << "SyncerThread(" << this << ")" << "  Start called from thread "
          << MessageLoop::current()->thread_name();
  if (!thread_.IsRunning()) {
    VLOG(1) << "SyncerThread(" << this << ")" << " Starting thread with mode "
            << mode;
    if (!thread_.Start()) {
      NOTREACHED() << "Unable to start SyncerThread.";
      return;
    }
    WatchConnectionManager();
    thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
        this, &SyncerThread::SendInitialSnapshot));
  }

  VLOG(1) << "SyncerThread(" << this << ")" << "  Entering start with mode = "
          << mode;

  thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
      this, &SyncerThread::StartImpl, mode, callback));
}

void SyncerThread::SendInitialSnapshot() {
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
  scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this,
      SyncSourceInfo(), ModelSafeRoutingInfo(),
      std::vector<ModelSafeWorker*>()));
  SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
  sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot());
  event.snapshot = &snapshot;
  session_context_->NotifyListeners(event);
}

void SyncerThread::WatchConnectionManager() {
  ServerConnectionManager* scm = session_context_->connection_manager();
  CheckServerConnectionManagerStatus(scm->server_status());
  scm->AddListener(this);
}

void SyncerThread::StartImpl(Mode mode, ModeChangeCallback* callback) {
  VLOG(1) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode "
          << mode;

  // TODO(lipalani): This will leak if startimpl is never run. Fix it using a
  // ThreadSafeRefcounted object.
  scoped_ptr<ModeChangeCallback> scoped_callback(callback);
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
  DCHECK(!session_context_->account_name().empty());
  DCHECK(syncer_.get());
  mode_ = mode;
  AdjustPolling(NULL);  // Will kick start poll timer if needed.
  if (scoped_callback.get())
    scoped_callback->Run();

  // We just changed our mode. See if there are any pending jobs that we could
  // execute in the new mode.
  DoPendingJobIfPossible(false);
}

SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval(
    const SyncSessionJob& job) {

  DCHECK(wait_interval_.get());
  DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA);

  VLOG(1) << "SyncerThread(" << this << ")" << " Wait interval mode : "
          << wait_interval_->mode << "Wait interval had nudge : "
          << wait_interval_->had_nudge << "is canary job : "
          << job.is_canary_job;

  if (job.purpose == SyncSessionJob::POLL)
    return DROP;

  DCHECK(job.purpose == SyncSessionJob::NUDGE ||
      job.purpose == SyncSessionJob::CONFIGURATION);
  if (wait_interval_->mode == WaitInterval::THROTTLED)
    return SAVE;

  DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
  if (job.purpose == SyncSessionJob::NUDGE) {
    if (mode_ == CONFIGURATION_MODE)
      return SAVE;

    // If we already had one nudge then just drop this nudge. We will retry
    // later when the timer runs out.
    return wait_interval_->had_nudge ? DROP : CONTINUE;
  }
  // This is a config job.
  return job.is_canary_job ? CONTINUE : SAVE;
}

SyncerThread::JobProcessDecision SyncerThread::DecideOnJob(
    const SyncSessionJob& job) {
  if (job.purpose == SyncSessionJob::CLEAR_USER_DATA)
    return CONTINUE;

  if (wait_interval_.get())
    return DecideWhileInWaitInterval(job);

  if (mode_ == CONFIGURATION_MODE) {
    if (job.purpose == SyncSessionJob::NUDGE)
      return SAVE;
    else if (job.purpose == SyncSessionJob::CONFIGURATION)
      return CONTINUE;
    else
      return DROP;
  }

  // We are in normal mode.
  DCHECK_EQ(mode_, NORMAL_MODE);
  DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION);

  // Freshness condition
  if (job.scheduled_start < last_sync_session_end_time_) {
    VLOG(1) << "SyncerThread(" << this << ")"
            << " Dropping job because of freshness";
    return DROP;
  }

  if (server_connection_ok_)
    return CONTINUE;

  VLOG(1) << "SyncerThread(" << this << ")"
          << " Bad server connection. Using that to decide on job.";
  return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP;
}

void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) {
  DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
  if (pending_nudge_.get() == NULL) {
    VLOG(1) << "SyncerThread(" << this << ")"
            << " Creating a pending nudge job";
    SyncSession* s = job.session.get();
    scoped_ptr<SyncSession> session(new SyncSession(s->context(),
        s->delegate(), s->source(), s->routing_info(), s->workers()));

    SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
        make_linked_ptr(session.release()), false, job.nudge_location);
    pending_nudge_.reset(new SyncSessionJob(new_job));

    return;
  }

  VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge";
  pending_nudge_->session->Coalesce(*(job.session.get()));
  pending_nudge_->scheduled_start = job.scheduled_start;

  // Unfortunately the nudge location cannot be modified. So it stores the
  // location of the first caller.
}

bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) {
  JobProcessDecision decision = DecideOnJob(job);
  VLOG(1) << "SyncerThread(" << this << ")" << " Should run job, decision: "
          << decision << " Job purpose " << job.purpose << "mode " << mode_;
  if (decision != SAVE)
    return decision == CONTINUE;

  DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose ==
      SyncSessionJob::CONFIGURATION);

  SaveJob(job);
  return false;
}

void SyncerThread::SaveJob(const SyncSessionJob& job) {
  DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA);
  if (job.purpose == SyncSessionJob::NUDGE) {
    VLOG(1) << "SyncerThread(" << this << ")" << " Saving a nudge job";
    InitOrCoalescePendingJob(job);
  } else if (job.purpose == SyncSessionJob::CONFIGURATION){
    VLOG(1) << "SyncerThread(" << this << ")" << " Saving a configuration job";
    DCHECK(wait_interval_.get());
    DCHECK(mode_ == CONFIGURATION_MODE);

    SyncSession* old = job.session.get();
    SyncSession* s(new SyncSession(session_context_.get(), this,
        old->source(), old->routing_info(), old->workers()));
    SyncSessionJob new_job(job.purpose, TimeTicks::Now(),
                          make_linked_ptr(s), false, job.nudge_location);
    wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
  } // drop the rest.
}

// Functor for std::find_if to search by ModelSafeGroup.
struct ModelSafeWorkerGroupIs {
  explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
  bool operator()(ModelSafeWorker* w) {
    return group == w->GetModelSafeGroup();
  }
  ModelSafeGroup group;
};

void SyncerThread::ScheduleClearUserData() {
  if (!thread_.IsRunning()) {
    NOTREACHED();
    return;
  }
  thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
      this, &SyncerThread::ScheduleClearUserDataImpl));
}

void SyncerThread::ScheduleNudge(const TimeDelta& delay,
    NudgeSource source, const ModelTypeBitSet& types,
    const tracked_objects::Location& nudge_location) {
  if (!thread_.IsRunning()) {
    NOTREACHED();
    return;
  }

  VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled";

  ModelTypePayloadMap types_with_payloads =
      syncable::ModelTypePayloadMapFromBitSet(types, std::string());
  thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
      this, &SyncerThread::ScheduleNudgeImpl, delay,
      GetUpdatesFromNudgeSource(source), types_with_payloads, false,
      nudge_location));
}

void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay,
    NudgeSource source, const ModelTypePayloadMap& types_with_payloads,
    const tracked_objects::Location& nudge_location) {
  if (!thread_.IsRunning()) {
    NOTREACHED();
    return;
  }

  VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads";

  thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
      this, &SyncerThread::ScheduleNudgeImpl, delay,
      GetUpdatesFromNudgeSource(source), types_with_payloads, false,
      nudge_location));
}

void SyncerThread::ScheduleClearUserDataImpl() {
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
  SyncSession* session = new SyncSession(session_context_.get(), this,
      SyncSourceInfo(), ModelSafeRoutingInfo(),
      std::vector<ModelSafeWorker*>());
  ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
      SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE);
}

void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay,
    GetUpdatesCallerInfo::GetUpdatesSource source,
    const ModelTypePayloadMap& types_with_payloads,
    bool is_canary_job, const tracked_objects::Location& nudge_location) {
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());

  VLOG(1) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl";
  // Note we currently nudge for all types regardless of the ones incurring
  // the nudge.  Doing different would throw off some syncer commands like
  // CleanupDisabledTypes.  We may want to change this in the future.
  SyncSourceInfo info(source, types_with_payloads);

  SyncSession* session(CreateSyncSession(info));
  SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
                     make_linked_ptr(session), is_canary_job,
                     nudge_location);

  session = NULL;
  if (!ShouldRunJob(job))
    return;

  if (pending_nudge_.get()) {
    if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
      VLOG(1) << "SyncerThread(" << this << ")" << " Dropping the nudge because"
              << "we are in backoff";
      return;
    }

    VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing pending nudge";
    pending_nudge_->session->Coalesce(*(job.session.get()));

    if (!IsBackingOff()) {
      VLOG(1) << "SyncerThread(" << this << ")" << " Dropping a nudge because"
              << " we are not in backoff and the job was coalesced";
      return;
    } else {
      VLOG(1) << "SyncerThread(" << this << ")"
              << " Rescheduling pending nudge";
      SyncSession* s = pending_nudge_->session.get();
      job.session.reset(new SyncSession(s->context(), s->delegate(),
          s->source(), s->routing_info(), s->workers()));
      pending_nudge_.reset();
    }
  }

  // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob.
  ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(),
      nudge_location);
}

// Helper to extract the routing info and workers corresponding to types in
// |types| from |registrar|.
void GetModelSafeParamsForTypes(const ModelTypeBitSet& types,
    ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes,
    std::vector<ModelSafeWorker*>* workers) {
  ModelSafeRoutingInfo r_tmp;
  std::vector<ModelSafeWorker*> w_tmp;
  registrar->GetModelSafeRoutingInfo(&r_tmp);
  registrar->GetWorkers(&w_tmp);

  bool passive_group_added = false;

  typedef std::vector<ModelSafeWorker*>::const_iterator iter;
  for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) {
    if (!types.test(i))
      continue;
    syncable::ModelType t = syncable::ModelTypeFromInt(i);
    DCHECK_EQ(1U, r_tmp.count(t));
    (*routes)[t] = r_tmp[t];
    iter it = std::find_if(w_tmp.begin(), w_tmp.end(),
                           ModelSafeWorkerGroupIs(r_tmp[t]));
    if (it != w_tmp.end()) {
      iter it2 = std::find_if(workers->begin(), workers->end(),
                              ModelSafeWorkerGroupIs(r_tmp[t]));
      if (it2 == workers->end())
        workers->push_back(*it);

      if (r_tmp[t] == GROUP_PASSIVE)
        passive_group_added = true;
    } else {
        NOTREACHED();
    }
  }

  // Always add group passive.
  if (passive_group_added == false) {
    iter it = std::find_if(w_tmp.begin(), w_tmp.end(),
                           ModelSafeWorkerGroupIs(GROUP_PASSIVE));
    if (it != w_tmp.end())
      workers->push_back(*it);
    else
      NOTREACHED();
  }
}

void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) {
  if (!thread_.IsRunning()) {
    NOTREACHED();
    return;
  }

  VLOG(1) << "SyncerThread(" << this << ")" << " Scheduling a config";
  ModelSafeRoutingInfo routes;
  std::vector<ModelSafeWorker*> workers;
  GetModelSafeParamsForTypes(types, session_context_->registrar(),
                             &routes, &workers);

  thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
      this, &SyncerThread::ScheduleConfigImpl, routes, workers,
      GetUpdatesCallerInfo::FIRST_UPDATE));
}

void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info,
    const std::vector<ModelSafeWorker*>& workers,
    const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) {
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());

  VLOG(1) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl...";
  // TODO(tim): config-specific GetUpdatesCallerInfo value?
  SyncSession* session = new SyncSession(session_context_.get(), this,
      SyncSourceInfo(source,
          syncable::ModelTypePayloadMapFromRoutingInfo(
              routing_info, std::string())),
      routing_info, workers);
  ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
    SyncSessionJob::CONFIGURATION, session, FROM_HERE);
}

void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay,
    SyncSessionJob::SyncSessionJobPurpose purpose,
    sessions::SyncSession* session,
    const tracked_objects::Location& nudge_location) {
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());

  SyncSessionJob job(purpose, TimeTicks::Now() + delay,
                        make_linked_ptr(session), false, nudge_location);
  if (purpose == SyncSessionJob::NUDGE) {
    VLOG(1) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in"
            << " ScheduleSyncSessionJob";
    DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session);
    pending_nudge_.reset(new SyncSessionJob(job));
  }
  VLOG(1) << "SyncerThread(" << this << ")"
          << " Posting job to execute in DoSyncSessionJob. Job purpose "
          << job.purpose;
  MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this,
      &SyncerThread::DoSyncSessionJob, job),
      delay.InMilliseconds());
}

void SyncerThread::SetSyncerStepsForPurpose(
    SyncSessionJob::SyncSessionJobPurpose purpose,
    SyncerStep* start, SyncerStep* end) {
  *end = SYNCER_END;
  switch (purpose) {
    case SyncSessionJob::CONFIGURATION:
      *start = DOWNLOAD_UPDATES;
      *end = APPLY_UPDATES;
      return;
    case SyncSessionJob::CLEAR_USER_DATA:
      *start = CLEAR_PRIVATE_DATA;
       return;
    case SyncSessionJob::NUDGE:
    case SyncSessionJob::POLL:
      *start = SYNCER_BEGIN;
      return;
    default:
      NOTREACHED();
  }
}

void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
  if (!ShouldRunJob(job)) {
    LOG(WARNING) << "Dropping nudge at DoSyncSessionJob, source = "
        << job.session->source().updates_source;
    return;
  }

  if (job.purpose == SyncSessionJob::NUDGE) {
    if (pending_nudge_.get() == NULL || pending_nudge_->session != job.session)
      return;  // Another nudge must have been scheduled in in the meantime.
    pending_nudge_.reset();
  }
  VLOG(1) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose "
          << job.purpose;

  SyncerStep begin(SYNCER_BEGIN);
  SyncerStep end(SYNCER_END);
  SetSyncerStepsForPurpose(job.purpose, &begin, &end);

  bool has_more_to_sync = true;
  while (ShouldRunJob(job) && has_more_to_sync) {
    VLOG(1) << "SyncerThread(" << this << ")"
            << " SyncerThread: Calling SyncShare.";
    // Synchronously perform the sync session from this thread.
    syncer_->SyncShare(job.session.get(), begin, end);
    has_more_to_sync = job.session->HasMoreToSync();
    if (has_more_to_sync)
      job.session->ResetTransientState();
  }
  VLOG(1) << "SyncerThread(" << this << ")"
          << " SyncerThread: Done SyncShare looping.";
  FinishSyncSessionJob(job);
}

void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) {
  if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
    // Whatever types were part of a configuration task will have had updates
    // downloaded.  For that reason, we make sure they get recorded in the
    // event that they get disabled at a later time.
    ModelSafeRoutingInfo r(session_context_->previous_session_routing_info());
    if (!r.empty()) {
      ModelSafeRoutingInfo temp_r;
      ModelSafeRoutingInfo old_info(old_job.session->routing_info());
      std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(),
          std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin()));
      session_context_->set_previous_session_routing_info(temp_r);
    }
  } else {
    session_context_->set_previous_session_routing_info(
        old_job.session->routing_info());
  }
}

void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) {
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
  // Update timing information for how often datatypes are triggering nudges.
  base::TimeTicks now = TimeTicks::Now();
  if (!last_sync_session_end_time_.is_null()) {
    ModelTypePayloadMap::const_iterator iter;
    for (iter = job.session->source().types.begin();
         iter != job.session->source().types.end();
         ++iter) {
      syncable::PostTimeToTypeHistogram(iter->first,
                                        now - last_sync_session_end_time_);
    }
  }
  last_sync_session_end_time_ = now;
  UpdateCarryoverSessionState(job);
  if (IsSyncingCurrentlySilenced()) {
    VLOG(1) << "SyncerThread(" << this << ")"
            << " We are currently throttled. So not scheduling the next sync.";
    SaveJob(job);
    return;  // Nothing to do.
  }

  VLOG(1) << "SyncerThread(" << this << ")"
          << " Updating the next polling time after SyncMain";
  ScheduleNextSync(job);
}

void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) {
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
  DCHECK(!old_job.session->HasMoreToSync());
  // Note: |num_server_changes_remaining| > 0 here implies that we received a
  // broken response while trying to download all updates, because the Syncer
  // will loop until this value is exhausted. Also, if unsynced_handles exist
  // but HasMoreToSync is false, this implies that the Syncer determined no
  // forward progress was possible at this time (an error, such as an HTTP
  // 500, is likely to have occurred during commit).
  const bool work_to_do =
     old_job.session->status_controller()->num_server_changes_remaining() > 0
     || old_job.session->status_controller()->unsynced_handles().size() > 0;
  VLOG(1) << "SyncerThread(" << this << ")" << " syncer has work to do: "
          << work_to_do;

  AdjustPolling(&old_job);

  // TODO(tim): Old impl had special code if notifications disabled. Needed?
  if (!work_to_do) {
    // Success implies backoff relief.  Note that if this was a "one-off" job
    // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was
    // work_to_do before it ran this wont have changed, as jobs like this don't
    // run a full sync cycle.  So we don't need special code here.
    wait_interval_.reset();
    VLOG(1) << "SyncerThread(" << this << ")"
            << " Job suceeded so not scheduling more jobs";
    return;
  }

  if (old_job.session->source().updates_source ==
      GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) {
    VLOG(1) << "SyncerThread(" << this << ")"
            << " Job failed with source continuation";
    // We don't seem to have made forward progress. Start or extend backoff.
    HandleConsecutiveContinuationError(old_job);
  } else if (IsBackingOff()) {
    VLOG(1) << "SyncerThread(" << this << ")"
            << " A nudge during backoff failed";
    // We weren't continuing but we're in backoff; must have been a nudge.
    DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
    DCHECK(!wait_interval_->had_nudge);
    wait_interval_->had_nudge = true;
    wait_interval_->timer.Reset();
  } else {
    VLOG(1) << "SyncerThread(" << this << ")"
            << " Failed. Schedule a job with continuation as source";
    // We weren't continuing and we aren't in backoff.  Schedule a normal
    // continuation.
    if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
      ScheduleConfigImpl(old_job.session->routing_info(),
          old_job.session->workers(),
          GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION));
    } else  {
      // For all other purposes(nudge and poll) we schedule a retry nudge.
      ScheduleNudgeImpl(TimeDelta::FromSeconds(0),
                        GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION),
                        old_job.session->source().types, false, FROM_HERE);
    }
  }
}

void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) {
  DCHECK(thread_.IsRunning());
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());

  TimeDelta poll  = (!session_context_->notifications_enabled()) ?
      syncer_short_poll_interval_seconds_ :
      syncer_long_poll_interval_seconds_;
  bool rate_changed = !poll_timer_.IsRunning() ||
                       poll != poll_timer_.GetCurrentDelay();

  if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed)
    poll_timer_.Reset();

  if (!rate_changed)
    return;

  // Adjust poll rate.
  poll_timer_.Stop();
  poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback);
}

void SyncerThread::HandleConsecutiveContinuationError(
    const SyncSessionJob& old_job) {
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
  // This if conditions should be compiled out in retail builds.
  if (IsBackingOff()) {
    DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job);
  }
  SyncSession* old = old_job.session.get();
  SyncSession* s(new SyncSession(session_context_.get(), this,
      old->source(), old->routing_info(), old->workers()));
  TimeDelta length = delay_provider_->GetDelay(
      IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1));

  VLOG(1) << "SyncerThread(" << this << ")"
          << " In handle continuation error. Old job purpose is "
          << old_job.purpose;
  VLOG(1) << "SyncerThread(" << this << ")"
    << " In Handle continuation error. The time delta(ms) is: "
          << length.InMilliseconds();

  // This will reset the had_nudge variable as well.
  wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
                                        length));
  if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
    SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
                        make_linked_ptr(s), false, FROM_HERE);
    wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
  } else {
    // We are not in configuration mode. So wait_interval's pending job
    // should be null.
    DCHECK(wait_interval_->pending_configure_job.get() == NULL);

    // TODO(lipalani) - handle clear user data.
    InitOrCoalescePendingJob(old_job);
  }
  wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob);
}

// static
TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) {
  if (last_delay.InSeconds() >= kMaxBackoffSeconds)
    return TimeDelta::FromSeconds(kMaxBackoffSeconds);

  // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2
  int64 backoff_s =
      std::max(static_cast<int64>(1),
               last_delay.InSeconds() * kBackoffRandomizationFactor);

  // Flip a coin to randomize backoff interval by +/- 50%.
  int rand_sign = base::RandInt(0, 1) * 2 - 1;

  // Truncation is adequate for rounding here.
  backoff_s = backoff_s +
      (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor));

  // Cap the backoff interval.
  backoff_s = std::max(static_cast<int64>(1),
                       std::min(backoff_s, kMaxBackoffSeconds));

  return TimeDelta::FromSeconds(backoff_s);
}

void SyncerThread::Stop() {
  VLOG(1) << "SyncerThread(" << this << ")" << " stop called";
  syncer_->RequestEarlyExit();  // Safe to call from any thread.
  session_context_->connection_manager()->RemoveListener(this);
  thread_.Stop();
}

void SyncerThread::DoCanaryJob() {
  VLOG(1) << "SyncerThread(" << this << ")" << " Do canary job";
  DoPendingJobIfPossible(true);
}

void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) {
  SyncSessionJob* job_to_execute = NULL;
  if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
      && wait_interval_->pending_configure_job.get()) {
    VLOG(1) << "SyncerThread(" << this << ")" << " Found pending configure job";
    job_to_execute = wait_interval_->pending_configure_job.get();
  } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) {
    VLOG(1) << "SyncerThread(" << this << ")" << " Found pending nudge job";
    // Pending jobs mostly have time from the past. Reset it so this job
    // will get executed.
    if (pending_nudge_->scheduled_start < TimeTicks::Now())
      pending_nudge_->scheduled_start = TimeTicks::Now();

    scoped_ptr<SyncSession> session(CreateSyncSession(
        pending_nudge_->session->source()));

    // Also the routing info might have been changed since we cached the
    // pending nudge. Update it by coalescing to the latest.
    pending_nudge_->session->Coalesce(*(session.get()));
    // The pending nudge would be cleared in the DoSyncSessionJob function.
    job_to_execute = pending_nudge_.get();
  }

  if (job_to_execute != NULL) {
    VLOG(1) << "SyncerThread(" << this << ")" << " Executing pending job";
    SyncSessionJob copy = *job_to_execute;
    copy.is_canary_job = is_canary_job;
    DoSyncSessionJob(copy);
  }
}

SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) {
  ModelSafeRoutingInfo routes;
  std::vector<ModelSafeWorker*> workers;
  session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
  session_context_->registrar()->GetWorkers(&workers);
  SyncSourceInfo info(source);

  SyncSession* session(new SyncSession(session_context_.get(), this, info,
      routes, workers));

  return session;
}

void SyncerThread::PollTimerCallback() {
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
  ModelSafeRoutingInfo r;
  ModelTypePayloadMap types_with_payloads =
      syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string());
  SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads);
  SyncSession* s = CreateSyncSession(info);
  ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s,
      FROM_HERE);
}

void SyncerThread::Unthrottle() {
  DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
  VLOG(1) << "SyncerThread(" << this << ")" << " Unthrottled..";
  DoCanaryJob();
  wait_interval_.reset();
}

void SyncerThread::Notify(SyncEngineEvent::EventCause cause) {
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
  session_context_->NotifyListeners(SyncEngineEvent(cause));
}

bool SyncerThread::IsBackingOff() const {
  return wait_interval_.get() && wait_interval_->mode ==
      WaitInterval::EXPONENTIAL_BACKOFF;
}

void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) {
  wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
                                        silenced_until - TimeTicks::Now()));
  wait_interval_->timer.Start(wait_interval_->length, this,
      &SyncerThread::Unthrottle);
}

bool SyncerThread::IsSyncingCurrentlySilenced() {
  return wait_interval_.get() && wait_interval_->mode ==
      WaitInterval::THROTTLED;
}

void SyncerThread::OnReceivedShortPollIntervalUpdate(
    const base::TimeDelta& new_interval) {
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
  syncer_short_poll_interval_seconds_ = new_interval;
}

void SyncerThread::OnReceivedLongPollIntervalUpdate(
    const base::TimeDelta& new_interval) {
  DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
  syncer_long_poll_interval_seconds_ = new_interval;
}

void SyncerThread::OnShouldStopSyncingPermanently() {
  VLOG(1) << "SyncerThread(" << this << ")"
          << " OnShouldStopSyncingPermanently";
  syncer_->RequestEarlyExit();  // Thread-safe.
  Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
}

void SyncerThread::OnServerConnectionEvent(
    const ServerConnectionEvent2& event) {
  thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,
      &SyncerThread::CheckServerConnectionManagerStatus,
      event.connection_code));
}

void SyncerThread::set_notifications_enabled(bool notifications_enabled) {
  session_context_->set_notifications_enabled(notifications_enabled);
}

}  // browser_sync