/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Integration test for the HBase RPC client. It runs many concurrent {@link SimpleClient} echo
 * threads against a small in-process cluster of {@link RpcServer} instances while a
 * {@link MiniChaosMonkey} randomly starts and stops servers.
 */
@Category(IntegrationTests.class)
public class IntegrationTestRpcClient {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestRpcClient.class);

  private final Configuration conf;

  private int numIterations = 10;

  public IntegrationTestRpcClient() {
    conf = HBaseConfiguration.create();
  }

  protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) {
    return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) {
      @Override
      Codec getCodec() {
        return null;
      }
    };
  }

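  // A roughly 1 MB payload, built once below, used to exercise large echo requests alongside
  // the small per-call messages.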
  static String BIG_PAYLOAD;

  static {
    StringBuilder builder = new StringBuilder();

    while (builder.length() < 1024 * 1024) { // 1 MB
      builder.append("big.payload.");
    }

    BIG_PAYLOAD = builder.toString();
  }

  class Cluster {
    Random random = new Random();
    ReadWriteLock lock = new ReentrantReadWriteLock();
    HashMap<InetSocketAddress, RpcServer> rpcServers = new HashMap<>();
    List<RpcServer> serverList = new ArrayList<>();
    int maxServers;
    int minServers;

    Cluster(int minServers, int maxServers) {
      this.minServers = minServers;
      this.maxServers = maxServers;
    }

    RpcServer startServer() throws IOException {
      lock.writeLock().lock();
      try {
        if (rpcServers.size() >= maxServers) {
          return null;
        }

        RpcServer rpcServer = RpcServerFactory.createRpcServer(null, "testRpcServer",
            Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
            new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1));
        rpcServer.start();
        InetSocketAddress address = rpcServer.getListenerAddress();
        if (address == null) {
          throw new IOException("Listener channel is closed");
        }
        rpcServers.put(address, rpcServer);
        serverList.add(rpcServer);
        LOG.info("Started server: " + address);
        return rpcServer;
      } finally {
        lock.writeLock().unlock();
      }
    }

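    // Stop a randomly chosen server, but never drop below minServers running instances.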
    void stopRandomServer() throws Exception {
      lock.writeLock().lock();
      RpcServer rpcServer = null;
      try {
        if (rpcServers.size() <= minServers) {
          return;
        }
        int size = rpcServers.size();
        int rand = random.nextInt(size);
        rpcServer = serverList.remove(rand);
        InetSocketAddress address = rpcServer.getListenerAddress();
        if (address == null) {
          // Throw exception here. We can't remove this instance from the server map because
          // we no longer have access to its map key.
          throw new IOException("Listener channel is closed");
        }
        rpcServers.remove(address);

        if (rpcServer != null) {
          stopServer(rpcServer);
        }
      } finally {
        lock.writeLock().unlock();
      }
    }

    void stopServer(RpcServer rpcServer) throws InterruptedException {
      InetSocketAddress address = rpcServer.getListenerAddress();
      LOG.info("Stopping server: " + address);
      rpcServer.stop();
      rpcServer.join();
      LOG.info("Stopped server: " + address);
    }

    void stopRunning() throws InterruptedException {
      lock.writeLock().lock();
      try {
        for (RpcServer rpcServer : serverList) {
          stopServer(rpcServer);
        }
      } finally {
        lock.writeLock().unlock();
      }
    }

    RpcServer getRandomServer() {
      lock.readLock().lock();
      try {
        int size = rpcServers.size();
        int rand = random.nextInt(size);
        return serverList.get(rand);
      } finally {
        lock.readLock().unlock();
      }
    }
  }

  static class MiniChaosMonkey extends Thread {
    AtomicBoolean running = new AtomicBoolean(true);
    Random random = new Random();
    AtomicReference<Exception> exception = new AtomicReference<>(null);
    Cluster cluster;

    public MiniChaosMonkey(Cluster cluster) {
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (running.get()) {
        if (random.nextBoolean()) {
          // start a server
          try {
            cluster.startServer();
          } catch (Exception e) {
            LOG.warn(e.toString(), e);
            exception.compareAndSet(null, e);
          }
        } else {
          // stop a server
          try {
            cluster.stopRandomServer();
          } catch (Exception e) {
            LOG.warn(e.toString(), e);
            exception.compareAndSet(null, e);
          }
        }

        Threads.sleep(100);
      }
    }

    void stopRunning() {
      running.set(false);
    }

    void rethrowException() throws Exception {
      if (exception.get() != null) {
        throw exception.get();
      }
    }
  }

  static class SimpleClient extends Thread {
    AbstractRpcClient<?> rpcClient;
    AtomicBoolean running = new AtomicBoolean(true);
    AtomicBoolean sending = new AtomicBoolean(false);
    AtomicReference<Throwable> exception = new AtomicReference<>(null);
    Cluster cluster;
    String id;
    long numCalls = 0;
    Random random = new Random();

    public SimpleClient(Cluster cluster, AbstractRpcClient<?> rpcClient, String id) {
      this.cluster = cluster;
      this.rpcClient = rpcClient;
      this.id = id;
      this.setName(id);
    }

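    // Loop until stopped: pick a random live server and issue a blocking echo call carrying
    // either the big payload or a small unique message. Connection errors are expected while
    // servers are being restarted, so they are logged and the loop simply retries.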
    @Override
    public void run() {
      while (running.get()) {
        boolean isBigPayload = random.nextBoolean();
        String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
        EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
        EchoResponseProto ret;
        RpcServer server = cluster.getRandomServer();
        try {
          sending.set(true);
          BlockingInterface stub = newBlockingStub(rpcClient, server.getListenerAddress());
          ret = stub.echo(null, param);
        } catch (Exception e) {
          LOG.warn(e.toString(), e);
          continue; // expected in case connection is closing or closed
        }

        try {
          assertNotNull(ret);
          assertEquals(message, ret.getMessage());
        } catch (Throwable t) {
          exception.compareAndSet(null, t);
        }

        numCalls++;
      }
    }

    void stopRunning() {
      running.set(false);
    }

    boolean isSending() {
      return sending.get();
    }

    void rethrowException() throws Throwable {
      if (exception.get() != null) {
        throw exception.get();
      }
    }
  }

  /*
   * Test that connections which have not yet been started are successfully removed from the
   * connection pool when the rpc client is closing.
   */
  @Test
  public void testRpcWithWriteThread() throws IOException, InterruptedException {
    LOG.info("Starting test");
    Cluster cluster = new Cluster(1, 1);
    cluster.startServer();
    conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
    for (int i = 0; i < 1000; i++) {
      AbstractRpcClient<?> rpcClient = createRpcClient(conf, true);
      SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
      client.start();
      while (!client.isSending()) {
        Thread.sleep(1);
      }
      client.stopRunning();
      rpcClient.close();
    }
  }

  @Test
  public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
    for (int i = 0; i < numIterations; i++) {
      TimeoutThread.runWithTimeout(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          try {
            testRpcWithChaosMonkey(true);
          } catch (Throwable e) {
            if (e instanceof Exception) {
              throw (Exception) e;
            } else {
              throw new Exception(e);
            }
          }
          return null;
        }
      }, 180000);
    }
  }

  @Test
  @Ignore // TODO: test fails with async client
  public void testRpcWithChaosMonkeyWithAsyncClient() throws Throwable {
    for (int i = 0; i < numIterations; i++) {
      TimeoutThread.runWithTimeout(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          try {
            testRpcWithChaosMonkey(false);
          } catch (Throwable e) {
            if (e instanceof Exception) {
              throw (Exception) e;
            } else {
              throw new Exception(e);
            }
          }
          return null;
        }
      }, 90000);
    }
  }

  static class TimeoutThread extends Thread {
    long timeout;

    public TimeoutThread(long timeout) {
      this.timeout = timeout;
    }

    @Override
    public void run() {
      try {
        Thread.sleep(timeout);
        Threads.printThreadInfo(System.err, "TEST TIMEOUT STACK DUMP");
        System.exit(1); // a timeout happened
      } catch (InterruptedException e) {
        // this is what we want
      }
    }

    // Runs the callable in the calling thread, but injects a timeout thread which will dump all
    // thread stacks and exit the JVM if the callable does not finish in time.
    static void runWithTimeout(Callable<?> callable, long timeout) throws Exception {
      TimeoutThread thread = new TimeoutThread(timeout);
      thread.start();
      callable.call();
      thread.interrupt();
    }
  }

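  // Shared body of the chaos-monkey tests above: 30 SimpleClient threads share one rpc client
  // and hammer a cluster that MiniChaosMonkey keeps resizing between 10 and 100 servers for
  // 30 seconds, after which every client must have completed more than 10 successful calls.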
  public void testRpcWithChaosMonkey(boolean isSyncClient) throws Throwable {
    LOG.info("Starting test");
    Cluster cluster = new Cluster(10, 100);
    for (int i = 0; i < 10; i++) {
      cluster.startServer();
    }

    ArrayList<SimpleClient> clients = new ArrayList<>(30);

    // all threads should share the same rpc client
    AbstractRpcClient<?> rpcClient = createRpcClient(conf, isSyncClient);

    for (int i = 0; i < 30; i++) {
      String clientId = "client_" + i + "_";
      LOG.info("Starting client: " + clientId);
      SimpleClient client = new SimpleClient(cluster, rpcClient, clientId);
      client.start();
      clients.add(client);
    }

    LOG.info("Starting MiniChaosMonkey");
    MiniChaosMonkey cm = new MiniChaosMonkey(cluster);
    cm.start();

    Threads.sleep(30000);

    LOG.info("Stopping MiniChaosMonkey");
    cm.stopRunning();
    cm.join();
    cm.rethrowException();

    LOG.info("Stopping clients");
    for (SimpleClient client : clients) {
      LOG.info("Stopping client: " + client.id);
      LOG.info(client.id + " numCalls:" + client.numCalls);
      client.stopRunning();
      client.join();
      client.rethrowException();
      assertTrue(client.numCalls > 10);
    }

    LOG.info("Stopping RpcClient");
    rpcClient.close();

    LOG.info("Stopping Cluster");
    cluster.stopRunning();
  }
}