/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;

/**
 * Stress test for the RPC client: a pool of SimpleClient threads issues echo calls against a
 * Cluster of RpcServers while a MiniChaosMonkey randomly starts and stops servers underneath
 * them.
 */
@Category(IntegrationTests.class)
public class IntegrationTestRpcClient {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestRpcClient.class);

  private final Configuration conf;

  private int numIterations = 10;

  public IntegrationTestRpcClient() {
    conf = HBaseConfiguration.create();
  }

  protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) {
    return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) {
      @Override
      protected Codec getCodec() {
        return null;
      }
    };
  }

  static String BIG_PAYLOAD;

  static {
    StringBuilder builder = new StringBuilder();

    while (builder.length() < 1024 * 1024) { // 1M chars, roughly 2 MB as UTF-16
      builder.append("big.payload.");
    }

    BIG_PAYLOAD = builder.toString();
  }
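
  /**
   * A local "cluster" of RpcServers whose membership can grow and shrink while the test runs.
   * The server map and list are kept in sync under a ReadWriteLock so the chaos monkey and the
   * client threads always see a consistent view.
   */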
  class Cluster {
    ReadWriteLock lock = new ReentrantReadWriteLock();
    HashMap<InetSocketAddress, RpcServer> rpcServers = new HashMap<>();
    List<RpcServer> serverList = new ArrayList<>();
    int maxServers;
    int minServers;

    Cluster(int minServers, int maxServers) {
      this.minServers = minServers;
      this.maxServers = maxServers;
    }

    RpcServer startServer() throws IOException {
      lock.writeLock().lock();
      try {
        if (rpcServers.size() >= maxServers) {
          return null;
        }

        RpcServer rpcServer = RpcServerFactory.createRpcServer(null, "testRpcServer",
          Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
          new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1));
        rpcServer.start();
        InetSocketAddress address = rpcServer.getListenerAddress();
        if (address == null) {
          throw new IOException("Listener channel is closed");
        }
        rpcServers.put(address, rpcServer);
        serverList.add(rpcServer);
        LOG.info("Started server: " + address);
        return rpcServer;
      } finally {
        lock.writeLock().unlock();
      }
    }

    void stopRandomServer() throws Exception {
      lock.writeLock().lock();
      try {
        if (rpcServers.size() <= minServers) {
          return;
        }
        int size = rpcServers.size();
        int rand = ThreadLocalRandom.current().nextInt(size);
        RpcServer rpcServer = serverList.remove(rand);
        InetSocketAddress address = rpcServer.getListenerAddress();
        if (address == null) {
          // Throw an exception here: we can't remove this instance from the server map because
          // we no longer have access to its map key.
          throw new IOException("Listener channel is closed");
        }
        rpcServers.remove(address);
        stopServer(rpcServer);
      } finally {
        lock.writeLock().unlock();
      }
    }

    void stopServer(RpcServer rpcServer) throws InterruptedException {
      InetSocketAddress address = rpcServer.getListenerAddress();
      LOG.info("Stopping server: " + address);
      rpcServer.stop();
      rpcServer.join();
      LOG.info("Stopped server: " + address);
    }

    void stopRunning() throws InterruptedException {
      lock.writeLock().lock();
      try {
        for (RpcServer rpcServer : serverList) {
          stopServer(rpcServer);
        }
      } finally {
        lock.writeLock().unlock();
      }
    }

    RpcServer getRandomServer() {
      lock.readLock().lock();
      try {
        int size = serverList.size();
        int rand = ThreadLocalRandom.current().nextInt(size);
        return serverList.get(rand);
      } finally {
        lock.readLock().unlock();
      }
    }
  }
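
  /**
   * Randomly starts or stops one server every 100 ms, within the min/max bounds enforced by
   * {@link Cluster}. The first exception seen is retained and surfaced to the test thread via
   * {@link #rethrowException()}.
   */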
  static class MiniChaosMonkey extends Thread {
    AtomicBoolean running = new AtomicBoolean(true);
    AtomicReference<Exception> exception = new AtomicReference<>(null);
    Cluster cluster;

    public MiniChaosMonkey(Cluster cluster) {
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (running.get()) {
        if (ThreadLocalRandom.current().nextBoolean()) {
          // start a server
          try {
            cluster.startServer();
          } catch (Exception e) {
            LOG.warn(e.toString(), e);
            exception.compareAndSet(null, e);
          }
        } else {
          // stop a server
          try {
            cluster.stopRandomServer();
          } catch (Exception e) {
            LOG.warn(e.toString(), e);
            exception.compareAndSet(null, e);
          }
        }

        Threads.sleep(100);
      }
    }

    void stopRunning() {
      running.set(false);
    }

    void rethrowException() throws Exception {
      if (exception.get() != null) {
        throw exception.get();
      }
    }
  }
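
  /**
   * A client thread that repeatedly echoes either BIG_PAYLOAD or a small unique message against
   * a random live server. Call failures are expected while servers are being stopped underneath
   * it and are only logged; a response that does not match the request is recorded and surfaced
   * to the test thread via {@link #rethrowException()}.
   */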
  static class SimpleClient extends Thread {
    AbstractRpcClient<?> rpcClient;
    AtomicBoolean running = new AtomicBoolean(true);
    AtomicBoolean sending = new AtomicBoolean(false);
    AtomicReference<Throwable> exception = new AtomicReference<>(null);
    Cluster cluster;
    String id;
    long numCalls = 0;

    public SimpleClient(Cluster cluster, AbstractRpcClient<?> rpcClient, String id) {
      this.cluster = cluster;
      this.rpcClient = rpcClient;
      this.id = id;
      this.setName(id);
    }

    @Override
    @SuppressWarnings("AssertionFailureIgnored") // intended
    public void run() {
      while (running.get()) {
        boolean isBigPayload = ThreadLocalRandom.current().nextBoolean();
        String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
        EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
        EchoResponseProto ret;
        RpcServer server = cluster.getRandomServer();
        try {
          sending.set(true);
          BlockingInterface stub = newBlockingStub(rpcClient, server.getListenerAddress());
          ret = stub.echo(null, param);
        } catch (Exception e) {
          LOG.warn(e.toString(), e);
          continue; // expected in case connection is closing or closed
        }

        try {
          assertNotNull(ret);
          assertEquals(message, ret.getMessage());
        } catch (Throwable t) {
          exception.compareAndSet(null, t);
        }

        numCalls++;
      }
    }

    void stopRunning() {
      running.set(false);
    }

    boolean isSending() {
      return sending.get();
    }

    void rethrowException() throws Throwable {
      if (exception.get() != null) {
        throw exception.get();
      }
    }
  }

  /*
   * Test that connections which were never started are successfully removed from the connection
   * pool when the rpc client is closing.
   */
  @Test
  public void testRpcWithWriteThread() throws IOException, InterruptedException {
    LOG.info("Starting test");
    Cluster cluster = new Cluster(1, 1);
    cluster.startServer();
    conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
    for (int i = 0; i < 1000; i++) {
      AbstractRpcClient<?> rpcClient = createRpcClient(conf, true);
      SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
      client.start();
      while (!client.isSending()) {
        Thread.sleep(1);
      }
      client.stopRunning();
      rpcClient.close();
    }
  }

  @Test
  public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
    for (int i = 0; i < numIterations; i++) {
      TimeoutThread.runWithTimeout(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          try {
            testRpcWithChaosMonkey(true);
          } catch (Throwable e) {
            if (e instanceof Exception) {
              throw (Exception) e;
            } else {
              throw new Exception(e);
            }
          }
          return null;
        }
      }, 180000);
    }
  }

  @Test
  @Ignore // TODO: test fails with async client
  public void testRpcWithChaosMonkeyWithAsyncClient() throws Throwable {
    for (int i = 0; i < numIterations; i++) {
      TimeoutThread.runWithTimeout(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          try {
            testRpcWithChaosMonkey(false);
          } catch (Throwable e) {
            if (e instanceof Exception) {
              throw (Exception) e;
            } else {
              throw new Exception(e);
            }
          }
          return null;
        }
      }, 90000);
    }
  }

  /**
   * Watchdog that dumps all thread stacks and exits the JVM if the wrapped Callable does not
   * complete within the given timeout.
   */
  static class TimeoutThread extends Thread {
    long timeout;

    public TimeoutThread(long timeout) {
      this.timeout = timeout;
    }

    @Override
    public void run() {
      try {
        Thread.sleep(timeout);
        Threads.printThreadInfo(System.err, "TEST TIMEOUT STACK DUMP");
        System.exit(1); // a timeout happened
      } catch (InterruptedException e) {
        // this is what we want
      }
    }

    // runs in the same thread context but injects a timeout thread which will exit the JVM on
    // timeout
    static void runWithTimeout(Callable<?> callable, long timeout) throws Exception {
      TimeoutThread thread = new TimeoutThread(timeout);
      thread.start();
      callable.call();
      thread.interrupt();
    }
  }
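
  /**
   * Drives the chaos test: 30 clients share one rpc client and hammer a cluster of 10 to 100
   * servers for 30 seconds while the chaos monkey churns membership, then verifies that no
   * client saw a mismatched echo and that each made more than 10 calls.
   */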
  public void testRpcWithChaosMonkey(boolean isSyncClient) throws Throwable {
    LOG.info("Starting test");
    Cluster cluster = new Cluster(10, 100);
    for (int i = 0; i < 10; i++) {
      cluster.startServer();
    }

    ArrayList<SimpleClient> clients = new ArrayList<>(30);

    // all threads should share the same rpc client
    AbstractRpcClient<?> rpcClient = createRpcClient(conf, isSyncClient);

    for (int i = 0; i < 30; i++) {
      String clientId = "client_" + i + "_";
      LOG.info("Starting client: " + clientId);
      SimpleClient client = new SimpleClient(cluster, rpcClient, clientId);
      client.start();
      clients.add(client);
    }

    LOG.info("Starting MiniChaosMonkey");
    MiniChaosMonkey cm = new MiniChaosMonkey(cluster);
    cm.start();

    Threads.sleep(30000);

    LOG.info("Stopping MiniChaosMonkey");
    cm.stopRunning();
    cm.join();
    cm.rethrowException();

    LOG.info("Stopping clients");
    for (SimpleClient client : clients) {
      LOG.info("Stopping client: " + client.id);
      LOG.info(client.id + " numCalls:" + client.numCalls);
      client.stopRunning();
      client.join();
      client.rethrowException();
      assertTrue(client.numCalls > 10);
    }

    LOG.info("Stopping RpcClient");
    rpcClient.close();

    LOG.info("Stopping Cluster");
    cluster.stopRunning();
  }
}