001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEvents;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
025import static org.hamcrest.MatcherAssert.assertThat;
026import static org.hamcrest.Matchers.allOf;
027import static org.hamcrest.Matchers.hasItem;
028
029import io.opentelemetry.api.trace.StatusCode;
030import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
031import java.net.InetSocketAddress;
032import java.util.concurrent.TimeUnit;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.CallDroppedException;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.MatcherPredicate;
037import org.apache.hadoop.hbase.Waiter;
038import org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers;
039import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
040import org.apache.hadoop.hbase.testclassification.RPCTests;
041import org.apache.hadoop.hbase.testclassification.SmallTests;
042import org.apache.hadoop.hbase.trace.TraceUtil;
043import org.junit.jupiter.api.BeforeEach;
044import org.junit.jupiter.api.Tag;
045import org.junit.jupiter.api.Test;
046import org.junit.jupiter.api.TestInfo;
047import org.junit.jupiter.api.extension.RegisterExtension;
048import org.mockito.Mockito;
049
050@Tag(RPCTests.TAG)
051@Tag(SmallTests.TAG)
052public class TestCallRunner {
053
054  @RegisterExtension
055  public static final OpenTelemetryExtension otelRule = OpenTelemetryExtension.create();
056
057  private Configuration conf = null;
058  private String testMethodName;
059
060  @BeforeEach
061  public void before(TestInfo testInfo) {
062    testMethodName = testInfo.getTestMethod().get().getName();
063    final HBaseTestingUtil util = new HBaseTestingUtil();
064    conf = util.getConfiguration();
065  }
066
067  /**
068   * Does nothing but exercise a {@link CallRunner} outside of {@link RpcServer} context.
069   */
070  @Test
071  public void testSimpleCall() {
072    RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
073    Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
074    ServerCall<?> mockCall = Mockito.mock(ServerCall.class);
075
076    TraceUtil.trace(() -> {
077      CallRunner cr = new CallRunner(mockRpcServer, mockCall);
078      cr.setStatus(new MonitoredRPCHandlerImpl("test"));
079      cr.run();
080    }, testMethodName);
081
082    Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(otelRule::getSpans,
083      hasItem(allOf(hasName(testMethodName), hasEnded()))));
084
085    assertThat(otelRule.getSpans(),
086      hasItem(allOf(hasName(testMethodName), hasStatusWithCode(StatusCode.OK), hasEnded())));
087  }
088
089  @Test
090  public void testCallCleanup() {
091    RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
092    Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
093    ServerCall<?> mockCall = Mockito.mock(ServerCall.class);
094    Mockito.when(mockCall.disconnectSince()).thenReturn(1L);
095
096    TraceUtil.trace(() -> {
097      CallRunner cr = new CallRunner(mockRpcServer, mockCall);
098      cr.setStatus(new MonitoredRPCHandlerImpl("test"));
099      cr.run();
100    }, testMethodName);
101    Mockito.verify(mockCall, Mockito.times(1)).cleanup();
102  }
103
104  @Test
105  public void testCallRunnerDropDisconnected() {
106    RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
107    Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
108    ServerCall<?> mockCall = Mockito.mock(ServerCall.class);
109    Mockito.when(mockCall.disconnectSince()).thenReturn(1L);
110
111    TraceUtil.trace(() -> {
112      CallRunner cr = new CallRunner(mockRpcServer, mockCall);
113      cr.setStatus(new MonitoredRPCHandlerImpl("test"));
114      cr.drop();
115    }, testMethodName);
116    Mockito.verify(mockCall, Mockito.times(1)).cleanup();
117
118    Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(otelRule::getSpans,
119      hasItem(allOf(hasName(testMethodName), hasEnded()))));
120
121    assertThat(otelRule.getSpans(),
122      hasItem(allOf(hasName(testMethodName), hasStatusWithCode(StatusCode.OK),
123        hasEvents(hasItem(EventMatchers.hasName("Client disconnect detected"))), hasEnded())));
124  }
125
126  @Test
127  public void testCallRunnerDropConnected() {
128    RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
129    MetricsHBaseServer mockMetrics = Mockito.mock(MetricsHBaseServer.class);
130    Mockito.when(mockRpcServer.getMetrics()).thenReturn(mockMetrics);
131    Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
132    Mockito.when(mockRpcServer.getListenerAddress())
133      .thenReturn(InetSocketAddress.createUnresolved("foo", 60020));
134    ServerCall<?> mockCall = Mockito.mock(ServerCall.class);
135    Mockito.when(mockCall.disconnectSince()).thenReturn(-1L);
136
137    TraceUtil.trace(() -> {
138      CallRunner cr = new CallRunner(mockRpcServer, mockCall);
139      cr.setStatus(new MonitoredRPCHandlerImpl("test"));
140      cr.drop();
141    }, testMethodName);
142    Mockito.verify(mockCall, Mockito.times(1)).cleanup();
143    Mockito.verify(mockMetrics).exception(Mockito.any(CallDroppedException.class));
144
145    Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(otelRule::getSpans,
146      hasItem(allOf(hasName(testMethodName), hasEnded()))));
147
148    assertThat(otelRule.getSpans(),
149      hasItem(allOf(hasName(testMethodName), hasStatusWithCode(StatusCode.ERROR),
150        hasEvents(hasItem(allOf(EventMatchers.hasName("exception"),
151          EventMatchers.hasAttributes(
152            containsEntry("exception.type", CallDroppedException.class.getName()))))),
153        hasEnded())));
154  }
155}