// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/sync/engine/syncer.h"
#include "base/message_loop.h"
#include "base/time.h"
#include "chrome/browser/sync/engine/apply_updates_command.h"
#include "chrome/browser/sync/engine/build_and_process_conflict_sets_command.h"
#include "chrome/browser/sync/engine/build_commit_command.h"
#include "chrome/browser/sync/engine/cleanup_disabled_types_command.h"
#include "chrome/browser/sync/engine/clear_data_command.h"
#include "chrome/browser/sync/engine/conflict_resolver.h"
#include "chrome/browser/sync/engine/download_updates_command.h"
#include "chrome/browser/sync/engine/get_commit_ids_command.h"
#include "chrome/browser/sync/engine/net/server_connection_manager.h"
#include "chrome/browser/sync/engine/post_commit_message_command.h"
#include "chrome/browser/sync/engine/process_commit_response_command.h"
#include "chrome/browser/sync/engine/process_updates_command.h"
#include "chrome/browser/sync/engine/resolve_conflicts_command.h"
#include "chrome/browser/sync/engine/store_timestamps_command.h"
#include "chrome/browser/sync/engine/syncer_end_command.h"
#include "chrome/browser/sync/engine/syncer_types.h"
#include "chrome/browser/sync/engine/syncer_util.h"
#include "chrome/browser/sync/engine/syncproto.h"
#include "chrome/browser/sync/engine/verify_updates_command.h"
#include "chrome/browser/sync/syncable/directory_manager.h"
#include "chrome/browser/sync/syncable/syncable-inl.h"
#include "chrome/browser/sync/syncable/syncable.h"
using base::TimeDelta;
using sync_pb::ClientCommand;
using syncable::Blob;
using syncable::IS_UNAPPLIED_UPDATE;
using syncable::SERVER_CTIME;
using syncable::SERVER_IS_DEL;
using syncable::SERVER_IS_DIR;
using syncable::SERVER_MTIME;
using syncable::SERVER_NON_UNIQUE_NAME;
using syncable::SERVER_PARENT_ID;
using syncable::SERVER_POSITION_IN_PARENT;
using syncable::SERVER_SPECIFICS;
using syncable::SERVER_VERSION;
using syncable::SYNCER;
using syncable::ScopedDirLookup;
using syncable::WriteTransaction;
namespace browser_sync {
using sessions::ScopedSessionContextConflictResolver;
using sessions::StatusController;
using sessions::SyncSession;
using sessions::ConflictProgress;
Syncer::Syncer()
: early_exit_requested_(false),
pre_conflict_resolution_closure_(NULL) {
}
Syncer::~Syncer() {}
bool Syncer::ExitRequested() {
base::AutoLock lock(early_exit_requested_lock_);
return early_exit_requested_;
}
void Syncer::RequestEarlyExit() {
base::AutoLock lock(early_exit_requested_lock_);
early_exit_requested_ = true;
}
// TODO(tim): Deprecated.
void Syncer::SyncShare(sessions::SyncSession* session) {
ScopedDirLookup dir(session->context()->directory_manager(),
session->context()->account_name());
// The directory must be good here.
CHECK(dir.good());
const sessions::SyncSourceInfo& source(session->source());
if (sync_pb::GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA ==
source.updates_source) {
SyncShare(session, CLEAR_PRIVATE_DATA, SYNCER_END);
return;
} else {
SyncShare(session, SYNCER_BEGIN, SYNCER_END);
}
}
void Syncer::SyncShare(sessions::SyncSession* session,
const SyncerStep first_step,
const SyncerStep last_step) {
ScopedDirLookup dir(session->context()->directory_manager(),
session->context()->account_name());
// The directory must be good here.
CHECK(dir.good());
ScopedSessionContextConflictResolver scoped(session->context(),
&resolver_);
SyncerStep current_step = first_step;
SyncerStep next_step = current_step;
while (!ExitRequested()) {
switch (current_step) {
case SYNCER_BEGIN:
VLOG(1) << "Syncer Begin";
// This isn't perfect, as we can end up bundling extensions activity
// intended for the next session into the current one. We could do a
// test-and-reset as with the source, but note that also falls short if
// the commit request fails (e.g. due to lost connection), as we will
// fall all the way back to the syncer thread main loop in that case,
// creating a new session when a connection is established, losing the
// records set here on the original attempt. This should provide us
// with the right data "most of the time", and we're only using this
// for analysis purposes, so Law of Large Numbers FTW.
session->context()->extensions_monitor()->GetAndClearRecords(
session->mutable_extensions_activity());
next_step = CLEANUP_DISABLED_TYPES;
break;
case CLEANUP_DISABLED_TYPES: {
VLOG(1) << "Cleaning up disabled types";
CleanupDisabledTypesCommand cleanup;
cleanup.Execute(session);
next_step = DOWNLOAD_UPDATES;
break;
}
case DOWNLOAD_UPDATES: {
VLOG(1) << "Downloading Updates";
DownloadUpdatesCommand download_updates;
download_updates.Execute(session);
next_step = PROCESS_CLIENT_COMMAND;
break;
}
case PROCESS_CLIENT_COMMAND: {
VLOG(1) << "Processing Client Command";
ProcessClientCommand(session);
next_step = VERIFY_UPDATES;
break;
}
case VERIFY_UPDATES: {
VLOG(1) << "Verifying Updates";
VerifyUpdatesCommand verify_updates;
verify_updates.Execute(session);
next_step = PROCESS_UPDATES;
break;
}
case PROCESS_UPDATES: {
VLOG(1) << "Processing Updates";
ProcessUpdatesCommand process_updates;
process_updates.Execute(session);
next_step = STORE_TIMESTAMPS;
break;
}
case STORE_TIMESTAMPS: {
VLOG(1) << "Storing timestamps";
StoreTimestampsCommand store_timestamps;
store_timestamps.Execute(session);
// We should download all of the updates before attempting to process
// them.
if (session->status_controller()->ServerSaysNothingMoreToDownload() ||
!session->status_controller()->download_updates_succeeded()) {
next_step = APPLY_UPDATES;
} else {
next_step = DOWNLOAD_UPDATES;
}
break;
}
case APPLY_UPDATES: {
VLOG(1) << "Applying Updates";
ApplyUpdatesCommand apply_updates;
apply_updates.Execute(session);
next_step = BUILD_COMMIT_REQUEST;
break;
}
// These two steps are combined since they are executed within the same
// write transaction.
case BUILD_COMMIT_REQUEST: {
session->status_controller()->set_syncing(true);
VLOG(1) << "Processing Commit Request";
ScopedDirLookup dir(session->context()->directory_manager(),
session->context()->account_name());
if (!dir.good()) {
LOG(ERROR) << "Scoped dir lookup failed!";
return;
}
WriteTransaction trans(dir, SYNCER, __FILE__, __LINE__);
sessions::ScopedSetSessionWriteTransaction set_trans(session, &trans);
VLOG(1) << "Getting the Commit IDs";
GetCommitIdsCommand get_commit_ids_command(
session->context()->max_commit_batch_size());
get_commit_ids_command.Execute(session);
if (!session->status_controller()->commit_ids().empty()) {
VLOG(1) << "Building a commit message";
BuildCommitCommand build_commit_command;
build_commit_command.Execute(session);
next_step = POST_COMMIT_MESSAGE;
} else {
next_step = BUILD_AND_PROCESS_CONFLICT_SETS;
}
break;
}
case POST_COMMIT_MESSAGE: {
VLOG(1) << "Posting a commit request";
PostCommitMessageCommand post_commit_command;
post_commit_command.Execute(session);
next_step = PROCESS_COMMIT_RESPONSE;
break;
}
case PROCESS_COMMIT_RESPONSE: {
VLOG(1) << "Processing the commit response";
session->status_controller()->reset_num_conflicting_commits();
ProcessCommitResponseCommand process_response_command;
process_response_command.Execute(session);
next_step = BUILD_AND_PROCESS_CONFLICT_SETS;
break;
}
case BUILD_AND_PROCESS_CONFLICT_SETS: {
VLOG(1) << "Building and Processing Conflict Sets";
BuildAndProcessConflictSetsCommand build_process_conflict_sets;
build_process_conflict_sets.Execute(session);
if (session->status_controller()->conflict_sets_built())
next_step = SYNCER_END;
else
next_step = RESOLVE_CONFLICTS;
break;
}
case RESOLVE_CONFLICTS: {
VLOG(1) << "Resolving Conflicts";
// Trigger the pre_conflict_resolution_closure_, which is a testing
// hook for the unit tests, if it is non-NULL.
if (pre_conflict_resolution_closure_) {
pre_conflict_resolution_closure_->Run();
}
StatusController* status = session->status_controller();
status->reset_conflicts_resolved();
ResolveConflictsCommand resolve_conflicts_command;
resolve_conflicts_command.Execute(session);
if (status->HasConflictingUpdates())
next_step = APPLY_UPDATES_TO_RESOLVE_CONFLICTS;
else
next_step = SYNCER_END;
break;
}
case APPLY_UPDATES_TO_RESOLVE_CONFLICTS: {
StatusController* status = session->status_controller();
VLOG(1) << "Applying updates to resolve conflicts";
ApplyUpdatesCommand apply_updates;
int before_conflicting_updates = status->TotalNumConflictingItems();
apply_updates.Execute(session);
int after_conflicting_updates = status->TotalNumConflictingItems();
status->update_conflicts_resolved(before_conflicting_updates >
after_conflicting_updates);
if (status->conflicts_resolved())
next_step = RESOLVE_CONFLICTS;
else
next_step = SYNCER_END;
break;
}
case CLEAR_PRIVATE_DATA: {
VLOG(1) << "Clear Private Data";
ClearDataCommand clear_data_command;
clear_data_command.Execute(session);
next_step = SYNCER_END;
break;
}
case SYNCER_END: {
break;
}
default:
LOG(ERROR) << "Unknown command: " << current_step;
}
if (last_step == current_step)
break;
current_step = next_step;
}
VLOG(1) << "Syncer End";
SyncerEndCommand syncer_end_command;
syncer_end_command.Execute(session);
return;
}
void Syncer::ProcessClientCommand(sessions::SyncSession* session) {
const ClientToServerResponse& response =
session->status_controller()->updates_response();
if (!response.has_client_command())
return;
const ClientCommand& command = response.client_command();
// The server limits the number of items a client can commit in one batch.
if (command.has_max_commit_batch_size()) {
session->context()->set_max_commit_batch_size(
command.max_commit_batch_size());
}
if (command.has_set_sync_long_poll_interval()) {
session->delegate()->OnReceivedLongPollIntervalUpdate(
TimeDelta::FromSeconds(command.set_sync_long_poll_interval()));
}
if (command.has_set_sync_poll_interval()) {
session->delegate()->OnReceivedShortPollIntervalUpdate(
TimeDelta::FromSeconds(command.set_sync_poll_interval()));
}
}
void CopyServerFields(syncable::Entry* src, syncable::MutableEntry* dest) {
dest->Put(SERVER_NON_UNIQUE_NAME, src->Get(SERVER_NON_UNIQUE_NAME));
dest->Put(SERVER_PARENT_ID, src->Get(SERVER_PARENT_ID));
dest->Put(SERVER_MTIME, src->Get(SERVER_MTIME));
dest->Put(SERVER_CTIME, src->Get(SERVER_CTIME));
dest->Put(SERVER_VERSION, src->Get(SERVER_VERSION));
dest->Put(SERVER_IS_DIR, src->Get(SERVER_IS_DIR));
dest->Put(SERVER_IS_DEL, src->Get(SERVER_IS_DEL));
dest->Put(IS_UNAPPLIED_UPDATE, src->Get(IS_UNAPPLIED_UPDATE));
dest->Put(SERVER_SPECIFICS, src->Get(SERVER_SPECIFICS));
dest->Put(SERVER_POSITION_IN_PARENT, src->Get(SERVER_POSITION_IN_PARENT));
}
void ClearServerData(syncable::MutableEntry* entry) {
entry->Put(SERVER_NON_UNIQUE_NAME, "");
entry->Put(SERVER_PARENT_ID, syncable::kNullId);
entry->Put(SERVER_MTIME, 0);
entry->Put(SERVER_CTIME, 0);
entry->Put(SERVER_VERSION, 0);
entry->Put(SERVER_IS_DIR, false);
entry->Put(SERVER_IS_DEL, false);
entry->Put(IS_UNAPPLIED_UPDATE, false);
entry->Put(SERVER_SPECIFICS, sync_pb::EntitySpecifics::default_instance());
entry->Put(SERVER_POSITION_IN_PARENT, 0);
}
} // namespace browser_sync