/* * Copyright 2014 Google Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import MyGame.Example.Monster; import MyGame.Example.MonsterStorageGrpc; import MyGame.Example.Stat; import com.google.flatbuffers.FlatBufferBuilder; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import org.junit.Assert; import java.io.IOException; import java.lang.InterruptedException; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.CountDownLatch; /** * Demonstrates basic client-server interaction using grpc-java over netty. */ public class JavaGrpcTest { static final String BIG_MONSTER_NAME = "Cyberdemon"; static final short nestedMonsterHp = 600; static final short nestedMonsterMana = 1024; static final int numStreamedMsgs = 10; static final int timeoutMs = 3000; static Server server; static ManagedChannel channel; static MonsterStorageGrpc.MonsterStorageBlockingStub blockingStub; static MonsterStorageGrpc.MonsterStorageStub asyncStub; static class MyService extends MonsterStorageGrpc.MonsterStorageImplBase { @Override public void store(Monster request, io.grpc.stub.StreamObserver<Stat> responseObserver) { Assert.assertEquals(request.name(), BIG_MONSTER_NAME); Assert.assertEquals(request.hp(), nestedMonsterHp); Assert.assertEquals(request.mana(), nestedMonsterMana); System.out.println("Received store request from " + request.name()); // Create a response from the incoming request name. Stat stat = GameFactory.createStat("Hello " + request.name(), 100, 10); responseObserver.onNext(stat); responseObserver.onCompleted(); } @Override public void retrieve(Stat request, io.grpc.stub.StreamObserver<Monster> responseObserver) { // Create 10 monsters for streaming response. for (int i=0; i<numStreamedMsgs; i++) { Monster monster = GameFactory.createMonsterFromStat(request, i); responseObserver.onNext(monster); } responseObserver.onCompleted(); } @Override public StreamObserver<Monster> getMaxHitPoint(final StreamObserver<Stat> responseObserver) { return computeMinMax(responseObserver, false); } @Override public StreamObserver<Monster> getMinMaxHitPoints(final StreamObserver<Stat> responseObserver) { return computeMinMax(responseObserver, true); } private StreamObserver<Monster> computeMinMax(final StreamObserver<Stat> responseObserver, final boolean includeMin) { final AtomicInteger maxHp = new AtomicInteger(Integer.MIN_VALUE); final AtomicReference<String> maxHpMonsterName = new AtomicReference<String>(); final AtomicInteger maxHpCount = new AtomicInteger(); final AtomicInteger minHp = new AtomicInteger(Integer.MAX_VALUE); final AtomicReference<String> minHpMonsterName = new AtomicReference<String>(); final AtomicInteger minHpCount = new AtomicInteger(); return new StreamObserver<Monster>() { public void onNext(Monster monster) { if (monster.hp() > maxHp.get()) { // Found a monster of higher hit points. maxHp.set(monster.hp()); maxHpMonsterName.set(monster.name()); maxHpCount.set(1); } else if (monster.hp() == maxHp.get()) { // Count how many times we saw a monster of current max hit points. maxHpCount.getAndIncrement(); } if (monster.hp() < minHp.get()) { // Found a monster of a lower hit points. minHp.set(monster.hp()); minHpMonsterName.set(monster.name()); minHpCount.set(1); } else if (monster.hp() == minHp.get()) { // Count how many times we saw a monster of current min hit points. minHpCount.getAndIncrement(); } } public void onCompleted() { Stat maxHpStat = GameFactory.createStat(maxHpMonsterName.get(), maxHp.get(), maxHpCount.get()); // Send max hit points first. responseObserver.onNext(maxHpStat); if (includeMin) { // Send min hit points. Stat minHpStat = GameFactory.createStat(minHpMonsterName.get(), minHp.get(), minHpCount.get()); responseObserver.onNext(minHpStat); } responseObserver.onCompleted(); } public void onError(Throwable t) { // Not expected Assert.fail(); }; }; } } @org.junit.BeforeClass public static void startServer() throws IOException { server = ServerBuilder.forPort(0).addService(new MyService()).build().start(); int port = server.getPort(); channel = ManagedChannelBuilder.forAddress("localhost", port) // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid // needing certificates. .usePlaintext(true) .directExecutor() .build(); blockingStub = MonsterStorageGrpc.newBlockingStub(channel); asyncStub = MonsterStorageGrpc.newStub(channel); } @org.junit.Test public void testUnary() throws IOException { Monster monsterRequest = GameFactory.createMonster(BIG_MONSTER_NAME, nestedMonsterHp, nestedMonsterMana); Stat stat = blockingStub.store(monsterRequest); Assert.assertEquals(stat.id(), "Hello " + BIG_MONSTER_NAME); System.out.println("Received stat response from service: " + stat.id()); } @org.junit.Test public void testServerStreaming() throws IOException { Monster monsterRequest = GameFactory.createMonster(BIG_MONSTER_NAME, nestedMonsterHp, nestedMonsterMana); Stat stat = blockingStub.store(monsterRequest); Iterator<Monster> iterator = blockingStub.retrieve(stat); int counter = 0; while(iterator.hasNext()) { Monster m = iterator.next(); System.out.println("Received monster " + m.name()); counter ++; } Assert.assertEquals(counter, numStreamedMsgs); System.out.println("FlatBuffers GRPC client/server test: completed successfully"); } @org.junit.Test public void testClientStreaming() throws IOException, InterruptedException { final AtomicReference<Stat> maxHitStat = new AtomicReference<Stat>(); final CountDownLatch streamAlive = new CountDownLatch(1); StreamObserver<Stat> statObserver = new StreamObserver<Stat>() { public void onCompleted() { streamAlive.countDown(); } public void onError(Throwable ex) { } public void onNext(Stat stat) { maxHitStat.set(stat); } }; StreamObserver<Monster> monsterStream = asyncStub.getMaxHitPoint(statObserver); short count = 10; for (short i = 0;i < count; ++i) { Monster monster = GameFactory.createMonster(BIG_MONSTER_NAME + i, (short) (nestedMonsterHp * i), nestedMonsterMana); monsterStream.onNext(monster); } monsterStream.onCompleted(); // Wait a little bit for the server to send the stats of the monster with the max hit-points. streamAlive.await(timeoutMs, TimeUnit.MILLISECONDS); Assert.assertEquals(maxHitStat.get().id(), BIG_MONSTER_NAME + (count - 1)); Assert.assertEquals(maxHitStat.get().val(), nestedMonsterHp * (count - 1)); Assert.assertEquals(maxHitStat.get().count(), 1); } @org.junit.Test public void testBiDiStreaming() throws IOException, InterruptedException { final AtomicReference<Stat> maxHitStat = new AtomicReference<Stat>(); final AtomicReference<Stat> minHitStat = new AtomicReference<Stat>(); final CountDownLatch streamAlive = new CountDownLatch(1); StreamObserver<Stat> statObserver = new StreamObserver<Stat>() { public void onCompleted() { streamAlive.countDown(); } public void onError(Throwable ex) { } public void onNext(Stat stat) { // We expect the server to send the max stat first and then the min stat. if (maxHitStat.get() == null) { maxHitStat.set(stat); } else { minHitStat.set(stat); } } }; StreamObserver<Monster> monsterStream = asyncStub.getMinMaxHitPoints(statObserver); short count = 10; for (short i = 0;i < count; ++i) { Monster monster = GameFactory.createMonster(BIG_MONSTER_NAME + i, (short) (nestedMonsterHp * i), nestedMonsterMana); monsterStream.onNext(monster); } monsterStream.onCompleted(); // Wait a little bit for the server to send the stats of the monster with the max and min hit-points. streamAlive.await(timeoutMs, TimeUnit.MILLISECONDS); Assert.assertEquals(maxHitStat.get().id(), BIG_MONSTER_NAME + (count - 1)); Assert.assertEquals(maxHitStat.get().val(), nestedMonsterHp * (count - 1)); Assert.assertEquals(maxHitStat.get().count(), 1); Assert.assertEquals(minHitStat.get().id(), BIG_MONSTER_NAME + 0); Assert.assertEquals(minHitStat.get().val(), nestedMonsterHp * 0); Assert.assertEquals(minHitStat.get().count(), 1); } }