001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; 022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasDuration; 023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; 024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; 026import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasTraceId; 027import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; 028import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; 029import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub; 030import static org.hamcrest.MatcherAssert.assertThat; 031import static org.hamcrest.Matchers.allOf; 032import static org.hamcrest.Matchers.everyItem; 033import static org.hamcrest.Matchers.greaterThanOrEqualTo; 034import static org.hamcrest.Matchers.hasItem; 035import static org.junit.Assert.assertEquals; 036import static org.junit.Assert.assertFalse; 037import static org.junit.Assert.assertNotNull; 038import static org.junit.Assert.assertNull; 039import static org.junit.Assert.assertThrows; 040import static org.junit.Assert.assertTrue; 041import static org.junit.Assert.fail; 042import static org.mockito.ArgumentMatchers.any; 043import static org.mockito.Mockito.spy; 044import static org.mockito.Mockito.verify; 045import static org.mockito.internal.verification.VerificationModeFactory.times; 046 047import io.opentelemetry.api.common.AttributeKey; 048import io.opentelemetry.api.trace.SpanKind; 049import io.opentelemetry.api.trace.StatusCode; 050import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; 051import io.opentelemetry.sdk.trace.data.SpanData; 052import java.io.IOException; 053import java.net.InetSocketAddress; 054import java.time.Duration; 055import java.util.ArrayList; 056import java.util.List; 057import org.apache.hadoop.conf.Configuration; 058import org.apache.hadoop.hbase.Cell; 059import org.apache.hadoop.hbase.CellScanner; 060import org.apache.hadoop.hbase.CellUtil; 061import org.apache.hadoop.hbase.DoNotRetryIOException; 062import org.apache.hadoop.hbase.HBaseConfiguration; 063import org.apache.hadoop.hbase.KeyValue; 064import org.apache.hadoop.hbase.MatcherPredicate; 065import org.apache.hadoop.hbase.Server; 066import org.apache.hadoop.hbase.Waiter; 067import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 068import org.apache.hadoop.hbase.util.Bytes; 069import org.apache.hadoop.io.compress.GzipCodec; 070import org.apache.hadoop.util.StringUtils; 071import org.hamcrest.Matcher; 072import org.junit.Rule; 073import org.junit.Test; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 078import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 079import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 080 081import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; 082import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; 083import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto; 084import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto; 085import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto; 086import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; 087import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; 088import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 089 090/** 091 * Some basic ipc tests. 092 */ 093public abstract class AbstractTestIPC { 094 095 private static final Logger LOG = LoggerFactory.getLogger(AbstractTestIPC.class); 096 097 private static final byte[] CELL_BYTES = Bytes.toBytes("xyz"); 098 private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); 099 100 protected static final Configuration CONF = HBaseConfiguration.create(); 101 static { 102 // Set the default to be the old SimpleRpcServer. Subclasses test it and netty. 103 CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName()); 104 } 105 106 protected abstract RpcServer createRpcServer(final Server server, final String name, 107 final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress, 108 Configuration conf, RpcScheduler scheduler) throws IOException; 109 110 protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf); 111 112 @Rule 113 public OpenTelemetryRule traceRule = OpenTelemetryRule.create(); 114 115 /** 116 * Ensure we do not HAVE TO HAVE a codec. 117 */ 118 @Test 119 public void testNoCodec() throws IOException, ServiceException { 120 Configuration conf = HBaseConfiguration.create(); 121 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 122 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 123 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 124 try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) { 125 rpcServer.start(); 126 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 127 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 128 String message = "hello"; 129 assertEquals(message, 130 stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); 131 assertNull(pcrc.cellScanner()); 132 } finally { 133 rpcServer.stop(); 134 } 135 } 136 137 protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf); 138 139 /** 140 * It is hard to verify the compression is actually happening under the wraps. Hope that if 141 * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to 142 * confirm that compression is happening down in the client and server). 143 */ 144 @Test 145 public void testCompressCellBlock() throws IOException, ServiceException { 146 Configuration conf = new Configuration(HBaseConfiguration.create()); 147 conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName()); 148 List<Cell> cells = new ArrayList<>(); 149 int count = 3; 150 for (int i = 0; i < count; i++) { 151 cells.add(CELL); 152 } 153 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 154 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 155 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 156 157 try (AbstractRpcClient<?> client = createRpcClient(conf)) { 158 rpcServer.start(); 159 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 160 HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells)); 161 String message = "hello"; 162 assertEquals(message, 163 stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); 164 int index = 0; 165 CellScanner cellScanner = pcrc.cellScanner(); 166 assertNotNull(cellScanner); 167 while (cellScanner.advance()) { 168 assertEquals(CELL, cellScanner.current()); 169 index++; 170 } 171 assertEquals(count, index); 172 } finally { 173 rpcServer.stop(); 174 } 175 } 176 177 protected abstract AbstractRpcClient<?> 178 createRpcClientRTEDuringConnectionSetup(Configuration conf) throws IOException; 179 180 @Test 181 public void testRTEDuringConnectionSetup() throws Exception { 182 Configuration conf = HBaseConfiguration.create(); 183 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 184 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 185 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 186 try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) { 187 rpcServer.start(); 188 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 189 stub.ping(null, EmptyRequestProto.getDefaultInstance()); 190 fail("Expected an exception to have been thrown!"); 191 } catch (Exception e) { 192 LOG.info("Caught expected exception: " + e.toString()); 193 assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault")); 194 } finally { 195 rpcServer.stop(); 196 } 197 } 198 199 /** 200 * Tests that the rpc scheduler is called when requests arrive. 201 */ 202 @Test 203 public void testRpcScheduler() throws IOException, ServiceException, InterruptedException { 204 RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); 205 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 206 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 207 new InetSocketAddress("localhost", 0), CONF, scheduler); 208 verify(scheduler).init(any(RpcScheduler.Context.class)); 209 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 210 rpcServer.start(); 211 verify(scheduler).start(); 212 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 213 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); 214 for (int i = 0; i < 10; i++) { 215 stub.echo(null, param); 216 } 217 verify(scheduler, times(10)).dispatch(any(CallRunner.class)); 218 } finally { 219 rpcServer.stop(); 220 verify(scheduler).stop(); 221 } 222 } 223 224 /** Tests that the rpc scheduler is called when requests arrive. */ 225 @Test 226 public void testRpcMaxRequestSize() throws IOException, ServiceException { 227 Configuration conf = new Configuration(CONF); 228 conf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000); 229 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 230 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 231 new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1)); 232 try (AbstractRpcClient<?> client = createRpcClient(conf)) { 233 rpcServer.start(); 234 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 235 StringBuilder message = new StringBuilder(1200); 236 for (int i = 0; i < 200; i++) { 237 message.append("hello."); 238 } 239 // set total RPC size bigger than 100 bytes 240 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build(); 241 stub.echo( 242 new HBaseRpcControllerImpl(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), 243 param); 244 fail("RPC should have failed because it exceeds max request size"); 245 } catch (ServiceException e) { 246 LOG.info("Caught expected exception: " + e); 247 assertTrue(e.toString(), 248 StringUtils.stringifyException(e).contains("RequestTooBigException")); 249 } finally { 250 rpcServer.stop(); 251 } 252 } 253 254 /** 255 * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null 256 * remoteAddress set to its Call Object 257 */ 258 @Test 259 public void testRpcServerForNotNullRemoteAddressInCallObject() 260 throws IOException, ServiceException { 261 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 262 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 263 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 264 InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); 265 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 266 rpcServer.start(); 267 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 268 assertEquals(localAddr.getAddress().getHostAddress(), 269 stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr()); 270 } finally { 271 rpcServer.stop(); 272 } 273 } 274 275 @Test 276 public void testRemoteError() throws IOException, ServiceException { 277 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 278 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 279 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 280 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 281 rpcServer.start(); 282 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 283 stub.error(null, EmptyRequestProto.getDefaultInstance()); 284 } catch (ServiceException e) { 285 LOG.info("Caught expected exception: " + e); 286 IOException ioe = ProtobufUtil.handleRemoteException(e); 287 assertTrue(ioe instanceof DoNotRetryIOException); 288 assertTrue(ioe.getMessage().contains("server error!")); 289 } finally { 290 rpcServer.stop(); 291 } 292 } 293 294 @Test 295 public void testTimeout() throws IOException { 296 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 297 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 298 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 299 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 300 rpcServer.start(); 301 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 302 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 303 int ms = 1000; 304 int timeout = 100; 305 for (int i = 0; i < 10; i++) { 306 pcrc.reset(); 307 pcrc.setCallTimeout(timeout); 308 long startTime = System.nanoTime(); 309 try { 310 stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build()); 311 } catch (ServiceException e) { 312 long waitTime = (System.nanoTime() - startTime) / 1000000; 313 // expected 314 LOG.info("Caught expected exception: " + e); 315 IOException ioe = ProtobufUtil.handleRemoteException(e); 316 assertTrue(ioe.getCause() instanceof CallTimeoutException); 317 // confirm that we got exception before the actual pause. 318 assertTrue(waitTime < ms); 319 } 320 } 321 } finally { 322 rpcServer.stop(); 323 } 324 } 325 326 protected abstract RpcServer createTestFailingRpcServer(final Server server, final String name, 327 final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress, 328 Configuration conf, RpcScheduler scheduler) throws IOException; 329 330 /** Tests that the connection closing is handled by the client with outstanding RPC calls */ 331 @Test 332 public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException { 333 Configuration conf = new Configuration(CONF); 334 RpcServer rpcServer = createTestFailingRpcServer(null, "testRpcServer", 335 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 336 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 337 338 try (AbstractRpcClient<?> client = createRpcClient(conf)) { 339 rpcServer.start(); 340 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 341 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); 342 stub.echo(null, param); 343 fail("RPC should have failed because connection closed"); 344 } catch (ServiceException e) { 345 LOG.info("Caught expected exception: " + e.toString()); 346 } finally { 347 rpcServer.stop(); 348 } 349 } 350 351 @Test 352 public void testAsyncEcho() throws IOException { 353 Configuration conf = HBaseConfiguration.create(); 354 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 355 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 356 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 357 try (AbstractRpcClient<?> client = createRpcClient(conf)) { 358 rpcServer.start(); 359 Interface stub = newStub(client, rpcServer.getListenerAddress()); 360 int num = 10; 361 List<HBaseRpcController> pcrcList = new ArrayList<>(); 362 List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>(); 363 for (int i = 0; i < num; i++) { 364 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 365 BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>(); 366 stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done); 367 pcrcList.add(pcrc); 368 callbackList.add(done); 369 } 370 for (int i = 0; i < num; i++) { 371 HBaseRpcController pcrc = pcrcList.get(i); 372 assertFalse(pcrc.failed()); 373 assertNull(pcrc.cellScanner()); 374 assertEquals("hello-" + i, callbackList.get(i).get().getMessage()); 375 } 376 } finally { 377 rpcServer.stop(); 378 } 379 } 380 381 @Test 382 public void testAsyncRemoteError() throws IOException { 383 AbstractRpcClient<?> client = createRpcClient(CONF); 384 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 385 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 386 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 387 try { 388 rpcServer.start(); 389 Interface stub = newStub(client, rpcServer.getListenerAddress()); 390 BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>(); 391 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 392 stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback); 393 assertNull(callback.get()); 394 assertTrue(pcrc.failed()); 395 LOG.info("Caught expected exception: " + pcrc.getFailed()); 396 IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed()); 397 assertTrue(ioe instanceof DoNotRetryIOException); 398 assertTrue(ioe.getMessage().contains("server error!")); 399 } finally { 400 client.close(); 401 rpcServer.stop(); 402 } 403 } 404 405 @Test 406 public void testAsyncTimeout() throws IOException { 407 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 408 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 409 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 410 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 411 rpcServer.start(); 412 Interface stub = newStub(client, rpcServer.getListenerAddress()); 413 List<HBaseRpcController> pcrcList = new ArrayList<>(); 414 List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>(); 415 int ms = 1000; 416 int timeout = 100; 417 long startTime = System.nanoTime(); 418 for (int i = 0; i < 10; i++) { 419 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 420 pcrc.setCallTimeout(timeout); 421 BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>(); 422 stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback); 423 pcrcList.add(pcrc); 424 callbackList.add(callback); 425 } 426 for (BlockingRpcCallback<?> callback : callbackList) { 427 assertNull(callback.get()); 428 } 429 long waitTime = (System.nanoTime() - startTime) / 1000000; 430 for (HBaseRpcController pcrc : pcrcList) { 431 assertTrue(pcrc.failed()); 432 LOG.info("Caught expected exception: " + pcrc.getFailed()); 433 IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed()); 434 assertTrue(ioe.getCause() instanceof CallTimeoutException); 435 } 436 // confirm that we got exception before the actual pause. 437 assertTrue(waitTime < ms); 438 } finally { 439 rpcServer.stop(); 440 } 441 } 442 443 private SpanData waitSpan(Matcher<SpanData> matcher) { 444 Waiter.waitFor(CONF, 1000, 445 new MatcherPredicate<>(() -> traceRule.getSpans(), hasItem(matcher))); 446 return traceRule.getSpans().stream().filter(matcher::matches).findFirst() 447 .orElseThrow(AssertionError::new); 448 } 449 450 private static String buildIpcSpanName(final String packageAndService, final String methodName) { 451 return packageAndService + "/" + methodName; 452 } 453 454 private static Matcher<SpanData> buildIpcClientSpanMatcher(final String packageAndService, 455 final String methodName) { 456 return allOf(hasName(buildIpcSpanName(packageAndService, methodName)), 457 hasKind(SpanKind.CLIENT)); 458 } 459 460 private static Matcher<SpanData> buildIpcServerSpanMatcher(final String packageAndService, 461 final String methodName) { 462 return allOf(hasName(buildIpcSpanName(packageAndService, methodName)), 463 hasKind(SpanKind.SERVER)); 464 } 465 466 private static Matcher<SpanData> buildIpcClientSpanAttributesMatcher( 467 final String packageAndService, final String methodName, final InetSocketAddress isa) { 468 return hasAttributes(allOf(containsEntry("rpc.system", "HBASE_RPC"), 469 containsEntry("rpc.service", packageAndService), containsEntry("rpc.method", methodName), 470 containsEntry("net.peer.name", isa.getHostName()), 471 containsEntry(AttributeKey.longKey("net.peer.port"), (long) isa.getPort()))); 472 } 473 474 private static Matcher<SpanData> 475 buildIpcServerSpanAttributesMatcher(final String packageAndService, final String methodName) { 476 return hasAttributes(allOf(containsEntry("rpc.system", "HBASE_RPC"), 477 containsEntry("rpc.service", packageAndService), containsEntry("rpc.method", methodName))); 478 } 479 480 private void assertRemoteSpan() { 481 SpanData data = waitSpan(hasName("RpcServer.process")); 482 assertTrue(data.getParentSpanContext().isRemote()); 483 assertEquals(SpanKind.SERVER, data.getKind()); 484 } 485 486 @Test 487 public void testTracingSuccessIpc() throws IOException, ServiceException { 488 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 489 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 490 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 491 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 492 rpcServer.start(); 493 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 494 stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build()); 495 // use the ISA from the running server so that we can get the port selected. 496 final InetSocketAddress isa = rpcServer.getListenerAddress(); 497 final SpanData pauseClientSpan = 498 waitSpan(buildIpcClientSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "pause")); 499 assertThat(pauseClientSpan, 500 buildIpcClientSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "pause", isa)); 501 final SpanData pauseServerSpan = 502 waitSpan(buildIpcServerSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "pause")); 503 assertThat(pauseServerSpan, 504 buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "pause")); 505 assertRemoteSpan(); 506 assertFalse("no spans provided", traceRule.getSpans().isEmpty()); 507 assertThat(traceRule.getSpans(), 508 everyItem(allOf(hasStatusWithCode(StatusCode.OK), 509 hasTraceId(traceRule.getSpans().iterator().next().getTraceId()), 510 hasDuration(greaterThanOrEqualTo(Duration.ofMillis(100L)))))); 511 } 512 } 513 514 @Test 515 public void testTracingErrorIpc() throws IOException { 516 RpcServer rpcServer = createRpcServer(null, "testRpcServer", 517 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 518 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 519 try (AbstractRpcClient<?> client = createRpcClient(CONF)) { 520 rpcServer.start(); 521 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 522 // use the ISA from the running server so that we can get the port selected. 523 assertThrows(ServiceException.class, 524 () -> stub.error(null, EmptyRequestProto.getDefaultInstance())); 525 final InetSocketAddress isa = rpcServer.getListenerAddress(); 526 final SpanData errorClientSpan = 527 waitSpan(buildIpcClientSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "error")); 528 assertThat(errorClientSpan, 529 buildIpcClientSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error", isa)); 530 final SpanData errorServerSpan = 531 waitSpan(buildIpcServerSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "error")); 532 assertThat(errorServerSpan, 533 buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error")); 534 assertRemoteSpan(); 535 assertFalse("no spans provided", traceRule.getSpans().isEmpty()); 536 assertThat(traceRule.getSpans(), everyItem(allOf(hasStatusWithCode(StatusCode.ERROR), 537 hasTraceId(traceRule.getSpans().iterator().next().getTraceId())))); 538 } 539 } 540}