/*
 * Copyright 2014 Google Inc. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <thread>

#include <grpc++/grpc++.h>

#include "monster_test_generated.h"
#include "monster_test.grpc.fb.h"

using namespace MyGame::Example;

// The callback implementation of our server, that derives from the generated
// code. It implements all rpcs specified in the FlatBuffers schema.
class ServiceImpl final : public MyGame::Example::MonsterStorage::Service {
  virtual ::grpc::Status Store(::grpc::ServerContext* context,
                               const flatbuffers::grpc::Message<Monster> *request,
                               flatbuffers::grpc::Message<Stat> *response)
                               override {
    // Create a response from the incoming request name.
    fbb_.Clear();
    auto stat_offset = CreateStat(fbb_, fbb_.CreateString("Hello, " +
                                        request->GetRoot()->name()->str()));
    fbb_.Finish(stat_offset);
    // Transfer ownership of the message to gRPC
    *response = fbb_.ReleaseMessage<Stat>();
    return grpc::Status::OK;
  }
  virtual ::grpc::Status Retrieve(::grpc::ServerContext *context,
                               const flatbuffers::grpc::Message<Stat> *request,
                               ::grpc::ServerWriter< flatbuffers::grpc::Message<Monster>>* writer)
                               override {

    for (int i=0; i<10; i++) {
       fbb_.Clear();
       // Create 10 monsters for resposne.
       auto monster_offset =
       CreateMonster(fbb_, 0, 0, 0, fbb_.CreateString(
         request->GetRoot()->id()->str() + " No." + std::to_string(i)));
       fbb_.Finish(monster_offset);

       flatbuffers::grpc::Message<Monster> monster = fbb_.ReleaseMessage<Monster>();

       // Send monster to client using streaming.
       writer->Write(monster);
     }
     return grpc::Status::OK;
  }

 private:
  flatbuffers::grpc::MessageBuilder fbb_;
};

// Track the server instance, so we can terminate it later.
grpc::Server *server_instance = nullptr;
// Mutex to protec this variable.
std::mutex wait_for_server;
std::condition_variable server_instance_cv;

// This function implements the server thread.
void RunServer() {
  auto server_address = "0.0.0.0:50051";
  // Callback interface we implemented above.
  ServiceImpl service;
  grpc::ServerBuilder builder;
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  builder.RegisterService(&service);

  // Start the server. Lock to change the variable we're changing.
  wait_for_server.lock();
  server_instance = builder.BuildAndStart().release();
  wait_for_server.unlock();
  server_instance_cv.notify_one();

  std::cout << "Server listening on " << server_address << std::endl;
  // This will block the thread and serve requests.
  server_instance->Wait();
}

int main(int /*argc*/, const char * /*argv*/[]) {
  // Launch server.
  std::thread server_thread(RunServer);

  // wait for server to spin up.
  std::unique_lock<std::mutex> lock(wait_for_server);
  while (!server_instance) server_instance_cv.wait(lock);

  // Now connect the client.
  auto channel = grpc::CreateChannel("localhost:50051",
                                     grpc::InsecureChannelCredentials());
  auto stub = MyGame::Example::MonsterStorage::NewStub(channel);


  flatbuffers::grpc::MessageBuilder fbb;
  {
    grpc::ClientContext context;
    // Build a request with the name set.
    auto monster_offset = CreateMonster(fbb, 0, 0, 0, fbb.CreateString("Fred"));
    fbb.Finish(monster_offset);
    auto request = fbb.ReleaseMessage<Monster>();
    flatbuffers::grpc::Message<Stat> response;

    // The actual RPC.
    auto status = stub->Store(&context, request, &response);

    if (status.ok()) {
      auto resp = response.GetRoot()->id();
      std::cout << "RPC response: " << resp->str() << std::endl;
    } else {
      std::cout << "RPC failed" << std::endl;
    }
  }
  {
    grpc::ClientContext context;
    fbb.Clear();
    auto stat_offset = CreateStat(fbb, fbb.CreateString("Fred"));
    fbb.Finish(stat_offset);
    auto request = fbb.ReleaseMessage<Stat>();

    flatbuffers::grpc::Message<Monster> response;
    auto stream = stub->Retrieve(&context, request);
    while (stream->Read(&response)) {
      auto resp = response.GetRoot()->name();
      std::cout << "RPC Streaming response: " << resp->str() << std::endl;
    }
  }

  #if !FLATBUFFERS_GRPC_DISABLE_AUTO_VERIFICATION
  {
    // Test that an invalid request errors out correctly
    grpc::ClientContext context;
    flatbuffers::grpc::Message<Monster> request;  // simulate invalid message
    flatbuffers::grpc::Message<Stat> response;
    auto status = stub->Store(&context, request, &response);
    // The rpc status should be INTERNAL to indicate a verification error. This
    // matches the protobuf gRPC status code for an unparseable message.
    assert(!status.ok());
    assert(status.error_code() == ::grpc::StatusCode::INTERNAL);
    assert(strcmp(status.error_message().c_str(), "Message verification failed") == 0);
  }
  #endif

  server_instance->Shutdown();

  server_thread.join();

  delete server_instance;

  return 0;
}