/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static com.codahale.metrics.MetricRegistry.name;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * This class is for maintaining the various connection statistics and publishing them through the
 * metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
 * as to not conflict with other uses of Yammer Metrics within the client application. Instantiating
 * this class implicitly creates and "starts" instances of these classes; be sure to call
 * {@link #shutdown()} to terminate the thread pools they allocate.
 */
@InterfaceAudience.Private
public class MetricsConnection implements StatisticTrackable {

  /** Set this key to {@code true} to enable metrics collection of client requests. */
  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

  /**
   * Set to specify a custom scope for the metrics published through {@link MetricsConnection}. The
   * scope is added to JMX MBean objectName, and defaults to a combination of the Connection's
   * clusterId and hashCode. For example, a default value for a connection to cluster "foo" might be
   * "foo@7d9d0818", where "7d9d0818" is the hex-encoded hashCode of the object that creates the
   * MetricsConnection (see {@link #getScope(Configuration, String, Object)}). Users may set this
   * key to give a more contextual name for this scope. For example, one might want to differentiate
   * a read connection from a write connection by setting the scopes to "foo-read" and "foo-write"
   * respectively. Scope is the only thing that lends any uniqueness to the metrics. Care should be
   * taken to avoid using the same scope for multiple Connections, otherwise the metrics may
   * aggregate in unforeseen ways.
   */
  public static final String METRICS_SCOPE_KEY = "hbase.client.metrics.scope";

  /**
   * Returns the scope for a MetricsConnection based on the configured {@link #METRICS_SCOPE_KEY} or
   * by generating a default from the passed clusterId and connectionObj's hashCode.
   * @param conf          configuration for the connection
   * @param clusterId     clusterId for the connection
   * @param connectionObj either a Connection or AsyncConnectionImpl, the instance creating this
   *                      MetricsConnection.
   * @return the configured scope, or {@code clusterId + "@" + hex(connectionObj.hashCode())} when
   *         {@link #METRICS_SCOPE_KEY} is not set
   */
  static String getScope(Configuration conf, String clusterId, Object connectionObj) {
    return conf.get(METRICS_SCOPE_KEY,
      clusterId + "@" + Integer.toHexString(connectionObj.hashCode()));
  }

  // Prefixes used to build metric names.
  private static final String CNT_BASE = "rpcCount_";
  private static final String DRTN_BASE = "rpcCallDurationMs_";
  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
  private static final String MEMLOAD_BASE = "memstoreLoad_";
  private static final String HEAP_BASE = "heapOccupancy_";
  private static final String CACHE_BASE = "cacheDroppingExceptions_";
  private static final String UNKNOWN_EXCEPTION = "UnknownException";
  private static final String NS_LOOKUPS = "nsLookups";
  private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();

  /**
   * A container class for collecting details about the RPC call as it percolates. This is a plain
   * mutable holder: all values default to zero and no synchronization is performed here.
   */
  public static class CallStats {
    private long requestSizeBytes = 0;
    private long responseSizeBytes = 0;
    private long startTime = 0;
    private long callTimeMs = 0;
    private int concurrentCallsPerServer = 0;
    private int numActionsPerServer = 0;

    public long getRequestSizeBytes() {
      return requestSizeBytes;
    }

    public void setRequestSizeBytes(long requestSizeBytes) {
      this.requestSizeBytes = requestSizeBytes;
    }

    public long getResponseSizeBytes() {
      return responseSizeBytes;
    }

    public void setResponseSizeBytes(long responseSizeBytes) {
      this.responseSizeBytes = responseSizeBytes;
    }

    public long getStartTime() {
      return startTime;
    }

    public void setStartTime(long startTime) {
      this.startTime = startTime;
    }

    public long getCallTimeMs() {
      return callTimeMs;
    }

    public void setCallTimeMs(long callTimeMs) {
      this.callTimeMs = callTimeMs;
    }

    public int getConcurrentCallsPerServer() {
      return concurrentCallsPerServer;
    }

    public void setConcurrentCallsPerServer(int callsPerServer) {
      this.concurrentCallsPerServer = callsPerServer;
    }

    public int getNumActionsPerServer() {
      return numActionsPerServer;
    }

    public void setNumActionsPerServer(int numActionsPerServer) {
      this.numActionsPerServer = numActionsPerServer;
    }
  }

  /**
   * Groups the three per-RPC-type metrics: a call-duration timer and request/response size
   * histograms. The tracker's name is {@code <service>_<name>}, or
   * {@code <service>_<name>(<subName>)} when a sub-name is given, where {@code <service>} is the
   * ClientService descriptor name.
   */
  protected static final class CallTracker {
    private final String name;
    final Timer callTimer;
    final Histogram reqHist;
    final Histogram respHist;

    /**
     * @param registry registry the metrics are created in
     * @param name     RPC method name
     * @param subName  optional qualifier appended in parentheses; may be {@code null}
     * @param scope    scope appended to every metric name
     */
    private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
      if (subName != null) {
        sb.append("(").append(subName).append(")");
      }
      this.name = sb.toString();
      this.callTimer = registry.timer(name(MetricsConnection.class, DRTN_BASE + this.name, scope));
      this.reqHist = registry.histogram(name(MetricsConnection.class, REQ_BASE + this.name, scope));
      this.respHist =
        registry.histogram(name(MetricsConnection.class, RESP_BASE + this.name, scope));
    }

    private CallTracker(MetricRegistry registry, String name, String scope) {
      this(registry, name, null, scope);
    }

    /** Record the call time and request/response sizes carried by {@code stats}. */
    public void updateRpc(CallStats stats) {
      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
      this.reqHist.update(stats.getRequestSizeBytes());
      this.respHist.update(stats.getResponseSizeBytes());
    }

    @Override
    public String toString() {
      return "CallTracker:" + name;
    }
  }

  /**
   * Memstore-load and heap-occupancy histograms for a single region. Note that, unlike the other
   * metrics in this class, these metric names do not include the connection scope.
   */
  protected static class RegionStats {
    final String name;
    final Histogram memstoreLoadHist;
    final Histogram heapOccupancyHist;

    public RegionStats(MetricRegistry registry, String name) {
      this.name = name;
      this.memstoreLoadHist =
        registry.histogram(name(MetricsConnection.class, MEMLOAD_BASE + this.name));
      this.heapOccupancyHist =
        registry.histogram(name(MetricsConnection.class, HEAP_BASE + this.name));
    }

    /** Feed the memstore load and heap occupancy from {@code regionStatistics} into the hists. */
    public void update(RegionLoadStats regionStatistics) {
      this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
    }
  }

  /**
   * Counters for normal and delayed runners plus a histogram of delay intervals. These metric names
   * do not include the connection scope.
   */
  protected static class RunnerStats {
    final Counter normalRunners;
    final Counter delayRunners;
    // NOTE: the field name is misspelled ("Inteval"); it is kept as-is because code outside this
    // file may reference it.
    final Histogram delayIntevalHist;

    public RunnerStats(MetricRegistry registry) {
      this.normalRunners = registry.counter(name(MetricsConnection.class, "normalRunnersCount"));
      this.delayRunners = registry.counter(name(MetricsConnection.class, "delayRunnersCount"));
      this.delayIntevalHist =
        registry.histogram(name(MetricsConnection.class, "delayIntervalHist"));
    }

    public void incrNormalRunners() {
      this.normalRunners.inc();
    }

    public void incrDelayRunners() {
      this.delayRunners.inc();
    }

    public void updateDelayInterval(long interval) {
      this.delayIntevalHist.update(interval);
    }
  }

  /** Per-server map of per-region stats; inner maps are keyed by region name bytes. */
  protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
    new ConcurrentHashMap<>();

  /**
   * Update region stats from an RPC result, if it carries any.
   * @param serverName server the result came from
   * @param regionName region the result came from
   * @param r          the RPC result; ignored unless it is a {@link Result} whose
   *                   {@code getStats()} is non-null
   */
  public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
    if (!(r instanceof Result)) {
      return;
    }
    Result result = (Result) r;
    RegionLoadStats stats = result.getStats();
    if (stats == null) {
      return;
    }
    updateRegionStats(serverName, regionName, stats);
  }

  @Override
  public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
    ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
      () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
    // The metric name is built inside the supplier so the string concatenation only happens when
    // a new RegionStats is actually being created, not on every update.
    RegionStats regionStats = computeIfAbsent(rsStats, regionName,
      () -> new RegionStats(this.registry, regionStatsName(serverName, regionName)));
    regionStats.update(stats);
  }

  /** Builds the {@code <serverName>,<regionName>} suffix used to name a region's metrics. */
  private static String regionStatsName(ServerName serverName, byte[] regionName) {
    return serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
  }

  /** A lambda for dispatching to the appropriate metric factory method */
  @FunctionalInterface
  private interface NewMetric<T> {
    T newMetric(Class<?> clazz, String name, String scope);
  }

  /** Anticipated number of metric entries */
  private static final int CAPACITY = 50;
  /** Same value as the default load factor of {@code java.util.HashMap} */
  private static final float LOAD_FACTOR = 0.75f;
  /**
   * Anticipated number of concurrent accessor threads
   */
  private static final int CONCURRENCY_LEVEL = 256;

  private final MetricRegistry registry;
  private final JmxReporter reporter;
  protected final String scope;

  // The factories below are anonymous classes that read the registry field when newMetric() is
  // invoked, i.e. after the constructor has assigned it.

  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
    @Override
    public Timer newMetric(Class<?> clazz, String name, String scope) {
      return registry.timer(name(clazz, name, scope));
    }
  };

  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
    @Override
    public Histogram newMetric(Class<?> clazz, String name, String scope) {
      return registry.histogram(name(clazz, name, scope));
    }
  };

  private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
    @Override
    public Counter newMetric(Class<?> clazz, String name, String scope) {
      return registry.counter(name(clazz, name, scope));
    }
  };

  // static metrics

  protected final Counter metaCacheHits;
  protected final Counter metaCacheMisses;
  protected final CallTracker getTracker;
  protected final CallTracker scanTracker;
  protected final CallTracker appendTracker;
  protected final CallTracker deleteTracker;
  protected final CallTracker incrementTracker;
  protected final CallTracker putTracker;
  protected final CallTracker multiTracker;
  protected final RunnerStats runnerStats;
  protected final Counter metaCacheNumClearServer;
  protected final Counter metaCacheNumClearRegion;
  protected final Counter hedgedReadOps;
  protected final Counter hedgedReadWin;
  protected final Histogram concurrentCallsPerServerHist;
  protected final Histogram numActionsPerServerHist;
  protected final Counter nsLookups;
  protected final Counter nsLookupsFailed;

  // dynamic metrics

  // These maps are used to cache references to the metric instances that are managed by the
  // registry. I don't think their use perfectly removes redundant allocations, but it's
  // a big improvement over calling registry.newMetric each time.
  protected final ConcurrentMap<String, Timer> rpcTimers =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  protected final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
    CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  protected final ConcurrentMap<String, Counter> rpcCounters =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

  /**
   * Creates the registry, registers all static metrics under {@code scope} and starts a
   * {@link JmxReporter} for the registry.
   * @param scope     scope appended to metric names
   * @param batchPool supplies the batch executor pool sampled by the "executorPoolActiveThreads"
   *                  gauge; the supplier may return {@code null}
   * @param metaPool  supplies the meta executor pool sampled by the "metaPoolActiveThreads" gauge;
   *                  the supplier may return {@code null}
   */
  MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
    Supplier<ThreadPoolExecutor> metaPool) {
    // scope must be assigned first: getExecutorPoolName()/getMetaPoolName() read this.scope.
    this.scope = scope;
    this.registry = new MetricRegistry();
    this.registry.register(getExecutorPoolName(), poolUtilizationGauge(batchPool));
    this.registry.register(getMetaPoolName(), poolUtilizationGauge(metaPool));
    this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
    this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
    this.metaCacheNumClearServer =
      registry.counter(name(this.getClass(), "metaCacheNumClearServer", scope));
    this.metaCacheNumClearRegion =
      registry.counter(name(this.getClass(), "metaCacheNumClearRegion", scope));
    this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
    this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
    this.getTracker = new CallTracker(this.registry, "Get", scope);
    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
    this.runnerStats = new RunnerStats(this.registry);
    this.concurrentCallsPerServerHist =
      registry.histogram(name(MetricsConnection.class, "concurrentCallsPerServer", scope));
    this.numActionsPerServerHist =
      registry.histogram(name(MetricsConnection.class, "numActionsPerServer", scope));
    this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
    this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));

    this.reporter = JmxReporter.forRegistry(this.registry).build();
    this.reporter.start();
  }

  /**
   * Builds a gauge reporting active threads over maximum pool size for the pool obtained from
   * {@code poolSupplier} each time the gauge is read.
   */
  private static RatioGauge poolUtilizationGauge(Supplier<ThreadPoolExecutor> poolSupplier) {
    return new RatioGauge() {
      @Override
      protected Ratio getRatio() {
        ThreadPoolExecutor pool = poolSupplier.get();
        if (pool == null) {
          // No pool available: report a 0/0 ratio rather than failing the gauge read.
          return Ratio.of(0, 0);
        }
        return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
      }
    };
  }

  /** Returns the registry name of the batch executor pool gauge. */
  final String getExecutorPoolName() {
    return name(getClass(), "executorPoolActiveThreads", scope);
  }

  /** Returns the registry name of the meta executor pool gauge. */
  final String getMetaPoolName() {
    return name(getClass(), "metaPoolActiveThreads", scope);
  }

  MetricRegistry getMetricRegistry() {
    return registry;
  }

  /** Stops the JMX reporter started by the constructor. */
  public void shutdown() {
    this.reporter.stop();
  }

  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
  public static CallStats newCallStats() {
    // TODO: instance pool to reduce GC?
    return new CallStats();
  }

  /** Increment the number of meta cache hits. */
  public void incrMetaCacheHit() {
    metaCacheHits.inc();
  }

  /** Increment the number of meta cache misses. */
  public void incrMetaCacheMiss() {
    metaCacheMisses.inc();
  }

  /** Increment the number of meta cache drops requested for entire RegionServer. */
  public void incrMetaCacheNumClearServer() {
    metaCacheNumClearServer.inc();
  }

  /** Increment the number of meta cache drops requested for individual region. */
  public void incrMetaCacheNumClearRegion() {
    metaCacheNumClearRegion.inc();
  }

  /**
   * Increase the number of meta cache drops requested for individual regions by {@code count}.
   */
  public void incrMetaCacheNumClearRegion(int count) {
    metaCacheNumClearRegion.inc(count);
  }

  /** Increment the number of hedged read that have occurred. */
  public void incrHedgedReadOps() {
    hedgedReadOps.inc();
  }

  /** Increment the number of hedged read returned faster than the original read. */
  public void incrHedgedReadWin() {
    hedgedReadWin.inc();
  }

  /** Increment the number of normal runner counts. */
  public void incrNormalRunners() {
    this.runnerStats.incrNormalRunners();
  }

  /** Increment the number of delay runner counts and update delay interval of delay runner. */
  public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
    this.runnerStats.incrDelayRunners();
    this.runnerStats.updateDelayInterval(interval);
  }

  /**
   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
   */
  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
    return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
  }

  /** Update call stats for non-critical-path methods */
  private void updateRpcGeneric(String methodName, CallStats stats) {
    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory).update(stats.getCallTimeMs(),
      TimeUnit.MILLISECONDS);
    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getRequestSizeBytes());
    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getResponseSizeBytes());
  }

  /**
   * Report RPC context to metrics system.
   * @param method descriptor of the RPC method that was invoked
   * @param param  the request message; cast to {@link MutateRequest} for Mutate calls
   * @param stats  the call statistics to record
   * @throws RuntimeException if a ClientService method index or mutation type is not recognized
   */
  public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
    int callsPerServer = stats.getConcurrentCallsPerServer();
    if (callsPerServer > 0) {
      concurrentCallsPerServerHist.update(callsPerServer);
    }
    // Update the counter that tracks RPCs by type.
    final String methodName = method.getService().getName() + "_" + method.getName();
    getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
    // this implementation is tied directly to protobuf implementation details. would be better
    // if we could dispatch based on something static, ie, request Message type.
    if (method.getService() == ClientService.getDescriptor()) {
      // Dispatch on the method's index within the service descriptor; the asserts record which
      // method name each index is expected to correspond to.
      switch (method.getIndex()) {
        case 0:
          assert "Get".equals(method.getName());
          getTracker.updateRpc(stats);
          return;
        case 1:
          assert "Mutate".equals(method.getName());
          final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
          switch (mutationType) {
            case APPEND:
              appendTracker.updateRpc(stats);
              return;
            case DELETE:
              deleteTracker.updateRpc(stats);
              return;
            case INCREMENT:
              incrementTracker.updateRpc(stats);
              return;
            case PUT:
              putTracker.updateRpc(stats);
              return;
            default:
              throw new RuntimeException("Unrecognized mutation type " + mutationType);
          }
        case 2:
          assert "Scan".equals(method.getName());
          scanTracker.updateRpc(stats);
          return;
        case 3:
          assert "BulkLoadHFile".equals(method.getName());
          // use generic implementation
          break;
        case 4:
          assert "PrepareBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 5:
          assert "CleanupBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 6:
          assert "ExecService".equals(method.getName());
          // use generic implementation
          break;
        case 7:
          assert "ExecRegionServerService".equals(method.getName());
          // use generic implementation
          break;
        case 8:
          assert "Multi".equals(method.getName());
          numActionsPerServerHist.update(stats.getNumActionsPerServer());
          multiTracker.updateRpc(stats);
          return;
        default:
          throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
      }
    }
    // Fallback to dynamic registry lookup for DDL methods.
    updateRpcGeneric(methodName, stats);
  }

  /**
   * Increment the cache-dropping counter named after the simple class name of {@code exception},
   * or after "UnknownException" when {@code exception} is {@code null}.
   */
  public void incrCacheDroppingExceptions(Object exception) {
    getMetric(
      CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
      cacheDroppingExceptions, counterFactory).inc();
  }

  /** Increment the nsLookups counter. */
  public void incrNsLookups() {
    this.nsLookups.inc();
  }

  /** Increment the nsLookupsFailed counter. */
  public void incrNsLookupsFailed() {
    this.nsLookupsFailed.inc();
  }
}