001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNull; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import com.codahale.metrics.Counter; 025import com.codahale.metrics.RatioGauge; 026import com.codahale.metrics.RatioGauge.Ratio; 027import com.codahale.metrics.Timer; 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.List; 031import java.util.Optional; 032import java.util.concurrent.Executors; 033import java.util.concurrent.ThreadPoolExecutor; 034import java.util.stream.Stream; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.ipc.CallTimeoutException; 039import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException; 040import org.apache.hadoop.hbase.security.User; 041import org.apache.hadoop.hbase.testclassification.ClientTests; 042import org.apache.hadoop.hbase.testclassification.MetricsTests; 043import org.apache.hadoop.hbase.testclassification.SmallTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.junit.jupiter.api.AfterEach; 046import org.junit.jupiter.api.BeforeEach; 047import org.junit.jupiter.api.Tag; 048import org.junit.jupiter.api.TestTemplate; 049import org.junit.jupiter.params.provider.Arguments; 050 051import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 052 053import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 063 064@Tag(ClientTests.TAG) 065@Tag(MetricsTests.TAG) 066@Tag(SmallTests.TAG) 067@HBaseParameterizedTestTemplate 068public class TestMetricsConnection { 069 070 private static final Configuration conf = new Configuration(); 071 private static MetricsConnection METRICS; 072 private static final ThreadPoolExecutor BATCH_POOL = 073 (ThreadPoolExecutor) Executors.newFixedThreadPool(2); 074 075 private static final String MOCK_CONN_STR = "mocked-connection"; 076 077 public boolean tableMetricsEnabled; 078 079 public TestMetricsConnection(boolean tableMetricsEnabled) { 080 this.tableMetricsEnabled = tableMetricsEnabled; 081 } 082 083 public static Stream<Arguments> parameters() { 084 return Stream.of(Arguments.of(true), Arguments.of(false)); 085 } 086 087 @BeforeEach 088 public void before() { 089 conf.setBoolean(MetricsConnection.CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, tableMetricsEnabled); 090 METRICS = 091 MetricsConnection.getMetricsConnection(conf, MOCK_CONN_STR, () -> BATCH_POOL, () -> null); 092 } 093 094 @AfterEach 095 public void after() { 096 MetricsConnection.deleteMetricsConnection(MOCK_CONN_STR); 097 } 098 099 @TestTemplate 100 public void testMetricsConnectionScope() throws IOException { 101 Configuration conf = new Configuration(); 102 String clusterId = "foo"; 103 String scope = "testScope"; 104 conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); 105 106 AsyncConnectionImpl impl = new AsyncConnectionImpl(conf, null, "foo", null, User.getCurrent()); 107 Optional<MetricsConnection> metrics = impl.getConnectionMetrics(); 108 assertTrue(metrics.isPresent(), "Metrics should be present"); 109 assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), 110 metrics.get().getMetricScope()); 111 conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope); 112 impl = new AsyncConnectionImpl(conf, null, "foo", null, User.getCurrent()); 113 114 metrics = impl.getConnectionMetrics(); 115 assertTrue(metrics.isPresent(), "Metrics should be present"); 116 assertEquals(scope, metrics.get().getMetricScope()); 117 } 118 119 @TestTemplate 120 public void testMetricsWithMultiConnections() throws IOException { 121 Configuration conf = new Configuration(); 122 conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); 123 conf.set(MetricsConnection.METRICS_SCOPE_KEY, "unit-test"); 124 125 User user = User.getCurrent(); 126 127 /* create multiple connections */ 128 final int num = 3; 129 AsyncConnectionImpl impl; 130 List<AsyncConnectionImpl> connList = new ArrayList<AsyncConnectionImpl>(); 131 for (int i = 0; i < num; i++) { 132 impl = new AsyncConnectionImpl(conf, null, null, null, user); 133 connList.add(impl); 134 } 135 136 /* verify metrics presence */ 137 impl = connList.get(0); 138 Optional<MetricsConnection> metrics = impl.getConnectionMetrics(); 139 assertTrue(metrics.isPresent(), "Metrics should be present"); 140 141 /* verify connection count in a shared metrics */ 142 long count = metrics.get().getConnectionCount(); 143 assertEquals(count, num, "Failed to verify connection count." + count); 144 145 /* close some connections */ 146 for (int i = 0; i < num - 1; i++) { 147 connList.get(i).close(); 148 } 149 150 /* verify metrics presence again */ 151 impl = connList.get(num - 1); 152 metrics = impl.getConnectionMetrics(); 153 assertTrue(metrics.isPresent(), 154 "Metrics should be present after some of connections are closed."); 155 156 /* verify count of remaining connections */ 157 count = metrics.get().getConnectionCount(); 158 assertEquals(count, 1, "Connection count suppose to be 1 but got: " + count); 159 160 /* shutdown */ 161 impl.close(); 162 } 163 164 @TestTemplate 165 public void testStaticMetrics() throws IOException { 166 final byte[] foo = Bytes.toBytes("foo"); 167 String table = "TableX"; 168 final RegionSpecifier region = RegionSpecifier.newBuilder() 169 .setValue(ByteString.copyFromUtf8(table)).setType(RegionSpecifierType.REGION_NAME).build(); 170 final int loop = 5; 171 172 for (int i = 0; i < loop; i++) { 173 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Get"), 174 TableName.valueOf(table), 175 GetRequest.newBuilder().setRegion(region).setGet(ProtobufUtil.toGet(new Get(foo))).build(), 176 MetricsConnection.newCallStats(), null); 177 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Scan"), 178 TableName.valueOf(table), 179 ScanRequest.newBuilder().setRegion(region) 180 .setScan(ProtobufUtil.toScan(new Scan(new Get(foo)))).build(), 181 MetricsConnection.newCallStats(), 182 new RemoteWithExtrasException("java.io.IOException", null, false, false)); 183 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Multi"), 184 TableName.valueOf(table), 185 MultiRequest.newBuilder() 186 .addRegionAction(ClientProtos.RegionAction.newBuilder() 187 .addAction( 188 ClientProtos.Action.newBuilder().setGet(ProtobufUtil.toGet(new Get(foo))).build()) 189 .setRegion(region).build()) 190 .build(), 191 MetricsConnection.newCallStats(), 192 new CallTimeoutException("test with CallTimeoutException")); 193 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), 194 TableName.valueOf(table), 195 MutateRequest.newBuilder() 196 .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo))) 197 .setRegion(region).build(), 198 MetricsConnection.newCallStats(), null); 199 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), 200 TableName.valueOf(table), 201 MutateRequest.newBuilder() 202 .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo))) 203 .setRegion(region).build(), 204 MetricsConnection.newCallStats(), null); 205 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), 206 TableName.valueOf(table), 207 MutateRequest.newBuilder() 208 .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo))) 209 .setRegion(region).build(), 210 MetricsConnection.newCallStats(), null); 211 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), 212 TableName.valueOf(table), 213 MutateRequest.newBuilder() 214 .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo))).setRegion(region) 215 .build(), 216 MetricsConnection.newCallStats(), 217 new CallTimeoutException("test with CallTimeoutException")); 218 } 219 220 testRpcCallMetrics(table, loop); 221 222 String metricKey; 223 long metricVal; 224 Counter counter; 225 226 // remote exception 227 metricKey = "rpcRemoteExceptions_IOException"; 228 counter = METRICS.getRpcCounters().get(metricKey); 229 metricVal = (counter != null) ? counter.getCount() : 0; 230 assertEquals(metricVal, loop, "metric: " + metricKey + " val: " + metricVal); 231 232 // local exception 233 metricKey = "rpcLocalExceptions_CallTimeoutException"; 234 counter = METRICS.getRpcCounters().get(metricKey); 235 metricVal = (counter != null) ? counter.getCount() : 0; 236 assertEquals(metricVal, loop * 2, "metric: " + metricKey + " val: " + metricVal); 237 238 // total exception 239 metricKey = "rpcTotalExceptions"; 240 counter = METRICS.getRpcCounters().get(metricKey); 241 metricVal = (counter != null) ? counter.getCount() : 0; 242 assertEquals(metricVal, loop * 3, "metric: " + metricKey + " val: " + metricVal); 243 244 testRpcCallTableMetrics(table, loop); 245 246 for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] { 247 METRICS.getGetTracker(), METRICS.getScanTracker(), METRICS.getMultiTracker(), 248 METRICS.getAppendTracker(), METRICS.getDeleteTracker(), METRICS.getIncrementTracker(), 249 METRICS.getPutTracker() }) { 250 assertEquals(loop, t.callTimer.getCount(), "Failed to invoke callTimer on " + t); 251 assertEquals(loop, t.reqHist.getCount(), "Failed to invoke reqHist on " + t); 252 assertEquals(loop, t.respHist.getCount(), "Failed to invoke respHist on " + t); 253 } 254 RatioGauge executorMetrics = 255 (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getExecutorPoolName()); 256 RatioGauge metaMetrics = 257 (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getMetaPoolName()); 258 assertEquals(Ratio.of(0, 3).getValue(), executorMetrics.getValue(), 0); 259 assertEquals(Double.NaN, metaMetrics.getValue(), 0); 260 } 261 262 private void testRpcCallTableMetrics(String table, int expectedVal) { 263 String metricKey; 264 Timer timer; 265 String numOpsSuffix = "_num_ops"; 266 String p95Suffix = "_95th_percentile"; 267 String p99Suffix = "_99th_percentile"; 268 String service = ClientService.getDescriptor().getName(); 269 for (String m : new String[] { "Get", "Scan", "Multi" }) { 270 metricKey = "rpcCallDurationMs_" + service + "_" + m + "_" + table; 271 timer = METRICS.getRpcTimers().get(metricKey); 272 if (tableMetricsEnabled) { 273 long numOps = timer.getCount(); 274 double p95 = timer.getSnapshot().get95thPercentile(); 275 double p99 = timer.getSnapshot().get99thPercentile(); 276 assertEquals(expectedVal, numOps, 277 "metric: " + metricKey + numOpsSuffix + " val: " + numOps); 278 assertTrue(p95 >= 0, "metric: " + metricKey + p95Suffix + " val: " + p95); 279 assertTrue(p99 >= 0, "metric: " + metricKey + p99Suffix + " val: " + p99); 280 } else { 281 assertNull(timer); 282 } 283 } 284 285 // Distinguish mutate types for mutate method. 286 String mutateMethod = "Mutate"; 287 for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) { 288 metricKey = "rpcCallDurationMs_" + service + "_" + mutateMethod + "(" + mutationType + ")" 289 + "_" + table; 290 timer = METRICS.getRpcTimers().get(metricKey); 291 if (tableMetricsEnabled) { 292 long numOps = timer.getCount(); 293 double p95 = timer.getSnapshot().get95thPercentile(); 294 double p99 = timer.getSnapshot().get99thPercentile(); 295 assertEquals(expectedVal, numOps, 296 "metric: " + metricKey + numOpsSuffix + " val: " + numOps); 297 assertTrue(p95 >= 0, "metric: " + metricKey + p95Suffix + " val: " + p95); 298 assertTrue(p99 >= 0, "metric: " + metricKey + p99Suffix + " val: " + p99); 299 } else { 300 assertNull(timer); 301 } 302 } 303 } 304 305 private void testRpcCallMetrics(String table, int expectedVal) { 306 final String rpcCountPrefix = "rpcCount_" + ClientService.getDescriptor().getName() + "_"; 307 final String rpcFailureCountPrefix = 308 "rpcFailureCount_" + ClientService.getDescriptor().getName() + "_"; 309 String metricKey; 310 long metricVal; 311 Counter counter; 312 313 for (String method : new String[] { "Get", "Scan", "Multi" }) { 314 // rpc call count 315 metricKey = rpcCountPrefix + method; 316 metricVal = METRICS.getRpcCounters().get(metricKey).getCount(); 317 assertEquals(metricVal, expectedVal, "metric: " + metricKey + " val: " + metricVal); 318 319 // rpc failure call 320 metricKey = tableMetricsEnabled 321 ? rpcFailureCountPrefix + method + "_" + table 322 : rpcFailureCountPrefix + method; 323 counter = METRICS.getRpcCounters().get(metricKey); 324 metricVal = (counter != null) ? counter.getCount() : 0; 325 if (method.equals("Get")) { 326 // no failure 327 assertEquals(0, metricVal, "metric: " + metricKey + " val: " + metricVal); 328 } else { 329 // has failure 330 assertEquals(metricVal, expectedVal, "metric: " + metricKey + " val: " + metricVal); 331 } 332 } 333 334 String method = "Mutate"; 335 for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) { 336 // rpc call count 337 metricKey = rpcCountPrefix + method + "(" + mutationType + ")"; 338 metricVal = METRICS.getRpcCounters().get(metricKey).getCount(); 339 assertEquals(metricVal, expectedVal, "metric: " + metricKey + " val: " + metricVal); 340 341 // rpc failure call 342 metricKey = tableMetricsEnabled 343 ? rpcFailureCountPrefix + method + "(" + mutationType + ")" + "_" + table 344 : rpcFailureCountPrefix + method + "(" + mutationType + ")"; 345 counter = METRICS.getRpcCounters().get(metricKey); 346 metricVal = (counter != null) ? counter.getCount() : 0; 347 if (mutationType.equals("Put")) { 348 // has failure 349 assertEquals(metricVal, expectedVal, "metric: " + metricKey + " val: " + metricVal); 350 } else { 351 // no failure 352 assertEquals(0, metricVal, "metric: " + metricKey + " val: " + metricVal); 353 } 354 } 355 } 356}