001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasDuration;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
026import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasTraceId;
027import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
028import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
029import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
030import static org.awaitility.Awaitility.await;
031import static org.hamcrest.MatcherAssert.assertThat;
032import static org.hamcrest.Matchers.allOf;
033import static org.hamcrest.Matchers.containsString;
034import static org.hamcrest.Matchers.everyItem;
035import static org.hamcrest.Matchers.greaterThanOrEqualTo;
036import static org.hamcrest.Matchers.hasItem;
037import static org.hamcrest.Matchers.instanceOf;
038import static org.hamcrest.Matchers.lessThan;
039import static org.hamcrest.Matchers.startsWith;
040import static org.junit.jupiter.api.Assertions.assertEquals;
041import static org.junit.jupiter.api.Assertions.assertFalse;
042import static org.junit.jupiter.api.Assertions.assertNotNull;
043import static org.junit.jupiter.api.Assertions.assertNull;
044import static org.junit.jupiter.api.Assertions.assertThrows;
045import static org.junit.jupiter.api.Assertions.assertTrue;
046import static org.junit.jupiter.api.Assertions.fail;
047import static org.mockito.ArgumentMatchers.any;
048import static org.mockito.Mockito.mock;
049import static org.mockito.Mockito.spy;
050import static org.mockito.Mockito.verify;
051import static org.mockito.Mockito.when;
052import static org.mockito.internal.verification.VerificationModeFactory.times;
053
054import io.opentelemetry.api.common.AttributeKey;
055import io.opentelemetry.api.trace.SpanKind;
056import io.opentelemetry.api.trace.StatusCode;
057import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
058import io.opentelemetry.sdk.trace.data.SpanData;
059import java.io.IOException;
060import java.net.InetSocketAddress;
061import java.nio.channels.SocketChannel;
062import java.time.Duration;
063import java.util.ArrayList;
064import java.util.Collections;
065import java.util.List;
066import org.apache.hadoop.conf.Configuration;
067import org.apache.hadoop.hbase.CellScanner;
068import org.apache.hadoop.hbase.DoNotRetryIOException;
069import org.apache.hadoop.hbase.ExtendedCell;
070import org.apache.hadoop.hbase.HBaseConfiguration;
071import org.apache.hadoop.hbase.HBaseServerBase;
072import org.apache.hadoop.hbase.KeyValue;
073import org.apache.hadoop.hbase.MatcherPredicate;
074import org.apache.hadoop.hbase.PrivateCellUtil;
075import org.apache.hadoop.hbase.Server;
076import org.apache.hadoop.hbase.ServerName;
077import org.apache.hadoop.hbase.Waiter;
078import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
079import org.apache.hadoop.hbase.nio.ByteBuff;
080import org.apache.hadoop.hbase.security.User;
081import org.apache.hadoop.hbase.util.Bytes;
082import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
083import org.apache.hadoop.io.compress.GzipCodec;
084import org.apache.hadoop.ipc.RemoteException;
085import org.apache.hadoop.util.StringUtils;
086import org.hamcrest.Matcher;
087import org.junit.jupiter.api.BeforeEach;
088import org.junit.jupiter.api.TestTemplate;
089import org.junit.jupiter.api.extension.RegisterExtension;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
094import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
095import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
096import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
097import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
098
099import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
100import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
101import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
102import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
103import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto;
104import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
105import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
106import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
110
111/**
112 * Some basic ipc tests.
113 */
114public abstract class AbstractTestIPC {
115
116  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestIPC.class);
117
118  private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
119  private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
120
121  protected static final Configuration CONF = HBaseConfiguration.create();
122
123  protected RpcServer createRpcServer(Server server, String name,
124    List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
125    RpcScheduler scheduler) throws IOException {
126    return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
127  }
128
129  private RpcServer createRpcServer(String name, List<BlockingServiceAndInterface> services,
130    InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException {
131    return createRpcServer(null, name, services, bindAddress, conf, scheduler);
132  }
133
134  protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);
135
136  @RegisterExtension
137  private static final OpenTelemetryExtension OTEL_EXT = OpenTelemetryExtension.create();
138
139  private Class<? extends RpcServer> rpcServerImpl;
140
141  protected AbstractTestIPC(Class<? extends RpcServer> rpcServerImpl) {
142    this.rpcServerImpl = rpcServerImpl;
143  }
144
145  @BeforeEach
146  public void setUpBeforeTest() {
147    CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, RpcServer.class);
148  }
149
150  /**
151   * Ensure we do not HAVE TO HAVE a codec.
152   */
153  @TestTemplate
154  public void testNoCodec() throws IOException, ServiceException {
155    Configuration clientConf = new Configuration(CONF);
156    RpcServer rpcServer = createRpcServer("testRpcServer",
157      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
158      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
159    try (AbstractRpcClient<?> client = createRpcClientNoCodec(clientConf)) {
160      rpcServer.start();
161      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
162      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
163      String message = "hello";
164      assertEquals(message,
165        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
166      assertNull(pcrc.cellScanner());
167    } finally {
168      rpcServer.stop();
169    }
170  }
171
172  protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf);
173
174  /**
175   * It is hard to verify the compression is actually happening under the wraps. Hope that if
176   * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to
177   * confirm that compression is happening down in the client and server).
178   */
179  @TestTemplate
180  public void testCompressCellBlock() throws IOException, ServiceException {
181    Configuration clientConf = new Configuration(CONF);
182    clientConf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
183    List<ExtendedCell> cells = new ArrayList<>();
184    int count = 3;
185    for (int i = 0; i < count; i++) {
186      cells.add(CELL);
187    }
188    RpcServer rpcServer = createRpcServer("testRpcServer",
189      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
190      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
191
192    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
193      rpcServer.start();
194      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
195      HBaseRpcController pcrc =
196        new HBaseRpcControllerImpl(PrivateCellUtil.createExtendedCellScanner(cells));
197      String message = "hello";
198      assertEquals(message,
199        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
200      int index = 0;
201      CellScanner cellScanner = pcrc.cellScanner();
202      assertNotNull(cellScanner);
203      while (cellScanner.advance()) {
204        assertEquals(CELL, cellScanner.current());
205        index++;
206      }
207      assertEquals(count, index);
208    } finally {
209      rpcServer.stop();
210    }
211  }
212
213  protected abstract AbstractRpcClient<?>
214    createRpcClientRTEDuringConnectionSetup(Configuration conf) throws IOException;
215
216  @TestTemplate
217  public void testRTEDuringConnectionSetup() throws Exception {
218    Configuration clientConf = new Configuration(CONF);
219    RpcServer rpcServer = createRpcServer("testRpcServer",
220      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
221      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
222    try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(clientConf)) {
223      rpcServer.start();
224      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
225      stub.ping(null, EmptyRequestProto.getDefaultInstance());
226      fail("Expected an exception to have been thrown!");
227    } catch (Exception e) {
228      LOG.info("Caught expected exception: " + e.toString());
229      assertTrue(StringUtils.stringifyException(e).contains("Injected fault"), e.toString());
230    } finally {
231      rpcServer.stop();
232    }
233  }
234
235  /**
236   * Tests that the rpc scheduler is called when requests arrive.
237   */
238  @TestTemplate
239  public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {
240    Configuration clientConf = new Configuration(CONF);
241    RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
242    RpcServer rpcServer = createRpcServer("testRpcServer",
243      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
244      new InetSocketAddress("localhost", 0), CONF, scheduler);
245    verify(scheduler).init(any(RpcScheduler.Context.class));
246    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
247      rpcServer.start();
248      verify(scheduler).start();
249      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
250      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
251      for (int i = 0; i < 10; i++) {
252        stub.echo(null, param);
253      }
254      verify(scheduler, times(10)).dispatch(any(CallRunner.class));
255    } finally {
256      rpcServer.stop();
257      verify(scheduler).stop();
258    }
259  }
260
261  /** Tests that the rpc scheduler is called when requests arrive. */
262  @TestTemplate
263  public void testRpcMaxRequestSize() throws IOException, ServiceException {
264    Configuration clientConf = new Configuration(CONF);
265    clientConf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000);
266    RpcServer rpcServer = createRpcServer("testRpcServer",
267      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
268      new InetSocketAddress("localhost", 0), clientConf, new FifoRpcScheduler(clientConf, 1));
269    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
270      rpcServer.start();
271      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
272      StringBuilder message = new StringBuilder(1200);
273      for (int i = 0; i < 200; i++) {
274        message.append("hello.");
275      }
276      // set total RPC size bigger than 100 bytes
277      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build();
278      stub.echo(new HBaseRpcControllerImpl(
279        PrivateCellUtil.createExtendedCellScanner(ImmutableList.<ExtendedCell> of(CELL))), param);
280      fail("RPC should have failed because it exceeds max request size");
281    } catch (ServiceException e) {
282      LOG.info("Caught expected exception: " + e);
283      assertTrue(StringUtils.stringifyException(e).contains("RequestTooBigException"),
284        e.toString());
285    } finally {
286      rpcServer.stop();
287    }
288  }
289
290  /**
291   * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
292   * remoteAddress set to its Call Object
293   */
294  @TestTemplate
295  public void testRpcServerForNotNullRemoteAddressInCallObject()
296    throws IOException, ServiceException {
297    Configuration clientConf = new Configuration(CONF);
298    RpcServer rpcServer = createRpcServer("testRpcServer",
299      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
300      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
301    InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
302    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
303      rpcServer.start();
304      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
305      assertEquals(localAddr.getAddress().getHostAddress(),
306        stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr());
307    } finally {
308      rpcServer.stop();
309    }
310  }
311
312  @TestTemplate
313  public void testRemoteError() throws IOException, ServiceException {
314    Configuration clientConf = new Configuration(CONF);
315    RpcServer rpcServer = createRpcServer("testRpcServer",
316      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
317      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
318    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
319      rpcServer.start();
320      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
321      ServiceException se = assertThrows(ServiceException.class,
322        () -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
323      LOG.info("Caught expected exception: " + se);
324      IOException ioe = ProtobufUtil.handleRemoteException(se);
325      assertThat(ioe, instanceOf(DoNotRetryIOException.class));
326      assertThat(ioe.getMessage(), containsString("server error!"));
327    } finally {
328      rpcServer.stop();
329    }
330  }
331
332  @TestTemplate
333  public void testTimeout() throws IOException {
334    Configuration clientConf = new Configuration(CONF);
335    RpcServer rpcServer = createRpcServer("testRpcServer",
336      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
337      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
338    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
339      rpcServer.start();
340      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
341      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
342      int ms = 1000;
343      int timeout = 100;
344      for (int i = 0; i < 10; i++) {
345        pcrc.reset();
346        pcrc.setCallTimeout(timeout);
347        long startTime = System.nanoTime();
348        ServiceException se = assertThrows(ServiceException.class,
349          () -> stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build()));
350        long waitTime = (System.nanoTime() - startTime) / 1000000;
351        // expected
352        LOG.info("Caught expected exception: " + se);
353        IOException ioe = ProtobufUtil.handleRemoteException(se);
354        assertThat(ioe.getCause(), instanceOf(CallTimeoutException.class));
355        // confirm that we got exception before the actual pause.
356        assertThat(waitTime, lessThan((long) ms));
357      }
358    } finally {
359      // wait until all active calls quit, otherwise it may mess up the tracing spans
360      await().atMost(Duration.ofSeconds(2))
361        .untilAsserted(() -> assertEquals(0, rpcServer.getScheduler().getActiveRpcHandlerCount()));
362      rpcServer.stop();
363    }
364  }
365
366  @SuppressWarnings("deprecation")
367  private static class FailingSimpleRpcServer extends SimpleRpcServer {
368
369    FailingSimpleRpcServer(Server server, String name,
370      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
371      Configuration conf, RpcScheduler scheduler) throws IOException {
372      super(server, name, services, bindAddress, conf, scheduler, true);
373    }
374
375    final class FailingConnection extends SimpleServerRpcConnection {
376      private FailingConnection(FailingSimpleRpcServer rpcServer, SocketChannel channel,
377        long lastContact) {
378        super(rpcServer, channel, lastContact);
379      }
380
381      @Override
382      public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
383        // this will throw exception after the connection header is read, and an RPC is sent
384        // from client
385        throw new DoNotRetryIOException("Failing for test");
386      }
387    }
388
389    @Override
390    protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
391      return new FailingConnection(this, channel, time);
392    }
393  }
394
395  protected RpcServer createTestFailingRpcServer(final String name,
396    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
397    Configuration conf, RpcScheduler scheduler) throws IOException {
398    if (rpcServerImpl.equals(NettyRpcServer.class)) {
399      return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
400    } else {
401      return new FailingSimpleRpcServer(null, name, services, bindAddress, conf, scheduler);
402    }
403  }
404
405  /** Tests that the connection closing is handled by the client with outstanding RPC calls */
406  @TestTemplate
407  public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
408    Configuration clientConf = new Configuration(CONF);
409    RpcServer rpcServer = createTestFailingRpcServer("testRpcServer",
410      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
411      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
412    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
413      rpcServer.start();
414      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
415      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
416      ServiceException se = assertThrows(ServiceException.class, () -> stub.echo(null, param),
417        "RPC should have failed because connection closed");
418      LOG.info("Caught expected exception: " + se);
419    } finally {
420      rpcServer.stop();
421    }
422  }
423
424  @TestTemplate
425  public void testAsyncEcho() throws IOException {
426    Configuration clientConf = new Configuration(CONF);
427    RpcServer rpcServer = createRpcServer("testRpcServer",
428      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
429      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
430    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
431      rpcServer.start();
432      Interface stub = newStub(client, rpcServer.getListenerAddress());
433      int num = 10;
434      List<HBaseRpcController> pcrcList = new ArrayList<>();
435      List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>();
436      for (int i = 0; i < num; i++) {
437        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
438        BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
439        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done);
440        pcrcList.add(pcrc);
441        callbackList.add(done);
442      }
443      for (int i = 0; i < num; i++) {
444        EchoResponseProto resp = callbackList.get(i).get();
445        HBaseRpcController pcrc = pcrcList.get(i);
446        assertEquals("hello-" + i, resp.getMessage());
447        assertFalse(pcrc.failed());
448        assertNull(pcrc.cellScanner());
449      }
450    } finally {
451      rpcServer.stop();
452    }
453  }
454
455  @TestTemplate
456  public void testAsyncRemoteError() throws IOException {
457    Configuration clientConf = new Configuration(CONF);
458    RpcServer rpcServer = createRpcServer("testRpcServer",
459      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
460      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
461    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
462      rpcServer.start();
463      Interface stub = newStub(client, rpcServer.getListenerAddress());
464      BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
465      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
466      stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback);
467      assertNull(callback.get());
468      assertTrue(pcrc.failed());
469      LOG.info("Caught expected exception: " + pcrc.getFailed());
470      IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
471      assertThat(ioe, instanceOf(DoNotRetryIOException.class));
472      assertThat(ioe.getMessage(), containsString("server error!"));
473    } finally {
474      rpcServer.stop();
475    }
476  }
477
478  @TestTemplate
479  public void testAsyncTimeout() throws IOException {
480    Configuration clientConf = new Configuration(CONF);
481    RpcServer rpcServer = createRpcServer("testRpcServer",
482      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
483      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
484    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
485      rpcServer.start();
486      Interface stub = newStub(client, rpcServer.getListenerAddress());
487      List<HBaseRpcController> pcrcList = new ArrayList<>();
488      List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>();
489      int ms = 1000;
490      int timeout = 100;
491      long startTime = System.nanoTime();
492      for (int i = 0; i < 10; i++) {
493        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
494        pcrc.setCallTimeout(timeout);
495        BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
496        stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback);
497        pcrcList.add(pcrc);
498        callbackList.add(callback);
499      }
500      for (BlockingRpcCallback<?> callback : callbackList) {
501        assertNull(callback.get());
502      }
503      long waitTime = (System.nanoTime() - startTime) / 1000000;
504      for (HBaseRpcController pcrc : pcrcList) {
505        assertTrue(pcrc.failed());
506        LOG.info("Caught expected exception: " + pcrc.getFailed());
507        IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
508        assertThat(ioe.getCause(), instanceOf(CallTimeoutException.class));
509      }
510      // confirm that we got exception before the actual pause.
511      assertThat(waitTime, lessThan((long) ms));
512    } finally {
513      // wait until all active calls quit, otherwise it may mess up the tracing spans
514      await().atMost(Duration.ofSeconds(2))
515        .untilAsserted(() -> assertEquals(0, rpcServer.getScheduler().getActiveRpcHandlerCount()));
516      rpcServer.stop();
517    }
518  }
519
520  private SpanData waitSpan(Matcher<SpanData> matcher) {
521    Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(() -> OTEL_EXT.getSpans(), hasItem(matcher)));
522    return OTEL_EXT.getSpans().stream().filter(matcher::matches).findFirst()
523      .orElseThrow(AssertionError::new);
524  }
525
526  private static String buildIpcSpanName(final String packageAndService, final String methodName) {
527    return packageAndService + "/" + methodName;
528  }
529
530  private static Matcher<SpanData> buildIpcClientSpanMatcher(final String packageAndService,
531    final String methodName) {
532    return allOf(hasName(buildIpcSpanName(packageAndService, methodName)),
533      hasKind(SpanKind.CLIENT));
534  }
535
536  private static Matcher<SpanData> buildIpcServerSpanMatcher(final String packageAndService,
537    final String methodName) {
538    return allOf(hasName(buildIpcSpanName(packageAndService, methodName)),
539      hasKind(SpanKind.SERVER));
540  }
541
542  private static Matcher<SpanData> buildIpcClientSpanAttributesMatcher(
543    final String packageAndService, final String methodName, final InetSocketAddress isa) {
544    return hasAttributes(allOf(containsEntry("rpc.system", "HBASE_RPC"),
545      containsEntry("rpc.service", packageAndService), containsEntry("rpc.method", methodName),
546      containsEntry("net.peer.name", isa.getHostName()),
547      containsEntry(AttributeKey.longKey("net.peer.port"), (long) isa.getPort())));
548  }
549
550  private static Matcher<SpanData>
551    buildIpcServerSpanAttributesMatcher(final String packageAndService, final String methodName) {
552    return hasAttributes(allOf(containsEntry("rpc.system", "HBASE_RPC"),
553      containsEntry("rpc.service", packageAndService), containsEntry("rpc.method", methodName)));
554  }
555
556  private void assertRemoteSpan() {
557    SpanData data = waitSpan(hasName("RpcServer.process"));
558    assertTrue(data.getParentSpanContext().isRemote());
559    assertEquals(SpanKind.SERVER, data.getKind());
560  }
561
562  @TestTemplate
563  public void testTracingSuccessIpc() throws IOException, ServiceException {
564    Configuration clientConf = new Configuration(CONF);
565    RpcServer rpcServer = createRpcServer("testRpcServer",
566      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
567      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
568    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
569      rpcServer.start();
570      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
571      stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
572      // use the ISA from the running server so that we can get the port selected.
573      final InetSocketAddress isa = rpcServer.getListenerAddress();
574      final SpanData pauseClientSpan =
575        waitSpan(buildIpcClientSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "pause"));
576      assertThat(pauseClientSpan,
577        buildIpcClientSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "pause", isa));
578      final SpanData pauseServerSpan =
579        waitSpan(buildIpcServerSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "pause"));
580      assertThat(pauseServerSpan,
581        buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "pause"));
582      assertRemoteSpan();
583      assertFalse(OTEL_EXT.getSpans().isEmpty(), "no spans provided");
584      assertThat(OTEL_EXT.getSpans(),
585        everyItem(allOf(hasStatusWithCode(StatusCode.OK),
586          hasTraceId(OTEL_EXT.getSpans().iterator().next().getTraceId()),
587          hasDuration(greaterThanOrEqualTo(Duration.ofMillis(100L))))));
588    } finally {
589      rpcServer.stop();
590    }
591  }
592
593  @TestTemplate
594  public void testTracingErrorIpc() throws IOException {
595    Configuration clientConf = new Configuration(CONF);
596    RpcServer rpcServer = createRpcServer("testRpcServer",
597      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
598      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
599    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
600      rpcServer.start();
601      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
602      // use the ISA from the running server so that we can get the port selected.
603      assertThrows(ServiceException.class,
604        () -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
605      final InetSocketAddress isa = rpcServer.getListenerAddress();
606      final SpanData errorClientSpan =
607        waitSpan(buildIpcClientSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));
608      assertThat(errorClientSpan,
609        buildIpcClientSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error", isa));
610      final SpanData errorServerSpan =
611        waitSpan(buildIpcServerSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));
612      assertThat(errorServerSpan,
613        buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));
614      assertRemoteSpan();
615      assertFalse(OTEL_EXT.getSpans().isEmpty(), "no spans provided");
616      assertThat(OTEL_EXT.getSpans(), everyItem(allOf(hasStatusWithCode(StatusCode.ERROR),
617        hasTraceId(OTEL_EXT.getSpans().iterator().next().getTraceId()))));
618    } finally {
619      rpcServer.stop();
620    }
621  }
622
623  protected abstract AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf);
624
625  private IOException doBadPreableHeaderCall(BlockingInterface stub) {
626    ServiceException se = assertThrows(ServiceException.class,
627      () -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
628    return ProtobufUtil.handleRemoteException(se);
629  }
630
631  @TestTemplate
632  public void testBadPreambleHeader() throws Exception {
633    Configuration clientConf = new Configuration(CONF);
634    RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),
635      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
636    try (AbstractRpcClient<?> client = createBadAuthRpcClient(clientConf)) {
637      rpcServer.start();
638      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
639      BadAuthException error = null;
640      // for SimpleRpcServer, it is possible that we get a broken pipe before getting the
641      // BadAuthException, so we add some retries here, see HBASE-28417
642      for (int i = 0; i < 10; i++) {
643        IOException ioe = doBadPreableHeaderCall(stub);
644        if (ioe instanceof BadAuthException) {
645          error = (BadAuthException) ioe;
646          break;
647        }
648        Thread.sleep(100);
649      }
650      assertNotNull(error, "Can not get expected BadAuthException");
651      assertThat(error.getMessage(), containsString("authName=unknown"));
652    } finally {
653      rpcServer.stop();
654    }
655  }
656
657  /**
658   * Testcase for getting connection registry information through connection preamble header, see
659   * HBASE-25051 for more details.
660   */
661  @TestTemplate
662  public void testGetConnectionRegistry() throws IOException, ServiceException {
663    Configuration clientConf = new Configuration(CONF);
664    String clusterId = "test_cluster_id";
665    HBaseServerBase<?> server = mock(HBaseServerBase.class);
666    when(server.getClusterId()).thenReturn(clusterId);
667    // do not need any services
668    RpcServer rpcServer = createRpcServer(server, "testRpcServer", Collections.emptyList(),
669      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
670    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
671      rpcServer.start();
672      InetSocketAddress addr = rpcServer.getListenerAddress();
673      BlockingRpcChannel channel =
674        client.createBlockingRpcChannel(ServerName.valueOf(addr.getHostName(), addr.getPort(),
675          EnvironmentEdgeManager.currentTime()), User.getCurrent(), 0);
676      ConnectionRegistryService.BlockingInterface stub =
677        ConnectionRegistryService.newBlockingStub(channel);
678      GetConnectionRegistryResponse resp =
679        stub.getConnectionRegistry(null, GetConnectionRegistryRequest.getDefaultInstance());
680      assertEquals(clusterId, resp.getClusterId());
681    } finally {
682      rpcServer.stop();
683    }
684  }
685
686  /**
687   * Test server does not support getting connection registry information through connection
688   * preamble header, i.e, a new client connecting to an old server. We simulate this by using a
689   * Server without implementing the ConnectionRegistryEndpoint interface.
690   */
691  @TestTemplate
692  public void testGetConnectionRegistryError() throws IOException, ServiceException {
693    Configuration clientConf = new Configuration(CONF);
694    // do not need any services
695    RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),
696      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
697    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
698      rpcServer.start();
699      InetSocketAddress addr = rpcServer.getListenerAddress();
700      RpcChannel channel = client.createRpcChannel(ServerName.valueOf(addr.getHostName(),
701        addr.getPort(), EnvironmentEdgeManager.currentTime()), User.getCurrent(), 0);
702      ConnectionRegistryService.Interface stub = ConnectionRegistryService.newStub(channel);
703      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
704      BlockingRpcCallback<GetConnectionRegistryResponse> done = new BlockingRpcCallback<>();
705      stub.getConnectionRegistry(pcrc, GetConnectionRegistryRequest.getDefaultInstance(), done);
706      // should have failed so no response
707      assertNull(done.get());
708      assertTrue(pcrc.failed());
709      // should be a FatalConnectionException
710      assertThat(pcrc.getFailed(), instanceOf(RemoteException.class));
711      assertEquals(FatalConnectionException.class.getName(),
712        ((RemoteException) pcrc.getFailed()).getClassName());
713      assertThat(pcrc.getFailed().getMessage(), startsWith("Expected HEADER="));
714    } finally {
715      rpcServer.stop();
716    }
717  }
718}