/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasDuration;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasTraceId;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseServerBase;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;

/**
 * Some basic ipc tests.
 * <p>
 * Subclasses supply the concrete {@link AbstractRpcClient} through the {@code createRpcClient*}
 * factory methods, and pass the {@link RpcServer} implementation to test to the constructor.
 */
public abstract class AbstractTestIPC {

  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestIPC.class);

  private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
  private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);

  protected static final Configuration CONF = HBaseConfiguration.create();

  /**
   * Creates an rpc server through {@link RpcServerFactory}. Subclasses may override this to
   * customize how the server is built.
   */
  protected RpcServer createRpcServer(Server server, String name,
    List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
    RpcScheduler scheduler) throws IOException {
    return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
  }

  /**
   * Same as the six argument overload, passing {@code null} as the {@link Server}.
   */
  private RpcServer createRpcServer(String name, List<BlockingServiceAndInterface> services,
    InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException {
    return createRpcServer(null, name, services, bindAddress, conf, scheduler);
  }

  /**
   * Creates the rpc client used by {@link #testNoCodec()}.
   */
  protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);

  @RegisterExtension
  private static final OpenTelemetryExtension OTEL_EXT = OpenTelemetryExtension.create();

  private final Class<? extends RpcServer> rpcServerImpl;

  protected AbstractTestIPC(Class<? extends RpcServer> rpcServerImpl) {
    this.rpcServerImpl = rpcServerImpl;
  }

  /**
   * Selects the rpc server implementation under test before every test.
   */
  @BeforeEach
  public void setUpBeforeTest() {
    CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, RpcServer.class);
  }

  /**
   * Ensure we do not HAVE TO HAVE a codec.
   */
  @TestTemplate
  public void testNoCodec() throws IOException, ServiceException {
    Configuration clientConf = new Configuration(CONF);
    RpcServer rpcServer = createRpcServer("testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
    try (AbstractRpcClient<?> client = createRpcClientNoCodec(clientConf)) {
      rpcServer.start();
      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
      String message = "hello";
      assertEquals(message,
        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
      assertNull(pcrc.cellScanner());
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Creates the rpc client used by most of the tests in this class.
   */
  protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf);

  /**
   * It is hard to verify the compression is actually happening under the wraps. Hope that if
   * unsupported, we'll get an exception at some point (meantime, have to trace it manually to
   * confirm that compression is happening down in the client and server).
   */
  @TestTemplate
  public void testCompressCellBlock() throws IOException, ServiceException {
    Configuration clientConf = new Configuration(CONF);
    clientConf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
    List<ExtendedCell> cells = new ArrayList<>();
    int count = 3;
    for (int i = 0; i < count; i++) {
      cells.add(CELL);
    }
    RpcServer rpcServer = createRpcServer("testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));

    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
      rpcServer.start();
      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
      HBaseRpcController pcrc =
        new HBaseRpcControllerImpl(PrivateCellUtil.createExtendedCellScanner(cells));
      String message = "hello";
      assertEquals(message,
        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
      int index = 0;
      CellScanner cellScanner = pcrc.cellScanner();
      assertNotNull(cellScanner);
      while (cellScanner.advance()) {
        assertEquals(CELL, cellScanner.current());
        index++;
      }
      assertEquals(count, index);
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Creates the rpc client used by {@link #testRTEDuringConnectionSetup()}. That test expects a
   * call through this client to fail with an exception mentioning "Injected fault".
   */
  protected abstract AbstractRpcClient<?>
    createRpcClientRTEDuringConnectionSetup(Configuration conf) throws IOException;

  /**
   * Tests that a failure injected by the client returned from
   * {@link #createRpcClientRTEDuringConnectionSetup(Configuration)} is propagated to the caller.
   */
  @TestTemplate
  public void testRTEDuringConnectionSetup() throws Exception {
    Configuration clientConf = new Configuration(CONF);
    RpcServer rpcServer = createRpcServer("testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
    try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(clientConf)) {
      rpcServer.start();
      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
      stub.ping(null, EmptyRequestProto.getDefaultInstance());
      fail("Expected an exception to have been thrown!");
    } catch (Exception e) {
      LOG.info("Caught expected exception: " + e.toString());
      assertTrue(StringUtils.stringifyException(e).contains("Injected fault"), e.toString());
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Tests that the rpc scheduler is called when requests arrive.
   */
  @TestTemplate
  public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {
    Configuration clientConf = new Configuration(CONF);
    RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
    RpcServer rpcServer = createRpcServer("testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, scheduler);
    verify(scheduler).init(any(RpcScheduler.Context.class));
    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
      rpcServer.start();
      verify(scheduler).start();
      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
      for (int i = 0; i < 10; i++) {
        stub.echo(null, param);
      }
      verify(scheduler, times(10)).dispatch(any(CallRunner.class));
    } finally {
      rpcServer.stop();
      verify(scheduler).stop();
    }
  }

  /**
   * Tests that a request larger than the configured {@link RpcServer#MAX_REQUEST_SIZE} fails with
   * an error that mentions RequestTooBigException.
   */
  @TestTemplate
  public void testRpcMaxRequestSize() throws IOException, ServiceException {
    Configuration clientConf = new Configuration(CONF);
    clientConf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000);
    RpcServer rpcServer = createRpcServer("testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), clientConf, new FifoRpcScheduler(clientConf, 1));
    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
      rpcServer.start();
      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
      // 200 * "hello." is 1200 characters, so the total RPC size is bigger than the 1000 byte
      // limit configured above
      StringBuilder message = new StringBuilder(1200);
      for (int i = 0; i < 200; i++) {
        message.append("hello.");
      }
      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build();
      stub.echo(new HBaseRpcControllerImpl(
        PrivateCellUtil.createExtendedCellScanner(ImmutableList.<ExtendedCell> of(CELL))), param);
      fail("RPC should have failed because it exceeds max request size");
    } catch (ServiceException e) {
      LOG.info("Caught expected exception: " + e);
      assertTrue(StringUtils.stringifyException(e).contains("RequestTooBigException"),
        e.toString());
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
   * remoteAddress set to its Call Object
   */
  @TestTemplate
  public void testRpcServerForNotNullRemoteAddressInCallObject()
    throws IOException, ServiceException {
    Configuration clientConf = new Configuration(CONF);
    RpcServer rpcServer = createRpcServer("testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
    InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
      rpcServer.start();
      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
      assertEquals(localAddr.getAddress().getHostAddress(),
        stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr());
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Tests that an error raised by the server for a blocking call reaches the client as a
   * {@link DoNotRetryIOException} carrying the server's message.
   */
  @TestTemplate
  public void testRemoteError() throws IOException, ServiceException {
    Configuration clientConf = new Configuration(CONF);
    RpcServer rpcServer = createRpcServer("testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
      rpcServer.start();
      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
      ServiceException se = assertThrows(ServiceException.class,
        () -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
      LOG.info("Caught expected exception: " + se);
      IOException ioe = ProtobufUtil.handleRemoteException(se);
      assertThat(ioe, instanceOf(DoNotRetryIOException.class));
      assertThat(ioe.getMessage(), containsString("server error!"));
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Waits up to two seconds for the scheduler of the given server to report no active rpc
   * handlers, and then stops the server. The server is stopped even when the wait fails, so a
   * slow handler can not leave a running server behind for the following tests.
   */
  private static void stopAfterActiveCallsQuit(RpcServer rpcServer) {
    try {
      // wait until all active calls quit, otherwise it may mess up the tracing spans
      await().atMost(Duration.ofSeconds(2))
        .untilAsserted(() -> assertEquals(0, rpcServer.getScheduler().getActiveRpcHandlerCount()));
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Tests that a blocking call whose call timeout is shorter than the server side pause fails with
   * a {@link CallTimeoutException} before the pause would have finished.
   */
  @TestTemplate
  public void testTimeout() throws IOException {
    Configuration clientConf = new Configuration(CONF);
    RpcServer rpcServer = createRpcServer("testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
      rpcServer.start();
      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
      int ms = 1000;
      int timeout = 100;
      for (int i = 0; i < 10; i++) {
        pcrc.reset();
        pcrc.setCallTimeout(timeout);
        long startTime = System.nanoTime();
        ServiceException se = assertThrows(ServiceException.class,
          () -> stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build()));
        long waitTime = (System.nanoTime() - startTime) / 1000000;
        // expected
        LOG.info("Caught expected exception: " + se);
        IOException ioe = ProtobufUtil.handleRemoteException(se);
        assertThat(ioe.getCause(), instanceOf(CallTimeoutException.class));
        // confirm that we got exception before the actual pause.
        assertThat(waitTime, lessThan((long) ms));
      }
    } finally {
      stopAfterActiveCallsQuit(rpcServer);
    }
  }

  /**
   * A {@link SimpleRpcServer} whose connections fail every request with a
   * {@link DoNotRetryIOException}.
   */
  @SuppressWarnings("deprecation")
  private static class FailingSimpleRpcServer extends SimpleRpcServer {

    FailingSimpleRpcServer(Server server, String name,
      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
      Configuration conf, RpcScheduler scheduler) throws IOException {
      super(server, name, services, bindAddress, conf, scheduler, true);
    }

    final class FailingConnection extends SimpleServerRpcConnection {
      private FailingConnection(FailingSimpleRpcServer rpcServer, SocketChannel channel,
        long lastContact) {
        super(rpcServer, channel, lastContact);
      }

      @Override
      public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
        // this will throw exception after the connection header is read, and an RPC is sent
        // from client
        throw new DoNotRetryIOException("Failing for test");
      }
    }

    @Override
    protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
      return new FailingConnection(this, channel, time);
    }
  }

  /**
   * Creates a failing rpc server which matches the implementation under test: a
   * {@link FailingNettyRpcServer} for {@link NettyRpcServer}, otherwise a
   * {@link FailingSimpleRpcServer}.
   */
  protected RpcServer createTestFailingRpcServer(final String name,
    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
    Configuration conf, RpcScheduler scheduler) throws IOException {
    if (rpcServerImpl.equals(NettyRpcServer.class)) {
      return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
    } else {
      return new FailingSimpleRpcServer(null, name, services, bindAddress, conf, scheduler);
    }
  }

  /** Tests that the connection closing is handled by the client with outstanding RPC calls */
  @TestTemplate
  public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
    Configuration clientConf = new Configuration(CONF);
    RpcServer rpcServer = createTestFailingRpcServer("testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
      rpcServer.start();
      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
      ServiceException se = assertThrows(ServiceException.class, () -> stub.echo(null, param),
        "RPC should have failed because connection closed");
      LOG.info("Caught expected exception: " + se);
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Tests that several echo calls issued through the non-blocking stub each complete with their
   * own message, without failure and without a cell scanner.
   */
  @TestTemplate
  public void testAsyncEcho() throws IOException {
    Configuration clientConf = new Configuration(CONF);
    RpcServer rpcServer = createRpcServer("testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
      rpcServer.start();
      Interface stub = newStub(client, rpcServer.getListenerAddress());
      int num = 10;
      List<HBaseRpcController> pcrcList = new ArrayList<>();
      List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>();
      for (int i = 0; i < num; i++) {
        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
        BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done);
        pcrcList.add(pcrc);
        callbackList.add(done);
      }
      for (int i = 0; i < num; i++) {
        EchoResponseProto resp = callbackList.get(i).get();
        HBaseRpcController pcrc = pcrcList.get(i);
        assertEquals("hello-" + i, resp.getMessage());
        assertFalse(pcrc.failed());
        assertNull(pcrc.cellScanner());
      }
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Tests that an error raised by the server for a call through the non-blocking stub is reported
   * on the controller as a {@link DoNotRetryIOException}, and that the callback gets no response.
   */
  @TestTemplate
  public void testAsyncRemoteError() throws IOException {
    Configuration clientConf = new Configuration(CONF);
    RpcServer rpcServer = createRpcServer("testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
      rpcServer.start();
      Interface stub = newStub(client, rpcServer.getListenerAddress());
      BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
      stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback);
      assertNull(callback.get());
      assertTrue(pcrc.failed());
      LOG.info("Caught expected exception: " + pcrc.getFailed());
      IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
      assertThat(ioe, instanceOf(DoNotRetryIOException.class));
      assertThat(ioe.getMessage(), containsString("server error!"));
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Tests that calls through the non-blocking stub whose call timeout is shorter than the server
   * side pause all fail with a {@link CallTimeoutException} before the pause would have finished.
   */
  @TestTemplate
  public void testAsyncTimeout() throws IOException {
    Configuration clientConf = new Configuration(CONF);
    RpcServer rpcServer = createRpcServer("testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
      rpcServer.start();
      Interface stub = newStub(client, rpcServer.getListenerAddress());
      List<HBaseRpcController> pcrcList = new ArrayList<>();
      List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>();
      int ms = 1000;
      int timeout = 100;
      long startTime = System.nanoTime();
      for (int i = 0; i < 10; i++) {
        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
        pcrc.setCallTimeout(timeout);
        BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
        stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback);
        pcrcList.add(pcrc);
        callbackList.add(callback);
      }
      for (BlockingRpcCallback<?> callback : callbackList) {
        assertNull(callback.get());
      }
      long waitTime = (System.nanoTime() - startTime) / 1000000;
      for (HBaseRpcController pcrc : pcrcList) {
        assertTrue(pcrc.failed());
        LOG.info("Caught expected exception: " + pcrc.getFailed());
        IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
        assertThat(ioe.getCause(), instanceOf(CallTimeoutException.class));
      }
      // confirm that we got exception before the actual pause.
      assertThat(waitTime, lessThan((long) ms));
    } finally {
      stopAfterActiveCallsQuit(rpcServer);
    }
  }

  /**
   * Waits until a span matching the given matcher has been recorded, and returns the first such
   * span.
   */
  private SpanData waitSpan(Matcher<SpanData> matcher) {
    Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(() -> OTEL_EXT.getSpans(), hasItem(matcher)));
    return OTEL_EXT.getSpans().stream().filter(matcher::matches).findFirst()
      .orElseThrow(AssertionError::new);
  }

  /** Builds the span name used for an rpc call, in the form {@code service/method}. */
  private static String buildIpcSpanName(final String packageAndService, final String methodName) {
    return packageAndService + "/" + methodName;
  }

  /** Matches the {@link SpanKind#CLIENT} span of the given rpc method. */
  private static Matcher<SpanData> buildIpcClientSpanMatcher(final String packageAndService,
    final String methodName) {
    return allOf(hasName(buildIpcSpanName(packageAndService, methodName)),
      hasKind(SpanKind.CLIENT));
  }

  /** Matches the {@link SpanKind#SERVER} span of the given rpc method. */
  private static Matcher<SpanData> buildIpcServerSpanMatcher(final String packageAndService,
    final String methodName) {
    return allOf(hasName(buildIpcSpanName(packageAndService, methodName)),
      hasKind(SpanKind.SERVER));
  }

  /**
   * Matches a span carrying the rpc system, service and method attributes plus the peer host name
   * and port taken from the given address.
   */
  private static Matcher<SpanData> buildIpcClientSpanAttributesMatcher(
    final String packageAndService, final String methodName, final InetSocketAddress isa) {
    return hasAttributes(allOf(containsEntry("rpc.system", "HBASE_RPC"),
      containsEntry("rpc.service", packageAndService), containsEntry("rpc.method", methodName),
      containsEntry("net.peer.name", isa.getHostName()),
      containsEntry(AttributeKey.longKey("net.peer.port"), (long) isa.getPort())));
  }

  /** Matches a span carrying the rpc system, service and method attributes. */
  private static Matcher<SpanData>
    buildIpcServerSpanAttributesMatcher(final String packageAndService, final String methodName) {
    return hasAttributes(allOf(containsEntry("rpc.system", "HBASE_RPC"),
      containsEntry("rpc.service", packageAndService), containsEntry("rpc.method", methodName)));
  }

  /**
   * Waits for the "RpcServer.process" span and asserts that it is a {@link SpanKind#SERVER} span
   * whose parent span context is remote.
   */
  private void assertRemoteSpan() {
    SpanData data = waitSpan(hasName("RpcServer.process"));
    assertTrue(data.getParentSpanContext().isRemote());
    assertEquals(SpanKind.SERVER, data.getKind());
  }

  /**
   * Tests the spans recorded for a successful call: client and server spans carry the expected
   * attributes, and every span has status OK, the same trace id and a duration of at least the
   * requested pause.
   */
  @TestTemplate
  public void testTracingSuccessIpc() throws IOException, ServiceException {
    Configuration clientConf = new Configuration(CONF);
    RpcServer rpcServer = createRpcServer("testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
      rpcServer.start();
      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
      stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
      // use the ISA from the running server so that we can get the port selected.
      final InetSocketAddress isa = rpcServer.getListenerAddress();
      final SpanData pauseClientSpan =
        waitSpan(buildIpcClientSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "pause"));
      assertThat(pauseClientSpan,
        buildIpcClientSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "pause", isa));
      final SpanData pauseServerSpan =
        waitSpan(buildIpcServerSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "pause"));
      assertThat(pauseServerSpan,
        buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "pause"));
      assertRemoteSpan();
      assertFalse(OTEL_EXT.getSpans().isEmpty(), "no spans provided");
      assertThat(OTEL_EXT.getSpans(),
        everyItem(allOf(hasStatusWithCode(StatusCode.OK),
          hasTraceId(OTEL_EXT.getSpans().iterator().next().getTraceId()),
          hasDuration(greaterThanOrEqualTo(Duration.ofMillis(100L))))));
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Tests the spans recorded for a failing call: client and server spans carry the expected
   * attributes, and every span has status ERROR and the same trace id.
   */
  @TestTemplate
  public void testTracingErrorIpc() throws IOException {
    Configuration clientConf = new Configuration(CONF);
    RpcServer rpcServer = createRpcServer("testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
      rpcServer.start();
      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
      assertThrows(ServiceException.class,
        () -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
      // use the ISA from the running server so that we can get the port selected.
      final InetSocketAddress isa = rpcServer.getListenerAddress();
      final SpanData errorClientSpan =
        waitSpan(buildIpcClientSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));
      assertThat(errorClientSpan,
        buildIpcClientSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error", isa));
      final SpanData errorServerSpan =
        waitSpan(buildIpcServerSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));
      assertThat(errorServerSpan,
        buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));
      assertRemoteSpan();
      assertFalse(OTEL_EXT.getSpans().isEmpty(), "no spans provided");
      assertThat(OTEL_EXT.getSpans(), everyItem(allOf(hasStatusWithCode(StatusCode.ERROR),
        hasTraceId(OTEL_EXT.getSpans().iterator().next().getTraceId()))));
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Creates the rpc client used by {@link #testBadPreambleHeader()}. That test expects calls
   * through this client to end in a {@link BadAuthException}.
   */
  protected abstract AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf);

  /**
   * Issues one echo call which is expected to fail, and returns the failure converted by
   * {@link ProtobufUtil#handleRemoteException(Throwable)}.
   */
  private IOException doBadPreambleHeaderCall(BlockingInterface stub) {
    ServiceException se = assertThrows(ServiceException.class,
      () -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
    return ProtobufUtil.handleRemoteException(se);
  }

  /**
   * Tests that a call through the client from {@link #createBadAuthRpcClient(Configuration)}
   * fails with a {@link BadAuthException} which names the unknown auth method.
   */
  @TestTemplate
  public void testBadPreambleHeader() throws Exception {
    Configuration clientConf = new Configuration(CONF);
    RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
    try (AbstractRpcClient<?> client = createBadAuthRpcClient(clientConf)) {
      rpcServer.start();
      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
      BadAuthException error = null;
      // for SimpleRpcServer, it is possible that we get a broken pipe before getting the
      // BadAuthException, so we add some retries here, see HBASE-28417
      for (int i = 0; i < 10; i++) {
        IOException ioe = doBadPreambleHeaderCall(stub);
        if (ioe instanceof BadAuthException) {
          error = (BadAuthException) ioe;
          break;
        }
        Thread.sleep(100);
      }
      assertNotNull(error, "Can not get expected BadAuthException");
      assertThat(error.getMessage(), containsString("authName=unknown"));
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Testcase for getting connection registry information through connection preamble header, see
   * HBASE-25051 for more details.
   */
  @TestTemplate
  public void testGetConnectionRegistry() throws IOException, ServiceException {
    Configuration clientConf = new Configuration(CONF);
    String clusterId = "test_cluster_id";
    HBaseServerBase<?> server = mock(HBaseServerBase.class);
    when(server.getClusterId()).thenReturn(clusterId);
    // do not need any services
    RpcServer rpcServer = createRpcServer(server, "testRpcServer", Collections.emptyList(),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
      rpcServer.start();
      InetSocketAddress addr = rpcServer.getListenerAddress();
      BlockingRpcChannel channel =
        client.createBlockingRpcChannel(ServerName.valueOf(addr.getHostName(), addr.getPort(),
          EnvironmentEdgeManager.currentTime()), User.getCurrent(), 0);
      ConnectionRegistryService.BlockingInterface stub =
        ConnectionRegistryService.newBlockingStub(channel);
      GetConnectionRegistryResponse resp =
        stub.getConnectionRegistry(null, GetConnectionRegistryRequest.getDefaultInstance());
      assertEquals(clusterId, resp.getClusterId());
    } finally {
      rpcServer.stop();
    }
  }

  /**
   * Test server does not support getting connection registry information through connection
   * preamble header, i.e, a new client connecting to an old server. We simulate this by using a
   * Server without implementing the ConnectionRegistryEndpoint interface.
   */
  @TestTemplate
  public void testGetConnectionRegistryError() throws IOException, ServiceException {
    Configuration clientConf = new Configuration(CONF);
    // do not need any services
    RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
      rpcServer.start();
      InetSocketAddress addr = rpcServer.getListenerAddress();
      RpcChannel channel = client.createRpcChannel(ServerName.valueOf(addr.getHostName(),
        addr.getPort(), EnvironmentEdgeManager.currentTime()), User.getCurrent(), 0);
      ConnectionRegistryService.Interface stub = ConnectionRegistryService.newStub(channel);
      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
      BlockingRpcCallback<GetConnectionRegistryResponse> done = new BlockingRpcCallback<>();
      stub.getConnectionRegistry(pcrc, GetConnectionRegistryRequest.getDefaultInstance(), done);
      // should have failed so no response
      assertNull(done.get());
      assertTrue(pcrc.failed());
      // should be a FatalConnectionException
      assertThat(pcrc.getFailed(), instanceOf(RemoteException.class));
      assertEquals(FatalConnectionException.class.getName(),
        ((RemoteException) pcrc.getFailed()).getClassName());
      assertThat(pcrc.getFailed().getMessage(), startsWith("Expected HEADER="));
    } finally {
      rpcServer.stop();
    }
  }
}