001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; 022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEvents; 023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; 025import static org.hamcrest.MatcherAssert.assertThat; 026import static org.hamcrest.Matchers.allOf; 027import static org.hamcrest.Matchers.hasItem; 028 029import io.opentelemetry.api.trace.StatusCode; 030import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; 031import java.net.InetSocketAddress; 032import java.util.concurrent.TimeUnit; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.CallDroppedException; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.MatcherPredicate; 038import org.apache.hadoop.hbase.Waiter; 039import org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers; 040import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; 041import org.apache.hadoop.hbase.testclassification.RPCTests; 042import org.apache.hadoop.hbase.testclassification.SmallTests; 043import org.apache.hadoop.hbase.trace.TraceUtil; 044import org.junit.Before; 045import org.junit.ClassRule; 046import org.junit.Rule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.junit.rules.TestName; 050import org.mockito.Mockito; 051 052@Category({ RPCTests.class, SmallTests.class }) 053public class TestCallRunner { 054 055 @ClassRule 056 public static final HBaseClassTestRule CLASS_RULE = 057 HBaseClassTestRule.forClass(TestCallRunner.class); 058 059 @Rule 060 public TestName testName = new TestName(); 061 062 @Rule 063 public OpenTelemetryRule otelRule = OpenTelemetryRule.create(); 064 065 private Configuration conf = null; 066 067 @Before 068 public void before() { 069 final HBaseTestingUtil util = new HBaseTestingUtil(); 070 conf = util.getConfiguration(); 071 } 072 073 /** 074 * Does nothing but exercise a {@link CallRunner} outside of {@link RpcServer} context. 075 */ 076 @Test 077 public void testSimpleCall() { 078 RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class); 079 Mockito.when(mockRpcServer.isStarted()).thenReturn(true); 080 ServerCall<?> mockCall = Mockito.mock(ServerCall.class); 081 082 TraceUtil.trace(() -> { 083 CallRunner cr = new CallRunner(mockRpcServer, mockCall); 084 cr.setStatus(new MonitoredRPCHandlerImpl("test")); 085 cr.run(); 086 }, testName.getMethodName()); 087 088 Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(otelRule::getSpans, 089 hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); 090 091 assertThat(otelRule.getSpans(), hasItem( 092 allOf(hasName(testName.getMethodName()), hasStatusWithCode(StatusCode.OK), hasEnded()))); 093 } 094 095 @Test 096 public void testCallCleanup() { 097 RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class); 098 Mockito.when(mockRpcServer.isStarted()).thenReturn(true); 099 ServerCall<?> mockCall = Mockito.mock(ServerCall.class); 100 Mockito.when(mockCall.disconnectSince()).thenReturn(1L); 101 102 TraceUtil.trace(() -> { 103 CallRunner cr = new CallRunner(mockRpcServer, mockCall); 104 cr.setStatus(new MonitoredRPCHandlerImpl("test")); 105 cr.run(); 106 }, testName.getMethodName()); 107 Mockito.verify(mockCall, Mockito.times(1)).cleanup(); 108 } 109 110 @Test 111 public void testCallRunnerDropDisconnected() { 112 RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class); 113 Mockito.when(mockRpcServer.isStarted()).thenReturn(true); 114 ServerCall<?> mockCall = Mockito.mock(ServerCall.class); 115 Mockito.when(mockCall.disconnectSince()).thenReturn(1L); 116 117 TraceUtil.trace(() -> { 118 CallRunner cr = new CallRunner(mockRpcServer, mockCall); 119 cr.setStatus(new MonitoredRPCHandlerImpl("test")); 120 cr.drop(); 121 }, testName.getMethodName()); 122 Mockito.verify(mockCall, Mockito.times(1)).cleanup(); 123 124 Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(otelRule::getSpans, 125 hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); 126 127 assertThat(otelRule.getSpans(), 128 hasItem(allOf(hasName(testName.getMethodName()), hasStatusWithCode(StatusCode.OK), 129 hasEvents(hasItem(EventMatchers.hasName("Client disconnect detected"))), hasEnded()))); 130 } 131 132 @Test 133 public void testCallRunnerDropConnected() { 134 RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class); 135 MetricsHBaseServer mockMetrics = Mockito.mock(MetricsHBaseServer.class); 136 Mockito.when(mockRpcServer.getMetrics()).thenReturn(mockMetrics); 137 Mockito.when(mockRpcServer.isStarted()).thenReturn(true); 138 Mockito.when(mockRpcServer.getListenerAddress()) 139 .thenReturn(InetSocketAddress.createUnresolved("foo", 60020)); 140 ServerCall<?> mockCall = Mockito.mock(ServerCall.class); 141 Mockito.when(mockCall.disconnectSince()).thenReturn(-1L); 142 143 TraceUtil.trace(() -> { 144 CallRunner cr = new CallRunner(mockRpcServer, mockCall); 145 cr.setStatus(new MonitoredRPCHandlerImpl("test")); 146 cr.drop(); 147 }, testName.getMethodName()); 148 Mockito.verify(mockCall, Mockito.times(1)).cleanup(); 149 Mockito.verify(mockMetrics).exception(Mockito.any(CallDroppedException.class)); 150 151 Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(otelRule::getSpans, 152 hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); 153 154 assertThat(otelRule.getSpans(), 155 hasItem(allOf(hasName(testName.getMethodName()), hasStatusWithCode(StatusCode.ERROR), 156 hasEvents(hasItem(allOf(EventMatchers.hasName("exception"), 157 EventMatchers.hasAttributes( 158 containsEntry("exception.type", CallDroppedException.class.getName()))))), 159 hasEnded()))); 160 } 161}