001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; 021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; 022import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.junit.Assert.assertNull; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029import static org.mockito.Matchers.anyObject; 030import static org.mockito.Mockito.spy; 031import static org.mockito.Mockito.verify; 032import static org.mockito.internal.verification.VerificationModeFactory.times; 033 034import java.io.IOException; 035import java.net.InetSocketAddress; 036import java.util.ArrayList; 037import java.util.List; 038 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellScanner; 042import org.apache.hadoop.hbase.CellUtil; 043import org.apache.hadoop.hbase.DoNotRetryIOException; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.KeyValue; 046import org.apache.hadoop.hbase.Server; 047import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 048import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 049import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; 050import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; 051import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto; 052import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto; 053import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto; 054import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; 055import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; 056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.io.compress.GzipCodec; 059import org.apache.hadoop.util.StringUtils; 060import org.junit.Test; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 064import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 065 066/** 067 * Some basic ipc tests. 068 */ 069public abstract class AbstractTestIPC { 070 071 private static final Logger LOG = LoggerFactory.getLogger(AbstractTestIPC.class); 072 073 private static final byte[] CELL_BYTES = Bytes.toBytes("xyz"); 074 private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); 075 076 protected static final Configuration CONF = HBaseConfiguration.create(); 077 static { 078 // Set the default to be the old SimpleRpcServer. Subclasses test it and netty. 079 CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName()); 080 } 081 082 protected abstract RpcServer createRpcServer(final Server server, final String name, 083 final List<BlockingServiceAndInterface> services, 084 final InetSocketAddress bindAddress, Configuration conf, 085 RpcScheduler scheduler) throws IOException; 086 087 protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf); 088 089 /** 090 * Ensure we do not HAVE TO HAVE a codec. 091 */ 092 @Test 093 public void testNoCodec() throws IOException, ServiceException { 094 Configuration conf = HBaseConfiguration.create(); 095 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 096 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 097 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 098 new FifoRpcScheduler(CONF, 1)); 099 try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) { 100 rpcServer.start(); 101 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 102 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 103 String message = "hello"; 104 assertEquals(message, 105 stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); 106 assertNull(pcrc.cellScanner()); 107 } finally { 108 rpcServer.stop(); 109 } 110 } 111 112 protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf); 113 114 /** 115 * It is hard to verify the compression is actually happening under the wraps. Hope that if 116 * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to 117 * confirm that compression is happening down in the client and server). 118 */ 119 @Test 120 public void testCompressCellBlock() throws IOException, ServiceException { 121 Configuration conf = new Configuration(HBaseConfiguration.create()); 122 conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName()); 123 List<Cell> cells = new ArrayList<>(); 124 int count = 3; 125 for (int i = 0; i < count; i++) { 126 cells.add(CELL); 127 } 128 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 129 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 130 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 131 new FifoRpcScheduler(CONF, 1)); 132 133 try (AbstractRpcClient<?> client = createRpcClient(conf)) { 134 rpcServer.start(); 135 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 136 HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells)); 137 String message = "hello"; 138 assertEquals(message, 139 stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); 140 int index = 0; 141 CellScanner cellScanner = pcrc.cellScanner(); 142 assertNotNull(cellScanner); 143 while (cellScanner.advance()) { 144 assertEquals(CELL, cellScanner.current()); 145 index++; 146 } 147 assertEquals(count, index); 148 } finally { 149 rpcServer.stop(); 150 } 151 } 152 153 protected abstract AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup( 154 Configuration conf) throws IOException; 155 156 @Test 157 public void testRTEDuringConnectionSetup() throws Exception { 158 Configuration conf = HBaseConfiguration.create(); 159 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 160 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 161 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 162 new FifoRpcScheduler(CONF, 1)); 163 try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) { 164 rpcServer.start(); 165 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 166 stub.ping(null, EmptyRequestProto.getDefaultInstance()); 167 fail("Expected an exception to have been thrown!"); 168 } catch (Exception e) { 169 LOG.info("Caught expected exception: " + e.toString()); 170 assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault")); 171 } finally { 172 rpcServer.stop(); 173 } 174 } 175 176 /** 177 * Tests that the rpc scheduler is called when requests arrive. 178 */ 179 @Test 180 public void testRpcScheduler() throws IOException, ServiceException, InterruptedException { 181 RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); 182 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 183 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 184 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, scheduler); 185 verify(scheduler).init((RpcScheduler.Context) anyObject()); 186 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 187 rpcServer.start(); 188 verify(scheduler).start(); 189 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 190 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); 191 for (int i = 0; i < 10; i++) { 192 stub.echo(null, param); 193 } 194 verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); 195 } finally { 196 rpcServer.stop(); 197 verify(scheduler).stop(); 198 } 199 } 200 201 /** Tests that the rpc scheduler is called when requests arrive. */ 202 @Test 203 public void testRpcMaxRequestSize() throws IOException, ServiceException { 204 Configuration conf = new Configuration(CONF); 205 conf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000); 206 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 207 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 208 SERVICE, null)), new InetSocketAddress("localhost", 0), conf, 209 new FifoRpcScheduler(conf, 1)); 210 try (AbstractRpcClient<?> client = createRpcClient(conf)) { 211 rpcServer.start(); 212 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 213 StringBuilder message = new StringBuilder(1200); 214 for (int i = 0; i < 200; i++) { 215 message.append("hello."); 216 } 217 // set total RPC size bigger than 100 bytes 218 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build(); 219 stub.echo( 220 new HBaseRpcControllerImpl(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), 221 param); 222 fail("RPC should have failed because it exceeds max request size"); 223 } catch (ServiceException e) { 224 LOG.info("Caught expected exception: " + e); 225 assertTrue(e.toString(), 226 StringUtils.stringifyException(e).contains("RequestTooBigException")); 227 } finally { 228 rpcServer.stop(); 229 } 230 } 231 232 /** 233 * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null 234 * remoteAddress set to its Call Object 235 * @throws ServiceException 236 */ 237 @Test 238 public void testRpcServerForNotNullRemoteAddressInCallObject() 239 throws IOException, ServiceException { 240 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 241 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 242 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 243 new FifoRpcScheduler(CONF, 1)); 244 InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); 245 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 246 rpcServer.start(); 247 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 248 assertEquals(localAddr.getAddress().getHostAddress(), 249 stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr()); 250 } finally { 251 rpcServer.stop(); 252 } 253 } 254 255 @Test 256 public void testRemoteError() throws IOException, ServiceException { 257 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 258 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 259 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 260 new FifoRpcScheduler(CONF, 1)); 261 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 262 rpcServer.start(); 263 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 264 stub.error(null, EmptyRequestProto.getDefaultInstance()); 265 } catch (ServiceException e) { 266 LOG.info("Caught expected exception: " + e); 267 IOException ioe = ProtobufUtil.handleRemoteException(e); 268 assertTrue(ioe instanceof DoNotRetryIOException); 269 assertTrue(ioe.getMessage().contains("server error!")); 270 } finally { 271 rpcServer.stop(); 272 } 273 } 274 275 @Test 276 public void testTimeout() throws IOException { 277 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 278 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 279 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 280 new FifoRpcScheduler(CONF, 1)); 281 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 282 rpcServer.start(); 283 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 284 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 285 int ms = 1000; 286 int timeout = 100; 287 for (int i = 0; i < 10; i++) { 288 pcrc.reset(); 289 pcrc.setCallTimeout(timeout); 290 long startTime = System.nanoTime(); 291 try { 292 stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build()); 293 } catch (ServiceException e) { 294 long waitTime = (System.nanoTime() - startTime) / 1000000; 295 // expected 296 LOG.info("Caught expected exception: " + e); 297 IOException ioe = ProtobufUtil.handleRemoteException(e); 298 assertTrue(ioe.getCause() instanceof CallTimeoutException); 299 // confirm that we got exception before the actual pause. 300 assertTrue(waitTime < ms); 301 } 302 } 303 } finally { 304 rpcServer.stop(); 305 } 306 } 307 308 protected abstract RpcServer createTestFailingRpcServer(final Server server, final String name, 309 final List<BlockingServiceAndInterface> services, 310 final InetSocketAddress bindAddress, Configuration conf, 311 RpcScheduler scheduler) throws IOException; 312 313 /** Tests that the connection closing is handled by the client with outstanding RPC calls */ 314 @Test 315 public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException { 316 Configuration conf = new Configuration(CONF); 317 RpcServer rpcServer = createTestFailingRpcServer(null, "testRpcServer", 318 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 319 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 320 new FifoRpcScheduler(CONF, 1)); 321 322 try (AbstractRpcClient<?> client = createRpcClient(conf)) { 323 rpcServer.start(); 324 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 325 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); 326 stub.echo(null, param); 327 fail("RPC should have failed because connection closed"); 328 } catch (ServiceException e) { 329 LOG.info("Caught expected exception: " + e.toString()); 330 } finally { 331 rpcServer.stop(); 332 } 333 } 334 335 @Test 336 public void testAsyncEcho() throws IOException { 337 Configuration conf = HBaseConfiguration.create(); 338 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 339 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 340 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 341 new FifoRpcScheduler(CONF, 1)); 342 try (AbstractRpcClient<?> client = createRpcClient(conf)) { 343 rpcServer.start(); 344 Interface stub = newStub(client, rpcServer.getListenerAddress()); 345 int num = 10; 346 List<HBaseRpcController> pcrcList = new ArrayList<>(); 347 List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>(); 348 for (int i = 0; i < num; i++) { 349 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 350 BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>(); 351 stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done); 352 pcrcList.add(pcrc); 353 callbackList.add(done); 354 } 355 for (int i = 0; i < num; i++) { 356 HBaseRpcController pcrc = pcrcList.get(i); 357 assertFalse(pcrc.failed()); 358 assertNull(pcrc.cellScanner()); 359 assertEquals("hello-" + i, callbackList.get(i).get().getMessage()); 360 } 361 } finally { 362 rpcServer.stop(); 363 } 364 } 365 366 @Test 367 public void testAsyncRemoteError() throws IOException { 368 AbstractRpcClient<?> client = createRpcClient(CONF); 369 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 370 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 371 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 372 new FifoRpcScheduler(CONF, 1)); 373 try { 374 rpcServer.start(); 375 Interface stub = newStub(client, rpcServer.getListenerAddress()); 376 BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>(); 377 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 378 stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback); 379 assertNull(callback.get()); 380 assertTrue(pcrc.failed()); 381 LOG.info("Caught expected exception: " + pcrc.getFailed()); 382 IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed()); 383 assertTrue(ioe instanceof DoNotRetryIOException); 384 assertTrue(ioe.getMessage().contains("server error!")); 385 } finally { 386 client.close(); 387 rpcServer.stop(); 388 } 389 } 390 391 @Test 392 public void testAsyncTimeout() throws IOException { 393 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 394 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 395 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 396 new FifoRpcScheduler(CONF, 1)); 397 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 398 rpcServer.start(); 399 Interface stub = newStub(client, rpcServer.getListenerAddress()); 400 List<HBaseRpcController> pcrcList = new ArrayList<>(); 401 List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>(); 402 int ms = 1000; 403 int timeout = 100; 404 long startTime = System.nanoTime(); 405 for (int i = 0; i < 10; i++) { 406 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 407 pcrc.setCallTimeout(timeout); 408 BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>(); 409 stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback); 410 pcrcList.add(pcrc); 411 callbackList.add(callback); 412 } 413 for (BlockingRpcCallback<?> callback : callbackList) { 414 assertNull(callback.get()); 415 } 416 long waitTime = (System.nanoTime() - startTime) / 1000000; 417 for (HBaseRpcController pcrc : pcrcList) { 418 assertTrue(pcrc.failed()); 419 LOG.info("Caught expected exception: " + pcrc.getFailed()); 420 IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed()); 421 assertTrue(ioe.getCause() instanceof CallTimeoutException); 422 } 423 // confirm that we got exception before the actual pause. 424 assertTrue(waitTime < ms); 425 } finally { 426 rpcServer.stop(); 427 } 428 } 429}