001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasDuration;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
026import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasTraceId;
027import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
028import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
029import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
030import static org.hamcrest.MatcherAssert.assertThat;
031import static org.hamcrest.Matchers.allOf;
032import static org.hamcrest.Matchers.everyItem;
033import static org.hamcrest.Matchers.greaterThanOrEqualTo;
034import static org.hamcrest.Matchers.hasItem;
035import static org.junit.Assert.assertEquals;
036import static org.junit.Assert.assertFalse;
037import static org.junit.Assert.assertNotNull;
038import static org.junit.Assert.assertNull;
039import static org.junit.Assert.assertThrows;
040import static org.junit.Assert.assertTrue;
041import static org.junit.Assert.fail;
042import static org.mockito.ArgumentMatchers.any;
043import static org.mockito.Mockito.spy;
044import static org.mockito.Mockito.verify;
045import static org.mockito.internal.verification.VerificationModeFactory.times;
046
047import io.opentelemetry.api.common.AttributeKey;
048import io.opentelemetry.api.trace.SpanKind;
049import io.opentelemetry.api.trace.StatusCode;
050import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
051import io.opentelemetry.sdk.trace.data.SpanData;
052import java.io.IOException;
053import java.net.InetSocketAddress;
054import java.time.Duration;
055import java.util.ArrayList;
056import java.util.List;
057import org.apache.hadoop.conf.Configuration;
058import org.apache.hadoop.hbase.Cell;
059import org.apache.hadoop.hbase.CellScanner;
060import org.apache.hadoop.hbase.CellUtil;
061import org.apache.hadoop.hbase.DoNotRetryIOException;
062import org.apache.hadoop.hbase.HBaseConfiguration;
063import org.apache.hadoop.hbase.KeyValue;
064import org.apache.hadoop.hbase.MatcherPredicate;
065import org.apache.hadoop.hbase.Server;
066import org.apache.hadoop.hbase.Waiter;
067import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.io.compress.GzipCodec;
070import org.apache.hadoop.util.StringUtils;
071import org.hamcrest.Matcher;
072import org.junit.Rule;
073import org.junit.Test;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
078import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
079import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
080
081import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
082import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
083import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
084import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
085import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto;
086import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
087import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
088import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
089
090/**
091 * Some basic ipc tests.
092 */
093public abstract class AbstractTestIPC {
094
  /** Logger used by the tests to record the exceptions they expect to catch. */
  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestIPC.class);

  /** Bytes of the string {@code "xyz"}, reused for every component of {@link #CELL}. */
  private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
  /**
   * Sample cell attached to requests by the cell-block tests; all four constructor arguments are
   * the same {@link #CELL_BYTES} array.
   */
  private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);

  /** Shared configuration passed to the servers, and to most of the clients, built by the tests. */
  protected static final Configuration CONF = HBaseConfiguration.create();
  static {
    // Set the default to be the old SimpleRpcServer. Subclasses test it and netty.
    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
  }
105
  /**
   * Creates the {@link RpcServer} that a test talks to. The tests in this class call
   * {@code start()} on the returned server themselves.
   * @param server      hosting server; the tests in this class always pass {@code null}
   * @param name        server name; the tests in this class always pass {@code "testRpcServer"}
   * @param services    services the server should expose
   * @param bindAddress address to bind to; the tests in this class use {@code localhost}, port 0
   * @param conf        configuration for the server
   * @param scheduler   scheduler for the server
   * @return the server instance supplied by the concrete subclass
   * @throws IOException declared for implementations; the failure conditions are not visible here
   */
  protected abstract RpcServer createRpcServer(final Server server, final String name,
    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
    Configuration conf, RpcScheduler scheduler) throws IOException;
109
  /**
   * Creates the client used by {@link #testNoCodec()}.
   * <p>
   * NOTE(review): judging by the name and by that test, the client is presumably expected to run
   * without a cell codec; confirm against the subclass implementations.
   * @param conf configuration for the client
   * @return the client instance supplied by the concrete subclass
   */
  protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);
111
  /**
   * OpenTelemetry JUnit rule; the tracing tests read the recorded spans through
   * {@code traceRule.getSpans()}.
   */
  @Rule
  public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
114
115  /**
116   * Ensure we do not HAVE TO HAVE a codec.
117   */
118  @Test
119  public void testNoCodec() throws IOException, ServiceException {
120    Configuration conf = HBaseConfiguration.create();
121    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
122      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
123      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
124    try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) {
125      rpcServer.start();
126      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
127      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
128      String message = "hello";
129      assertEquals(message,
130        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
131      assertNull(pcrc.cellScanner());
132    } finally {
133      rpcServer.stop();
134    }
135  }
136
  /**
   * Creates the general-purpose client used by most tests in this class.
   * @param conf configuration for the client
   * @return the client instance supplied by the concrete subclass
   */
  protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf);
138
139  /**
140   * It is hard to verify the compression is actually happening under the wraps. Hope that if
141   * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to
142   * confirm that compression is happening down in the client and server).
143   */
144  @Test
145  public void testCompressCellBlock() throws IOException, ServiceException {
146    Configuration conf = new Configuration(HBaseConfiguration.create());
147    conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
148    List<Cell> cells = new ArrayList<>();
149    int count = 3;
150    for (int i = 0; i < count; i++) {
151      cells.add(CELL);
152    }
153    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
154      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
155      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
156
157    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
158      rpcServer.start();
159      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
160      HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));
161      String message = "hello";
162      assertEquals(message,
163        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
164      int index = 0;
165      CellScanner cellScanner = pcrc.cellScanner();
166      assertNotNull(cellScanner);
167      while (cellScanner.advance()) {
168        assertEquals(CELL, cellScanner.current());
169        index++;
170      }
171      assertEquals(count, index);
172    } finally {
173      rpcServer.stop();
174    }
175  }
176
  /**
   * Creates the client used by {@link #testRTEDuringConnectionSetup()}. That test expects a call
   * made through this client to fail with an exception whose stack trace contains
   * {@code "Injected fault"}.
   * @param conf configuration for the client
   * @return the client instance supplied by the concrete subclass
   * @throws IOException declared for implementations; the failure conditions are not visible here
   */
  protected abstract AbstractRpcClient<?>
    createRpcClientRTEDuringConnectionSetup(Configuration conf) throws IOException;
179
180  @Test
181  public void testRTEDuringConnectionSetup() throws Exception {
182    Configuration conf = HBaseConfiguration.create();
183    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
184      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
185      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
186    try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) {
187      rpcServer.start();
188      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
189      stub.ping(null, EmptyRequestProto.getDefaultInstance());
190      fail("Expected an exception to have been thrown!");
191    } catch (Exception e) {
192      LOG.info("Caught expected exception: " + e.toString());
193      assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault"));
194    } finally {
195      rpcServer.stop();
196    }
197  }
198
199  /**
200   * Tests that the rpc scheduler is called when requests arrive.
201   */
202  @Test
203  public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {
204    RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
205    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
206      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
207      new InetSocketAddress("localhost", 0), CONF, scheduler);
208    verify(scheduler).init(any(RpcScheduler.Context.class));
209    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
210      rpcServer.start();
211      verify(scheduler).start();
212      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
213      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
214      for (int i = 0; i < 10; i++) {
215        stub.echo(null, param);
216      }
217      verify(scheduler, times(10)).dispatch(any(CallRunner.class));
218    } finally {
219      rpcServer.stop();
220      verify(scheduler).stop();
221    }
222  }
223
224  /** Tests that the rpc scheduler is called when requests arrive. */
225  @Test
226  public void testRpcMaxRequestSize() throws IOException, ServiceException {
227    Configuration conf = new Configuration(CONF);
228    conf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000);
229    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
230      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
231      new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1));
232    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
233      rpcServer.start();
234      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
235      StringBuilder message = new StringBuilder(1200);
236      for (int i = 0; i < 200; i++) {
237        message.append("hello.");
238      }
239      // set total RPC size bigger than 100 bytes
240      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build();
241      stub.echo(
242        new HBaseRpcControllerImpl(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))),
243        param);
244      fail("RPC should have failed because it exceeds max request size");
245    } catch (ServiceException e) {
246      LOG.info("Caught expected exception: " + e);
247      assertTrue(e.toString(),
248        StringUtils.stringifyException(e).contains("RequestTooBigException"));
249    } finally {
250      rpcServer.stop();
251    }
252  }
253
254  /**
255   * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
256   * remoteAddress set to its Call Object
257   */
258  @Test
259  public void testRpcServerForNotNullRemoteAddressInCallObject()
260    throws IOException, ServiceException {
261    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
262      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
263      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
264    InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
265    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
266      rpcServer.start();
267      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
268      assertEquals(localAddr.getAddress().getHostAddress(),
269        stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr());
270    } finally {
271      rpcServer.stop();
272    }
273  }
274
275  @Test
276  public void testRemoteError() throws IOException, ServiceException {
277    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
278      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
279      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
280    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
281      rpcServer.start();
282      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
283      stub.error(null, EmptyRequestProto.getDefaultInstance());
284    } catch (ServiceException e) {
285      LOG.info("Caught expected exception: " + e);
286      IOException ioe = ProtobufUtil.handleRemoteException(e);
287      assertTrue(ioe instanceof DoNotRetryIOException);
288      assertTrue(ioe.getMessage().contains("server error!"));
289    } finally {
290      rpcServer.stop();
291    }
292  }
293
294  @Test
295  public void testTimeout() throws IOException {
296    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
297      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
298      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
299    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
300      rpcServer.start();
301      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
302      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
303      int ms = 1000;
304      int timeout = 100;
305      for (int i = 0; i < 10; i++) {
306        pcrc.reset();
307        pcrc.setCallTimeout(timeout);
308        long startTime = System.nanoTime();
309        try {
310          stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build());
311        } catch (ServiceException e) {
312          long waitTime = (System.nanoTime() - startTime) / 1000000;
313          // expected
314          LOG.info("Caught expected exception: " + e);
315          IOException ioe = ProtobufUtil.handleRemoteException(e);
316          assertTrue(ioe.getCause() instanceof CallTimeoutException);
317          // confirm that we got exception before the actual pause.
318          assertTrue(waitTime < ms);
319        }
320      }
321    } finally {
322      rpcServer.stop();
323    }
324  }
325
  /**
   * Creates the server used by {@link #testConnectionCloseWithOutstandingRPCs()}. That test
   * expects an echo call against this server to fail with a {@link ServiceException}.
   * <p>
   * NOTE(review): presumably the implementation closes the connection while a call is outstanding;
   * confirm against the subclass implementations.
   * @param server      hosting server; the test passes {@code null}
   * @param name        server name
   * @param services    services the server should expose
   * @param bindAddress address to bind to
   * @param conf        configuration for the server
   * @param scheduler   scheduler for the server
   * @return the server instance supplied by the concrete subclass
   * @throws IOException declared for implementations; the failure conditions are not visible here
   */
  protected abstract RpcServer createTestFailingRpcServer(final Server server, final String name,
    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
    Configuration conf, RpcScheduler scheduler) throws IOException;
329
330  /** Tests that the connection closing is handled by the client with outstanding RPC calls */
331  @Test
332  public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
333    Configuration conf = new Configuration(CONF);
334    RpcServer rpcServer = createTestFailingRpcServer(null, "testRpcServer",
335      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
336      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
337
338    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
339      rpcServer.start();
340      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
341      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
342      stub.echo(null, param);
343      fail("RPC should have failed because connection closed");
344    } catch (ServiceException e) {
345      LOG.info("Caught expected exception: " + e.toString());
346    } finally {
347      rpcServer.stop();
348    }
349  }
350
351  @Test
352  public void testAsyncEcho() throws IOException {
353    Configuration conf = HBaseConfiguration.create();
354    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
355      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
356      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
357    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
358      rpcServer.start();
359      Interface stub = newStub(client, rpcServer.getListenerAddress());
360      int num = 10;
361      List<HBaseRpcController> pcrcList = new ArrayList<>();
362      List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>();
363      for (int i = 0; i < num; i++) {
364        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
365        BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
366        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done);
367        pcrcList.add(pcrc);
368        callbackList.add(done);
369      }
370      for (int i = 0; i < num; i++) {
371        HBaseRpcController pcrc = pcrcList.get(i);
372        assertFalse(pcrc.failed());
373        assertNull(pcrc.cellScanner());
374        assertEquals("hello-" + i, callbackList.get(i).get().getMessage());
375      }
376    } finally {
377      rpcServer.stop();
378    }
379  }
380
381  @Test
382  public void testAsyncRemoteError() throws IOException {
383    AbstractRpcClient<?> client = createRpcClient(CONF);
384    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
385      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
386      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
387    try {
388      rpcServer.start();
389      Interface stub = newStub(client, rpcServer.getListenerAddress());
390      BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
391      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
392      stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback);
393      assertNull(callback.get());
394      assertTrue(pcrc.failed());
395      LOG.info("Caught expected exception: " + pcrc.getFailed());
396      IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
397      assertTrue(ioe instanceof DoNotRetryIOException);
398      assertTrue(ioe.getMessage().contains("server error!"));
399    } finally {
400      client.close();
401      rpcServer.stop();
402    }
403  }
404
405  @Test
406  public void testAsyncTimeout() throws IOException {
407    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
408      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
409      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
410    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
411      rpcServer.start();
412      Interface stub = newStub(client, rpcServer.getListenerAddress());
413      List<HBaseRpcController> pcrcList = new ArrayList<>();
414      List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>();
415      int ms = 1000;
416      int timeout = 100;
417      long startTime = System.nanoTime();
418      for (int i = 0; i < 10; i++) {
419        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
420        pcrc.setCallTimeout(timeout);
421        BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
422        stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback);
423        pcrcList.add(pcrc);
424        callbackList.add(callback);
425      }
426      for (BlockingRpcCallback<?> callback : callbackList) {
427        assertNull(callback.get());
428      }
429      long waitTime = (System.nanoTime() - startTime) / 1000000;
430      for (HBaseRpcController pcrc : pcrcList) {
431        assertTrue(pcrc.failed());
432        LOG.info("Caught expected exception: " + pcrc.getFailed());
433        IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
434        assertTrue(ioe.getCause() instanceof CallTimeoutException);
435      }
436      // confirm that we got exception before the actual pause.
437      assertTrue(waitTime < ms);
438    } finally {
439      rpcServer.stop();
440    }
441  }
442
443  private SpanData waitSpan(Matcher<SpanData> matcher) {
444    Waiter.waitFor(CONF, 1000,
445      new MatcherPredicate<>(() -> traceRule.getSpans(), hasItem(matcher)));
446    return traceRule.getSpans().stream().filter(matcher::matches).findFirst()
447      .orElseThrow(AssertionError::new);
448  }
449
450  private static String buildIpcSpanName(final String packageAndService, final String methodName) {
451    return packageAndService + "/" + methodName;
452  }
453
454  private static Matcher<SpanData> buildIpcClientSpanMatcher(final String packageAndService,
455    final String methodName) {
456    return allOf(hasName(buildIpcSpanName(packageAndService, methodName)),
457      hasKind(SpanKind.CLIENT));
458  }
459
460  private static Matcher<SpanData> buildIpcServerSpanMatcher(final String packageAndService,
461    final String methodName) {
462    return allOf(hasName(buildIpcSpanName(packageAndService, methodName)),
463      hasKind(SpanKind.SERVER));
464  }
465
466  private static Matcher<SpanData> buildIpcClientSpanAttributesMatcher(
467    final String packageAndService, final String methodName, final InetSocketAddress isa) {
468    return hasAttributes(allOf(containsEntry("rpc.system", "HBASE_RPC"),
469      containsEntry("rpc.service", packageAndService), containsEntry("rpc.method", methodName),
470      containsEntry("net.peer.name", isa.getHostName()),
471      containsEntry(AttributeKey.longKey("net.peer.port"), (long) isa.getPort())));
472  }
473
474  private static Matcher<SpanData>
475    buildIpcServerSpanAttributesMatcher(final String packageAndService, final String methodName) {
476    return hasAttributes(allOf(containsEntry("rpc.system", "HBASE_RPC"),
477      containsEntry("rpc.service", packageAndService), containsEntry("rpc.method", methodName)));
478  }
479
480  private void assertRemoteSpan() {
481    SpanData data = waitSpan(hasName("RpcServer.process"));
482    assertTrue(data.getParentSpanContext().isRemote());
483    assertEquals(SpanKind.SERVER, data.getKind());
484  }
485
486  @Test
487  public void testTracingSuccessIpc() throws IOException, ServiceException {
488    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
489      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
490      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
491    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
492      rpcServer.start();
493      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
494      stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
495      // use the ISA from the running server so that we can get the port selected.
496      final InetSocketAddress isa = rpcServer.getListenerAddress();
497      final SpanData pauseClientSpan =
498        waitSpan(buildIpcClientSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "pause"));
499      assertThat(pauseClientSpan,
500        buildIpcClientSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "pause", isa));
501      final SpanData pauseServerSpan =
502        waitSpan(buildIpcServerSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "pause"));
503      assertThat(pauseServerSpan,
504        buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "pause"));
505      assertRemoteSpan();
506      assertFalse("no spans provided", traceRule.getSpans().isEmpty());
507      assertThat(traceRule.getSpans(),
508        everyItem(allOf(hasStatusWithCode(StatusCode.OK),
509          hasTraceId(traceRule.getSpans().iterator().next().getTraceId()),
510          hasDuration(greaterThanOrEqualTo(Duration.ofMillis(100L))))));
511    }
512  }
513
514  @Test
515  public void testTracingErrorIpc() throws IOException {
516    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
517      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
518      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
519    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
520      rpcServer.start();
521      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
522      // use the ISA from the running server so that we can get the port selected.
523      assertThrows(ServiceException.class,
524        () -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
525      final InetSocketAddress isa = rpcServer.getListenerAddress();
526      final SpanData errorClientSpan =
527        waitSpan(buildIpcClientSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));
528      assertThat(errorClientSpan,
529        buildIpcClientSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error", isa));
530      final SpanData errorServerSpan =
531        waitSpan(buildIpcServerSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));
532      assertThat(errorServerSpan,
533        buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));
534      assertRemoteSpan();
535      assertFalse("no spans provided", traceRule.getSpans().isEmpty());
536      assertThat(traceRule.getSpans(), everyItem(allOf(hasStatusWithCode(StatusCode.ERROR),
537        hasTraceId(traceRule.getSpans().iterator().next().getTraceId()))));
538    }
539  }
540}