001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; 021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; 022import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.junit.Assert.assertNull; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029import static org.mockito.ArgumentMatchers.anyObject; 030import static org.mockito.Mockito.spy; 031import static org.mockito.Mockito.verify; 032import static org.mockito.internal.verification.VerificationModeFactory.times; 033 034import java.io.IOException; 035import java.net.InetSocketAddress; 036import java.util.ArrayList; 037import java.util.List; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.hbase.Cell; 040import org.apache.hadoop.hbase.CellScanner; 041import org.apache.hadoop.hbase.CellUtil; 042import org.apache.hadoop.hbase.DoNotRetryIOException; 043import org.apache.hadoop.hbase.HBaseConfiguration; 044import org.apache.hadoop.hbase.KeyValue; 045import org.apache.hadoop.hbase.Server; 046import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.io.compress.GzipCodec; 049import org.apache.hadoop.util.StringUtils; 050import org.junit.Test; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 055import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 056import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 057 058import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; 059import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; 060import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto; 061import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto; 062import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto; 063import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; 064import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; 065import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 066 067/** 068 * Some basic ipc tests. 069 */ 070public abstract class AbstractTestIPC { 071 072 private static final Logger LOG = LoggerFactory.getLogger(AbstractTestIPC.class); 073 074 private static final byte[] CELL_BYTES = Bytes.toBytes("xyz"); 075 private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); 076 077 protected static final Configuration CONF = HBaseConfiguration.create(); 078 static { 079 // Set the default to be the old SimpleRpcServer. Subclasses test it and netty. 080 CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName()); 081 } 082 083 protected abstract RpcServer createRpcServer(final Server server, final String name, 084 final List<BlockingServiceAndInterface> services, 085 final InetSocketAddress bindAddress, Configuration conf, 086 RpcScheduler scheduler) throws IOException; 087 088 protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf); 089 090 /** 091 * Ensure we do not HAVE TO HAVE a codec. 092 */ 093 @Test 094 public void testNoCodec() throws IOException, ServiceException { 095 Configuration conf = HBaseConfiguration.create(); 096 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 097 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 098 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 099 new FifoRpcScheduler(CONF, 1)); 100 try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) { 101 rpcServer.start(); 102 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 103 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 104 String message = "hello"; 105 assertEquals(message, 106 stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); 107 assertNull(pcrc.cellScanner()); 108 } finally { 109 rpcServer.stop(); 110 } 111 } 112 113 protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf); 114 115 /** 116 * It is hard to verify the compression is actually happening under the wraps. Hope that if 117 * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to 118 * confirm that compression is happening down in the client and server). 119 */ 120 @Test 121 public void testCompressCellBlock() throws IOException, ServiceException { 122 Configuration conf = new Configuration(HBaseConfiguration.create()); 123 conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName()); 124 List<Cell> cells = new ArrayList<>(); 125 int count = 3; 126 for (int i = 0; i < count; i++) { 127 cells.add(CELL); 128 } 129 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 130 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 131 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 132 new FifoRpcScheduler(CONF, 1)); 133 134 try (AbstractRpcClient<?> client = createRpcClient(conf)) { 135 rpcServer.start(); 136 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 137 HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells)); 138 String message = "hello"; 139 assertEquals(message, 140 stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); 141 int index = 0; 142 CellScanner cellScanner = pcrc.cellScanner(); 143 assertNotNull(cellScanner); 144 while (cellScanner.advance()) { 145 assertEquals(CELL, cellScanner.current()); 146 index++; 147 } 148 assertEquals(count, index); 149 } finally { 150 rpcServer.stop(); 151 } 152 } 153 154 protected abstract AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup( 155 Configuration conf) throws IOException; 156 157 @Test 158 public void testRTEDuringConnectionSetup() throws Exception { 159 Configuration conf = HBaseConfiguration.create(); 160 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 161 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 162 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 163 new FifoRpcScheduler(CONF, 1)); 164 try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) { 165 rpcServer.start(); 166 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 167 stub.ping(null, EmptyRequestProto.getDefaultInstance()); 168 fail("Expected an exception to have been thrown!"); 169 } catch (Exception e) { 170 LOG.info("Caught expected exception: " + e.toString()); 171 assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault")); 172 } finally { 173 rpcServer.stop(); 174 } 175 } 176 177 /** 178 * Tests that the rpc scheduler is called when requests arrive. 179 */ 180 @Test 181 public void testRpcScheduler() throws IOException, ServiceException, InterruptedException { 182 RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); 183 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 184 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 185 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, scheduler); 186 verify(scheduler).init((RpcScheduler.Context) anyObject()); 187 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 188 rpcServer.start(); 189 verify(scheduler).start(); 190 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 191 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); 192 for (int i = 0; i < 10; i++) { 193 stub.echo(null, param); 194 } 195 verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); 196 } finally { 197 rpcServer.stop(); 198 verify(scheduler).stop(); 199 } 200 } 201 202 /** Tests that the rpc scheduler is called when requests arrive. */ 203 @Test 204 public void testRpcMaxRequestSize() throws IOException, ServiceException { 205 Configuration conf = new Configuration(CONF); 206 conf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000); 207 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 208 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 209 SERVICE, null)), new InetSocketAddress("localhost", 0), conf, 210 new FifoRpcScheduler(conf, 1)); 211 try (AbstractRpcClient<?> client = createRpcClient(conf)) { 212 rpcServer.start(); 213 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 214 StringBuilder message = new StringBuilder(1200); 215 for (int i = 0; i < 200; i++) { 216 message.append("hello."); 217 } 218 // set total RPC size bigger than 100 bytes 219 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build(); 220 stub.echo( 221 new HBaseRpcControllerImpl(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), 222 param); 223 fail("RPC should have failed because it exceeds max request size"); 224 } catch (ServiceException e) { 225 LOG.info("Caught expected exception: " + e); 226 assertTrue(e.toString(), 227 StringUtils.stringifyException(e).contains("RequestTooBigException")); 228 } finally { 229 rpcServer.stop(); 230 } 231 } 232 233 /** 234 * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null 235 * remoteAddress set to its Call Object 236 */ 237 @Test 238 public void testRpcServerForNotNullRemoteAddressInCallObject() 239 throws IOException, ServiceException { 240 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 241 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 242 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 243 new FifoRpcScheduler(CONF, 1)); 244 InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); 245 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 246 rpcServer.start(); 247 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 248 assertEquals(localAddr.getAddress().getHostAddress(), 249 stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr()); 250 } finally { 251 rpcServer.stop(); 252 } 253 } 254 255 @Test 256 public void testRemoteError() throws IOException, ServiceException { 257 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 258 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 259 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 260 new FifoRpcScheduler(CONF, 1)); 261 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 262 rpcServer.start(); 263 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 264 stub.error(null, EmptyRequestProto.getDefaultInstance()); 265 } catch (ServiceException e) { 266 LOG.info("Caught expected exception: " + e); 267 IOException ioe = ProtobufUtil.handleRemoteException(e); 268 assertTrue(ioe instanceof DoNotRetryIOException); 269 assertTrue(ioe.getMessage().contains("server error!")); 270 } finally { 271 rpcServer.stop(); 272 } 273 } 274 275 @Test 276 public void testTimeout() throws IOException { 277 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 278 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 279 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 280 new FifoRpcScheduler(CONF, 1)); 281 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 282 rpcServer.start(); 283 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 284 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 285 int ms = 1000; 286 int timeout = 100; 287 for (int i = 0; i < 10; i++) { 288 pcrc.reset(); 289 pcrc.setCallTimeout(timeout); 290 long startTime = System.nanoTime(); 291 try { 292 stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build()); 293 } catch (ServiceException e) { 294 long waitTime = (System.nanoTime() - startTime) / 1000000; 295 // expected 296 LOG.info("Caught expected exception: " + e); 297 IOException ioe = ProtobufUtil.handleRemoteException(e); 298 assertTrue(ioe.getCause() instanceof CallTimeoutException); 299 // confirm that we got exception before the actual pause. 300 assertTrue(waitTime < ms); 301 } 302 } 303 } finally { 304 rpcServer.stop(); 305 } 306 } 307 308 protected abstract RpcServer createTestFailingRpcServer(final Server server, final String name, 309 final List<BlockingServiceAndInterface> services, 310 final InetSocketAddress bindAddress, Configuration conf, 311 RpcScheduler scheduler) throws IOException; 312 313 /** Tests that the connection closing is handled by the client with outstanding RPC calls */ 314 @Test 315 public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException { 316 Configuration conf = new Configuration(CONF); 317 RpcServer rpcServer = createTestFailingRpcServer(null, "testRpcServer", 318 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 319 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 320 new FifoRpcScheduler(CONF, 1)); 321 322 try (AbstractRpcClient<?> client = createRpcClient(conf)) { 323 rpcServer.start(); 324 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 325 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); 326 stub.echo(null, param); 327 fail("RPC should have failed because connection closed"); 328 } catch (ServiceException e) { 329 LOG.info("Caught expected exception: " + e.toString()); 330 } finally { 331 rpcServer.stop(); 332 } 333 } 334 335 @Test 336 public void testAsyncEcho() throws IOException { 337 Configuration conf = HBaseConfiguration.create(); 338 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 339 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 340 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 341 new FifoRpcScheduler(CONF, 1)); 342 try (AbstractRpcClient<?> client = createRpcClient(conf)) { 343 rpcServer.start(); 344 Interface stub = newStub(client, rpcServer.getListenerAddress()); 345 int num = 10; 346 List<HBaseRpcController> pcrcList = new ArrayList<>(); 347 List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>(); 348 for (int i = 0; i < num; i++) { 349 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 350 BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>(); 351 stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done); 352 pcrcList.add(pcrc); 353 callbackList.add(done); 354 } 355 for (int i = 0; i < num; i++) { 356 HBaseRpcController pcrc = pcrcList.get(i); 357 assertFalse(pcrc.failed()); 358 assertNull(pcrc.cellScanner()); 359 assertEquals("hello-" + i, callbackList.get(i).get().getMessage()); 360 } 361 } finally { 362 rpcServer.stop(); 363 } 364 } 365 366 @Test 367 public void testAsyncRemoteError() throws IOException { 368 AbstractRpcClient<?> client = createRpcClient(CONF); 369 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 370 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 371 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 372 new FifoRpcScheduler(CONF, 1)); 373 try { 374 rpcServer.start(); 375 Interface stub = newStub(client, rpcServer.getListenerAddress()); 376 BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>(); 377 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 378 stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback); 379 assertNull(callback.get()); 380 assertTrue(pcrc.failed()); 381 LOG.info("Caught expected exception: " + pcrc.getFailed()); 382 IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed()); 383 assertTrue(ioe instanceof DoNotRetryIOException); 384 assertTrue(ioe.getMessage().contains("server error!")); 385 } finally { 386 client.close(); 387 rpcServer.stop(); 388 } 389 } 390 391 @Test 392 public void testAsyncTimeout() throws IOException { 393 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 394 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( 395 SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, 396 new FifoRpcScheduler(CONF, 1)); 397 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 398 rpcServer.start(); 399 Interface stub = newStub(client, rpcServer.getListenerAddress()); 400 List<HBaseRpcController> pcrcList = new ArrayList<>(); 401 List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>(); 402 int ms = 1000; 403 int timeout = 100; 404 long startTime = System.nanoTime(); 405 for (int i = 0; i < 10; i++) { 406 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 407 pcrc.setCallTimeout(timeout); 408 BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>(); 409 stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback); 410 pcrcList.add(pcrc); 411 callbackList.add(callback); 412 } 413 for (BlockingRpcCallback<?> callback : callbackList) { 414 assertNull(callback.get()); 415 } 416 long waitTime = (System.nanoTime() - startTime) / 1000000; 417 for (HBaseRpcController pcrc : pcrcList) { 418 assertTrue(pcrc.failed()); 419 LOG.info("Caught expected exception: " + pcrc.getFailed()); 420 IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed()); 421 assertTrue(ioe.getCause() instanceof CallTimeoutException); 422 } 423 // confirm that we got exception before the actual pause. 424 assertTrue(waitTime < ms); 425 } finally { 426 rpcServer.stop(); 427 } 428 } 429 430}