/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.codahale.metrics.Counter;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.RatioGauge.Ratio;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MetricsTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;

/**
 * Tests for {@link MetricsConnection}: the metric scope chosen for a connection, the connection
 * count kept by a metrics instance shared between connections, and the counters, call trackers and
 * pool gauges updated through {@code updateRpc}.
 */
@Category({ ClientTests.class, MetricsTests.class, SmallTests.class })
public class TestMetricsConnection {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMetricsConnection.class);

  /** Metrics instance shared by the tests; created in {@link #beforeClass()}. */
  private static MetricsConnection METRICS;

  /** Pool handed to {@link #METRICS} through a supplier; no task is submitted to it here. */
  private static final ThreadPoolExecutor BATCH_POOL =
    (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

  /** Scope key under which {@link #METRICS} is registered and later deleted. */
  private static final String MOCK_CONN_STR = "mocked-connection";

  /** Registers the shared metrics instance, supplying the batch pool and a null second pool. */
  @BeforeClass
  public static void beforeClass() {
    METRICS = MetricsConnection.getMetricsConnection(MOCK_CONN_STR, () -> BATCH_POOL, () -> null);
  }

  /** Deletes the shared metrics instance and stops the executor created for this class. */
  @AfterClass
  public static void afterClass() {
    MetricsConnection.deleteMetricsConnection(MOCK_CONN_STR);
    // The pool is owned by this test class, so it is stopped here rather than left running.
    BATCH_POOL.shutdownNow();
  }

  /**
   * Verifies that the metric scope is {@code clusterId@hexHashCode} when no scope is configured,
   * and equals the configured value once {@link MetricsConnection#METRICS_SCOPE_KEY} is set.
   * @throws IOException if creating the connection or reading the current user fails
   */
  @Test
  public void testMetricsConnectionScope() throws IOException {
    Configuration conf = new Configuration();
    String clusterId = "foo";
    String scope = "testScope";
    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);

    AsyncConnectionImpl impl = new AsyncConnectionImpl(conf, null, "foo", null, User.getCurrent());
    try {
      Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
      assertTrue("Metrics should be present", metrics.isPresent());
      assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()),
        metrics.get().getMetricScope());
    } finally {
      // Close the connection even when an assertion fails so it does not outlive the test.
      impl.close();
    }

    conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
    impl = new AsyncConnectionImpl(conf, null, "foo", null, User.getCurrent());
    try {
      Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
      assertTrue("Metrics should be present", metrics.isPresent());
      assertEquals(scope, metrics.get().getMetricScope());
    } finally {
      impl.close();
    }
  }

  /**
   * Creates several connections with the same configured scope and verifies the connection count
   * reported by their metrics, both right after creation and after all but one are closed.
   * @throws IOException if creating a connection or reading the current user fails
   */
  @Test
  public void testMetricsWithMutiConnections() throws IOException {
    Configuration conf = new Configuration();
    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
    conf.set(MetricsConnection.METRICS_SCOPE_KEY, "unit-test");

    User user = User.getCurrent();

    /* create multiple connections */
    final int num = 3;
    List<AsyncConnectionImpl> connList = new ArrayList<>();
    for (int i = 0; i < num; i++) {
      connList.add(new AsyncConnectionImpl(conf, null, null, null, user));
    }

    /* verify metrics presence */
    AsyncConnectionImpl impl = connList.get(0);
    Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
    assertTrue("Metrics should be present", metrics.isPresent());

    /* verify connection count in a shared metrics */
    long count = metrics.get().getConnectionCount();
    // JUnit's argument order is (message, expected, actual).
    assertEquals("Failed to verify connection count." + count, num, count);

    /* close some connections */
    for (int i = 0; i < num - 1; i++) {
      connList.get(i).close();
    }

    /* verify metrics presence again */
    impl = connList.get(num - 1);
    metrics = impl.getConnectionMetrics();
    assertTrue("Metrics should be present after some of connections are closed.",
      metrics.isPresent());

    /* verify count of remaining connections */
    count = metrics.get().getConnectionCount();
    assertEquals("Connection count suppose to be 1 but got: " + count, 1, count);

    /* shutdown */
    impl.close();
  }

  /**
   * Feeds a fixed number of Get, Scan, Multi and Mutate (Append, Delete, Increment, Put) calls
   * into the shared metrics, with Scan and Multi reported as failed, then checks the per-method
   * rpc counters, the exception counters, the per-operation call trackers and the pool gauges.
   * @throws IOException if building one of the mutation requests fails
   */
  @Test
  public void testStaticMetrics() throws IOException {
    final byte[] foo = Bytes.toBytes("foo");
    final RegionSpecifier region = RegionSpecifier.newBuilder().setValue(ByteString.EMPTY)
      .setType(RegionSpecifierType.REGION_NAME).build();
    final int loop = 5;

    for (int i = 0; i < loop; i++) {
      // Successful call: no exception passed.
      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Get"),
        GetRequest.getDefaultInstance(), MetricsConnection.newCallStats(), null);
      // Failed call carrying a remote exception.
      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Scan"),
        ScanRequest.getDefaultInstance(), MetricsConnection.newCallStats(),
        new RemoteWithExtrasException("java.io.IOException", null, false, false));
      // Failed call carrying a client-side timeout.
      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Multi"),
        MultiRequest.getDefaultInstance(), MetricsConnection.newCallStats(),
        new CallTimeoutException("test with CallTimeoutException"));
      // One successful Mutate call per mutation type.
      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
        MutateRequest.newBuilder()
          .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo)))
          .setRegion(region).build(),
        MetricsConnection.newCallStats(), null);
      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
        MutateRequest.newBuilder()
          .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo)))
          .setRegion(region).build(),
        MetricsConnection.newCallStats(), null);
      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
        MutateRequest.newBuilder()
          .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo)))
          .setRegion(region).build(),
        MetricsConnection.newCallStats(), null);
      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
        MutateRequest.newBuilder()
          .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo))).setRegion(region)
          .build(),
        MetricsConnection.newCallStats(), null);
    }

    final String rpcCountPrefix = "rpcCount_" + ClientService.getDescriptor().getName() + "_";
    final String rpcFailureCountPrefix =
      "rpcFailureCount_" + ClientService.getDescriptor().getName() + "_";
    String metricKey;
    long metricVal;

    for (String method : new String[] { "Get", "Scan", "Multi", "Mutate" }) {
      metricKey = rpcCountPrefix + method;
      metricVal = METRICS.getRpcCounters().get(metricKey).getCount();
      // Mutate is recorded four times per iteration, hence a lower bound rather than equality.
      assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal >= loop);

      metricKey = rpcFailureCountPrefix + method;
      metricVal = counterValue(metricKey);
      if (method.equals("Get") || method.equals("Mutate")) {
        // no failure
        assertEquals("metric: " + metricKey + " val: " + metricVal, 0, metricVal);
      } else {
        // has failure
        assertEquals("metric: " + metricKey + " val: " + metricVal, loop, metricVal);
      }
    }

    // remote exception
    metricKey = "rpcRemoteExceptions_IOException";
    metricVal = counterValue(metricKey);
    assertEquals("metric: " + metricKey + " val: " + metricVal, loop, metricVal);

    // local exception
    metricKey = "rpcLocalExceptions_CallTimeoutException";
    metricVal = counterValue(metricKey);
    assertEquals("metric: " + metricKey + " val: " + metricVal, loop, metricVal);

    // total exception: one remote plus one local per iteration
    metricKey = "rpcTotalExceptions";
    metricVal = counterValue(metricKey);
    assertEquals("metric: " + metricKey + " val: " + metricVal, loop * 2, metricVal);

    for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] {
      METRICS.getGetTracker(), METRICS.getScanTracker(), METRICS.getMultiTracker(),
      METRICS.getAppendTracker(), METRICS.getDeleteTracker(), METRICS.getIncrementTracker(),
      METRICS.getPutTracker() }) {
      assertEquals("Failed to invoke callTimer on " + t, loop, t.callTimer.getCount());
      assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.getCount());
      assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.getCount());
    }
    RatioGauge executorMetrics =
      (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getExecutorPoolName());
    RatioGauge metaMetrics =
      (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getMetaPoolName());
    // Ratio.of(0, 3) evaluates to 0.0, so this checks that the executor gauge reads zero.
    assertEquals(Ratio.of(0, 3).getValue(), executorMetrics.getValue(), 0);
    // NOTE(review): NaN is presumably the result of the null-returning supplier passed in
    // beforeClass -- confirm against MetricsConnection.
    assertEquals(Double.NaN, metaMetrics.getValue(), 0);
  }

  /**
   * Reads a counter from the shared metrics' rpc counter map.
   * @param metricKey full name of the counter to look up
   * @return the counter's current count, or 0 when no counter is registered under that name
   */
  private static long counterValue(String metricKey) {
    Counter counter = METRICS.getRpcCounters().get(metricKey);
    return (counter != null) ? counter.getCount() : 0;
  }
}