/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;

/**
 * Integration test for the HBase RPC clients: hammers a small in-process cluster of
 * {@link RpcServer}s with concurrent echo calls while servers are randomly started and stopped.
 */
@Category(IntegrationTests.class)
public class IntegrationTestRpcClient {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestRpcClient.class);

  private final Configuration conf;

  private int numIterations = 10;

  public IntegrationTestRpcClient() {
    conf = HBaseConfiguration.create();
  }
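
  /**
   * Creates the RPC client under test: a {@link BlockingRpcClient} for the synchronous path, or a
   * {@link NettyRpcClient} with the codec disabled for the asynchronous path.
   */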
  protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) {
    return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) {
      @Override
      Codec getCodec() {
        return null;
      }
    };
  }

  static String BIG_PAYLOAD;

  static {
    StringBuilder builder = new StringBuilder();

    while (builder.length() < 1024 * 1024) { // ~1M chars, roughly 2 MB in memory
      builder.append("big.payload.");
    }

    BIG_PAYLOAD = builder.toString();
  }

  class Cluster {
    ReadWriteLock lock = new ReentrantReadWriteLock();
    HashMap<InetSocketAddress, RpcServer> rpcServers = new HashMap<>();
    List<RpcServer> serverList = new ArrayList<>();
    int maxServers;
    int minServers;

    Cluster(int minServers, int maxServers) {
      this.minServers = minServers;
      this.maxServers = maxServers;
    }

    RpcServer startServer() throws IOException {
      lock.writeLock().lock();
      try {
        if (rpcServers.size() >= maxServers) {
          return null;
        }

        RpcServer rpcServer = RpcServerFactory.createRpcServer(null, "testRpcServer",
          Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
          new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1));
        rpcServer.start();
        InetSocketAddress address = rpcServer.getListenerAddress();
        if (address == null) {
          throw new IOException("Listener channel is closed");
        }
        rpcServers.put(address, rpcServer);
        serverList.add(rpcServer);
        LOG.info("Started server: " + address);
        return rpcServer;
      } finally {
        lock.writeLock().unlock();
      }
    }
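
    /**
     * Stops a randomly chosen server, unless doing so would shrink the cluster below
     * {@code minServers}.
     */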
    void stopRandomServer() throws Exception {
      lock.writeLock().lock();
      RpcServer rpcServer = null;
      try {
        if (rpcServers.size() <= minServers) {
          return;
        }
        int size = rpcServers.size();
        int rand = ThreadLocalRandom.current().nextInt(size);
        rpcServer = serverList.remove(rand);
        InetSocketAddress address = rpcServer.getListenerAddress();
        if (address == null) {
          // Throw exception here. We can't remove this instance from the server map because
          // we no longer have access to its map key
          throw new IOException("Listener channel is closed");
        }
        rpcServers.remove(address);

        if (rpcServer != null) {
          stopServer(rpcServer);
        }
      } finally {
        lock.writeLock().unlock();
      }
    }

    void stopServer(RpcServer rpcServer) throws InterruptedException {
      InetSocketAddress address = rpcServer.getListenerAddress();
      LOG.info("Stopping server: " + address);
      rpcServer.stop();
      rpcServer.join();
      LOG.info("Stopped server: " + address);
    }

    void stopRunning() throws InterruptedException {
      lock.writeLock().lock();
      try {
        for (RpcServer rpcServer : serverList) {
          stopServer(rpcServer);
        }
      } finally {
        lock.writeLock().unlock();
      }
    }

    RpcServer getRandomServer() {
      lock.readLock().lock();
      try {
        int size = rpcServers.size();
        int rand = ThreadLocalRandom.current().nextInt(size);
        return serverList.get(rand);
      } finally {
        lock.readLock().unlock();
      }
    }
  }

  static class MiniChaosMonkey extends Thread {
    AtomicBoolean running = new AtomicBoolean(true);
    AtomicReference<Exception> exception = new AtomicReference<>(null);
    Cluster cluster;

    public MiniChaosMonkey(Cluster cluster) {
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (running.get()) {
        if (ThreadLocalRandom.current().nextBoolean()) {
          // start a server
          try {
            cluster.startServer();
          } catch (Exception e) {
            LOG.warn(e.toString(), e);
            exception.compareAndSet(null, e);
          }
        } else {
          // stop a server
          try {
            cluster.stopRandomServer();
          } catch (Exception e) {
            LOG.warn(e.toString(), e);
            exception.compareAndSet(null, e);
          }
        }

        Threads.sleep(100);
      }
    }

    void stopRunning() {
      running.set(false);
    }

    void rethrowException() throws Exception {
      if (exception.get() != null) {
        throw exception.get();
      }
    }
  }

  static class SimpleClient extends Thread {
    AbstractRpcClient<?> rpcClient;
    AtomicBoolean running = new AtomicBoolean(true);
    AtomicBoolean sending = new AtomicBoolean(false);
    AtomicReference<Throwable> exception = new AtomicReference<>(null);
    Cluster cluster;
    String id;
    long numCalls = 0;

    public SimpleClient(Cluster cluster, AbstractRpcClient<?> rpcClient, String id) {
      this.cluster = cluster;
      this.rpcClient = rpcClient;
      this.id = id;
      this.setName(id);
    }
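
    /**
     * Echoes either a small unique message or the big payload against a randomly chosen server
     * until stopped, verifying each successful response and recording the first assertion failure.
     */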
    @Override
    public void run() {
      while (running.get()) {
        boolean isBigPayload = ThreadLocalRandom.current().nextBoolean();
        String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
        EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
        EchoResponseProto ret;
        RpcServer server = cluster.getRandomServer();
        try {
          sending.set(true);
          BlockingInterface stub = newBlockingStub(rpcClient, server.getListenerAddress());
          ret = stub.echo(null, param);
        } catch (Exception e) {
          LOG.warn(e.toString(), e);
          continue; // expected in case connection is closing or closed
        }

        try {
          assertNotNull(ret);
          assertEquals(message, ret.getMessage());
        } catch (Throwable t) {
          exception.compareAndSet(null, t);
        }

        numCalls++;
      }
    }

    void stopRunning() {
      running.set(false);
    }

    boolean isSending() {
      return sending.get();
    }

    void rethrowException() throws Throwable {
      if (exception.get() != null) {
        throw exception.get();
      }
    }
  }

  /*
   * Test that connections which have not yet been started are successfully removed from the
   * connection pool when the RPC client is closing.
   */
  @Test
  public void testRpcWithWriteThread() throws IOException, InterruptedException {
    LOG.info("Starting test");
    Cluster cluster = new Cluster(1, 1);
    cluster.startServer();
    conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
    for (int i = 0; i < 1000; i++) {
      AbstractRpcClient<?> rpcClient = createRpcClient(conf, true);
      SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
      client.start();
      while (!client.isSending()) {
        Thread.sleep(1);
      }
      client.stopRunning();
      rpcClient.close();
    }
  }

  @Test
  public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
    for (int i = 0; i < numIterations; i++) {
      TimeoutThread.runWithTimeout(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          try {
            testRpcWithChaosMonkey(true);
          } catch (Throwable e) {
            if (e instanceof Exception) {
              throw (Exception) e;
            } else {
              throw new Exception(e);
            }
          }
          return null;
        }
      }, 180000);
    }
  }

  @Test
  @Ignore // TODO: test fails with async client
  public void testRpcWithChaosMonkeyWithAsyncClient() throws Throwable {
    for (int i = 0; i < numIterations; i++) {
      TimeoutThread.runWithTimeout(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          try {
            testRpcWithChaosMonkey(false);
          } catch (Throwable e) {
            if (e instanceof Exception) {
              throw (Exception) e;
            } else {
              throw new Exception(e);
            }
          }
          return null;
        }
      }, 90000);
    }
  }

  static class TimeoutThread extends Thread {
    long timeout;

    public TimeoutThread(long timeout) {
      this.timeout = timeout;
    }

    @Override
    public void run() {
      try {
        Thread.sleep(timeout);
        Threads.printThreadInfo(System.err, "TEST TIMEOUT STACK DUMP");
        System.exit(1); // a timeout happened
      } catch (InterruptedException e) {
        // this is what we want
      }
    }

    // runs in the same thread context but injects a timeout thread which will exit the JVM on
    // timeout
    static void runWithTimeout(Callable<?> callable, long timeout) throws Exception {
      TimeoutThread thread = new TimeoutThread(timeout);
      thread.start();
      callable.call();
      thread.interrupt();
    }
  }
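
  /**
   * Runs 30 client threads sharing a single RPC client against a cluster whose servers are
   * randomly started and stopped by a {@link MiniChaosMonkey} for 30 seconds, then verifies that
   * every client kept making successful calls.
   */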
  public void testRpcWithChaosMonkey(boolean isSyncClient) throws Throwable {
    LOG.info("Starting test");
    Cluster cluster = new Cluster(10, 100);
    for (int i = 0; i < 10; i++) {
      cluster.startServer();
    }

    ArrayList<SimpleClient> clients = new ArrayList<>(30);

    // all threads should share the same rpc client
    AbstractRpcClient<?> rpcClient = createRpcClient(conf, isSyncClient);

    for (int i = 0; i < 30; i++) {
      String clientId = "client_" + i + "_";
      LOG.info("Starting client: " + clientId);
      SimpleClient client = new SimpleClient(cluster, rpcClient, clientId);
      client.start();
      clients.add(client);
    }

    LOG.info("Starting MiniChaosMonkey");
    MiniChaosMonkey cm = new MiniChaosMonkey(cluster);
    cm.start();

    Threads.sleep(30000);

    LOG.info("Stopping MiniChaosMonkey");
    cm.stopRunning();
    cm.join();
    cm.rethrowException();

    LOG.info("Stopping clients");
    for (SimpleClient client : clients) {
      LOG.info("Stopping client: " + client.id);
      LOG.info(client.id + " numCalls:" + client.numCalls);
      client.stopRunning();
      client.join();
      client.rethrowException();
      assertTrue(client.numCalls > 10);
    }

    LOG.info("Stopping RpcClient");
    rpcClient.close();

    LOG.info("Stopping Cluster");
    cluster.stopRunning();
  }
}