/* * Copyright 2014 Google Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <thread> #include <grpc++/grpc++.h> #include "monster_test.grpc.fb.h" #include "monster_test_generated.h" #include "test_assert.h" using namespace MyGame::Example; using flatbuffers::grpc::MessageBuilder; using flatbuffers::FlatBufferBuilder; void message_builder_tests(); // The callback implementation of our server, that derives from the generated // code. It implements all rpcs specified in the FlatBuffers schema. class ServiceImpl final : public MyGame::Example::MonsterStorage::Service { virtual ::grpc::Status Store( ::grpc::ServerContext *context, const flatbuffers::grpc::Message<Monster> *request, flatbuffers::grpc::Message<Stat> *response) override { // Create a response from the incoming request name. fbb_.Clear(); auto stat_offset = CreateStat( fbb_, fbb_.CreateString("Hello, " + request->GetRoot()->name()->str())); fbb_.Finish(stat_offset); // Transfer ownership of the message to gRPC *response = fbb_.ReleaseMessage<Stat>(); return grpc::Status::OK; } virtual ::grpc::Status Retrieve( ::grpc::ServerContext *context, const flatbuffers::grpc::Message<Stat> *request, ::grpc::ServerWriter<flatbuffers::grpc::Message<Monster>> *writer) override { for (int i = 0; i < 5; i++) { fbb_.Clear(); // Create 5 monsters for resposne. auto monster_offset = CreateMonster(fbb_, 0, 0, 0, fbb_.CreateString(request->GetRoot()->id()->str() + " No." + std::to_string(i))); fbb_.Finish(monster_offset); flatbuffers::grpc::Message<Monster> monster = fbb_.ReleaseMessage<Monster>(); // Send monster to client using streaming. writer->Write(monster); } return grpc::Status::OK; } private: flatbuffers::grpc::MessageBuilder fbb_; }; // Track the server instance, so we can terminate it later. grpc::Server *server_instance = nullptr; // Mutex to protec this variable. std::mutex wait_for_server; std::condition_variable server_instance_cv; // This function implements the server thread. void RunServer() { auto server_address = "0.0.0.0:50051"; // Callback interface we implemented above. ServiceImpl service; grpc::ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&service); // Start the server. Lock to change the variable we're changing. wait_for_server.lock(); server_instance = builder.BuildAndStart().release(); wait_for_server.unlock(); server_instance_cv.notify_one(); std::cout << "Server listening on " << server_address << std::endl; // This will block the thread and serve requests. server_instance->Wait(); } template <class Builder> void StoreRPC(MonsterStorage::Stub *stub) { Builder fbb; grpc::ClientContext context; // Build a request with the name set. auto monster_offset = CreateMonster(fbb, 0, 0, 0, fbb.CreateString("Fred")); MessageBuilder mb(std::move(fbb)); mb.Finish(monster_offset); auto request = mb.ReleaseMessage<Monster>(); flatbuffers::grpc::Message<Stat> response; // The actual RPC. auto status = stub->Store(&context, request, &response); if (status.ok()) { auto resp = response.GetRoot()->id(); std::cout << "RPC response: " << resp->str() << std::endl; } else { std::cout << "RPC failed" << std::endl; } } template <class Builder> void RetrieveRPC(MonsterStorage::Stub *stub) { Builder fbb; grpc::ClientContext context; fbb.Clear(); auto stat_offset = CreateStat(fbb, fbb.CreateString("Fred")); fbb.Finish(stat_offset); auto request = MessageBuilder(std::move(fbb)).ReleaseMessage<Stat>(); flatbuffers::grpc::Message<Monster> response; auto stream = stub->Retrieve(&context, request); while (stream->Read(&response)) { auto resp = response.GetRoot()->name(); std::cout << "RPC Streaming response: " << resp->str() << std::endl; } } int grpc_server_test() { // Launch server. std::thread server_thread(RunServer); // wait for server to spin up. std::unique_lock<std::mutex> lock(wait_for_server); while (!server_instance) server_instance_cv.wait(lock); // Now connect the client. auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()); auto stub = MyGame::Example::MonsterStorage::NewStub(channel); StoreRPC<MessageBuilder>(stub.get()); StoreRPC<FlatBufferBuilder>(stub.get()); RetrieveRPC<MessageBuilder>(stub.get()); RetrieveRPC<FlatBufferBuilder>(stub.get()); #if !FLATBUFFERS_GRPC_DISABLE_AUTO_VERIFICATION { // Test that an invalid request errors out correctly grpc::ClientContext context; flatbuffers::grpc::Message<Monster> request; // simulate invalid message flatbuffers::grpc::Message<Stat> response; auto status = stub->Store(&context, request, &response); // The rpc status should be INTERNAL to indicate a verification error. This // matches the protobuf gRPC status code for an unparseable message. assert(!status.ok()); assert(status.error_code() == ::grpc::StatusCode::INTERNAL); assert(strcmp(status.error_message().c_str(), "Message verification failed") == 0); } #endif server_instance->Shutdown(); server_thread.join(); delete server_instance; return 0; } int main(int /*argc*/, const char * /*argv*/ []) { message_builder_tests(); grpc_server_test(); if (!testing_fails) { TEST_OUTPUT_LINE("ALL TESTS PASSED"); return 0; } else { TEST_OUTPUT_LINE("%d FAILED TESTS", testing_fails); return 1; } }