/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEvents;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;

import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallDroppedException;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mockito;

/**
 * Drives {@link CallRunner#run()} and {@link CallRunner#drop()} against Mockito mocks of
 * {@link RpcServerInterface} and {@link ServerCall}, then checks the interactions on the mocks
 * and the spans collected by the OpenTelemetry test extension.
 */
@Tag(RPCTests.TAG)
@Tag(SmallTests.TAG)
public class TestCallRunner {

  @RegisterExtension
  public static final OpenTelemetryExtension otelRule = OpenTelemetryExtension.create();

  /** Upper bound, in milliseconds, on the wait for the current test's span to end. */
  private static final long SPAN_WAIT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);

  private Configuration conf = null;
  /** Name of the running test method; every test uses it as its span name. */
  private String testMethodName;

  /**
   * Records the running test method's name and obtains a fresh configuration.
   * @param testInfo JUnit-provided description of the test about to run
   * @throws IllegalStateException if {@code testInfo} carries no test method
   */
  @BeforeEach
  public void before(TestInfo testInfo) {
    // Fail with a descriptive message instead of the bare NoSuchElementException of Optional.get().
    testMethodName = testInfo.getTestMethod()
      .orElseThrow(() -> new IllegalStateException("TestInfo carries no test method")).getName();
    final HBaseTestingUtil util = new HBaseTestingUtil();
    conf = util.getConfiguration();
  }

  /**
   * Creates a mocked RPC server whose {@code isStarted()} is stubbed to return {@code true}.
   * @return the stubbed mock
   */
  private static RpcServerInterface mockStartedRpcServer() {
    RpcServerInterface rpcServer = Mockito.mock(RpcServerInterface.class);
    Mockito.when(rpcServer.isStarted()).thenReturn(true);
    return rpcServer;
  }

  /**
   * Builds a {@link CallRunner} for the given server and call and attaches a "test" status.
   * <p>
   * NOTE(review): the tests have always constructed the runner inside the traced action, so
   * callers here keep doing that; presumably the runner binds to the span that is current at
   * construction time -- confirm against {@link CallRunner}.
   * @param rpcServer server handed to the runner
   * @param call      call handed to the runner
   * @return the configured runner
   */
  private static CallRunner newCallRunner(RpcServerInterface rpcServer, ServerCall<?> call) {
    CallRunner runner = new CallRunner(rpcServer, call);
    runner.setStatus(new MonitoredRPCHandlerImpl("test"));
    return runner;
  }

  /**
   * Waits up to {@link #SPAN_WAIT_TIMEOUT_MS} for the extension to report an ended span named
   * after the current test method.
   */
  private void waitForTestSpanToEnd() {
    Waiter.waitFor(conf, SPAN_WAIT_TIMEOUT_MS, new MatcherPredicate<>(otelRule::getSpans,
      hasItem(allOf(hasName(testMethodName), hasEnded()))));
  }

  /**
   * Does nothing but exercise a {@link CallRunner} outside of {@link RpcServer} context.
   */
  @Test
  public void testSimpleCall() {
    RpcServerInterface mockRpcServer = mockStartedRpcServer();
    ServerCall<?> mockCall = Mockito.mock(ServerCall.class);

    TraceUtil.trace(() -> {
      newCallRunner(mockRpcServer, mockCall).run();
    }, testMethodName);

    waitForTestSpanToEnd();

    assertThat(otelRule.getSpans(),
      hasItem(allOf(hasName(testMethodName), hasStatusWithCode(StatusCode.OK), hasEnded())));
  }

  /**
   * Runs a call whose {@code disconnectSince()} is stubbed to {@code 1L} and verifies that
   * {@code cleanup()} is invoked exactly once on it.
   */
  @Test
  public void testCallCleanup() {
    RpcServerInterface mockRpcServer = mockStartedRpcServer();
    ServerCall<?> mockCall = Mockito.mock(ServerCall.class);
    Mockito.when(mockCall.disconnectSince()).thenReturn(1L);

    TraceUtil.trace(() -> {
      newCallRunner(mockRpcServer, mockCall).run();
    }, testMethodName);

    Mockito.verify(mockCall, Mockito.times(1)).cleanup();
  }

  /**
   * Drops a call whose {@code disconnectSince()} is stubbed to {@code 1L}. Expects one
   * {@code cleanup()} and an ended span with status OK carrying a "Client disconnect detected"
   * event.
   */
  @Test
  public void testCallRunnerDropDisconnected() {
    RpcServerInterface mockRpcServer = mockStartedRpcServer();
    ServerCall<?> mockCall = Mockito.mock(ServerCall.class);
    Mockito.when(mockCall.disconnectSince()).thenReturn(1L);

    TraceUtil.trace(() -> {
      newCallRunner(mockRpcServer, mockCall).drop();
    }, testMethodName);

    Mockito.verify(mockCall, Mockito.times(1)).cleanup();

    waitForTestSpanToEnd();

    assertThat(otelRule.getSpans(),
      hasItem(allOf(hasName(testMethodName), hasStatusWithCode(StatusCode.OK),
        hasEvents(hasItem(EventMatchers.hasName("Client disconnect detected"))), hasEnded())));
  }

  /**
   * Drops a call whose {@code disconnectSince()} is stubbed to {@code -1L}. Expects one
   * {@code cleanup()}, a {@link CallDroppedException} reported to the server metrics, and an
   * ended span with status ERROR carrying an "exception" event of that exception type.
   */
  @Test
  public void testCallRunnerDropConnected() {
    RpcServerInterface mockRpcServer = mockStartedRpcServer();
    MetricsHBaseServer mockMetrics = Mockito.mock(MetricsHBaseServer.class);
    Mockito.when(mockRpcServer.getMetrics()).thenReturn(mockMetrics);
    Mockito.when(mockRpcServer.getListenerAddress())
      .thenReturn(InetSocketAddress.createUnresolved("foo", 60020));
    ServerCall<?> mockCall = Mockito.mock(ServerCall.class);
    Mockito.when(mockCall.disconnectSince()).thenReturn(-1L);

    TraceUtil.trace(() -> {
      newCallRunner(mockRpcServer, mockCall).drop();
    }, testMethodName);

    Mockito.verify(mockCall, Mockito.times(1)).cleanup();
    Mockito.verify(mockMetrics).exception(Mockito.any(CallDroppedException.class));

    waitForTestSpanToEnd();

    assertThat(otelRule.getSpans(),
      hasItem(allOf(hasName(testMethodName), hasStatusWithCode(StatusCode.ERROR),
        hasEvents(hasItem(allOf(EventMatchers.hasName("exception"),
          EventMatchers.hasAttributes(
            containsEntry("exception.type", CallDroppedException.class.getName()))))),
        hasEnded())));
  }
}