/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static com.codahale.metrics.MetricRegistry.name;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * This class is for maintaining the various connection statistics and publishing them through the
 * metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
 * as to not conflict with other uses of Yammer Metrics within the client application. Calling
 * {@link #getMetricsConnection(String, Supplier, Supplier)} implicitly creates and "starts"
 * instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to terminate
 * the thread pools they allocate. The metrics reporter is stopped via {@link #shutdown()} once all
 * connections sharing this metrics instance have been closed.
 */
@InterfaceAudience.Private
public final class MetricsConnection implements StatisticTrackable {

  /** One shared, reference-counted instance per metrics scope. */
  private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES =
    new ConcurrentHashMap<>();

  /**
   * Returns the {@link MetricsConnection} registered for {@code scope}, creating it if absent. In
   * both cases the supplied pools are added to the instance and its connection count is
   * incremented, so every call should be paired with {@link #deleteMetricsConnection(String)}.
   * @param scope     the metrics scope, see {@link #METRICS_SCOPE_KEY}
   * @param batchPool supplier of the calling connection's batch pool; may yield {@code null}
   * @param metaPool  supplier of the calling connection's meta lookup pool; may yield
   *                  {@code null}
   * @return the instance shared by all connections using {@code scope}
   */
  static MetricsConnection getMetricsConnection(final String scope,
    Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
    return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> {
      if (metricsConnection == null) {
        MetricsConnection newMetricsConn = new MetricsConnection(scope, batchPool, metaPool);
        newMetricsConn.incrConnectionCount();
        return newMetricsConn;
      } else {
        metricsConnection.addThreadPools(batchPool, metaPool);
        metricsConnection.incrConnectionCount();
        return metricsConnection;
      }
    });
  }

  /**
   * Decrements the connection count of the instance registered for {@code scope}. When the count
   * reaches zero the reporter is stopped and the instance is removed from the registry of
   * instances. Does nothing if no instance is registered for {@code scope}.
   * @param scope the metrics scope previously passed to
   *              {@link #getMetricsConnection(String, Supplier, Supplier)}
   */
  static void deleteMetricsConnection(final String scope) {
    // NOTE(review): the pool suppliers added for the closing connection are not removed here;
    // they stay referenced until the last connection of the scope is closed.
    METRICS_INSTANCES.computeIfPresent(scope, (s, metricsConnection) -> {
      metricsConnection.decrConnectionCount();
      if (metricsConnection.getConnectionCount() == 0) {
        metricsConnection.shutdown();
        // Returning null removes the mapping.
        return null;
      }
      return metricsConnection;
    });
  }

  /** Set this key to {@code true} to enable metrics collection of client requests. */
  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

  /**
   * Set to specify a custom scope for the metrics published through {@link MetricsConnection}. The
   * scope is added to JMX MBean objectName, and defaults to a combination of the Connection's
   * clusterId and hashCode. For example, a default value for a connection to cluster "foo" might be
   * "foo@7d9d0818", where "7d9d0818" is the hashCode of the underlying AsyncConnectionImpl. Users
   * may set this key to give a more contextual name for this scope. For example, one might want to
   * differentiate a read connection from a write connection by setting the scopes to "foo-read" and
   * "foo-write" respectively. Scope is the only thing that lends any uniqueness to the metrics.
   * Care should be taken to avoid using the same scope for multiple Connections, otherwise the
   * metrics may aggregate in unforeseen ways.
   */
  public static final String METRICS_SCOPE_KEY = "hbase.client.metrics.scope";

  /**
   * Returns the scope for a MetricsConnection based on the configured {@link #METRICS_SCOPE_KEY} or
   * by generating a default from the passed clusterId and connectionObj's hashCode.
   * @param conf          configuration for the connection
   * @param clusterId     clusterId for the connection
   * @param connectionObj either a Connection or AsyncConnectionImpl, the instance creating this
   *                      MetricsConnection.
   * @return the configured scope, or {@code clusterId + "@" + hex(hashCode)} when none is set
   */
  static String getScope(Configuration conf, String clusterId, Object connectionObj) {
    return conf.get(METRICS_SCOPE_KEY,
      clusterId + "@" + Integer.toHexString(connectionObj.hashCode()));
  }

  private static final String CNT_BASE = "rpcCount_";
  private static final String FAILURE_CNT_BASE = "rpcFailureCount_";
  private static final String TOTAL_EXCEPTION_CNT = "rpcTotalExceptions";
  private static final String LOCAL_EXCEPTION_CNT_BASE = "rpcLocalExceptions_";
  private static final String REMOTE_EXCEPTION_CNT_BASE = "rpcRemoteExceptions_";
  private static final String DRTN_BASE = "rpcCallDurationMs_";
  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
  private static final String MEMLOAD_BASE = "memstoreLoad_";
  private static final String HEAP_BASE = "heapOccupancy_";
  private static final String CACHE_BASE = "cacheDroppingExceptions_";
  private static final String UNKNOWN_EXCEPTION = "UnknownException";
  private static final String NS_LOOKUPS = "nsLookups";
  private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();

  /** A container class for collecting details about the RPC call as it percolates. */
  public static class CallStats {
    private long requestSizeBytes = 0;
    private long responseSizeBytes = 0;
    private long startTime = 0;
    private long callTimeMs = 0;
    private int concurrentCallsPerServer = 0;
    private int numActionsPerServer = 0;

    public long getRequestSizeBytes() {
      return requestSizeBytes;
    }

    public void setRequestSizeBytes(long requestSizeBytes) {
      this.requestSizeBytes = requestSizeBytes;
    }

    public long getResponseSizeBytes() {
      return responseSizeBytes;
    }

    public void setResponseSizeBytes(long responseSizeBytes) {
      this.responseSizeBytes = responseSizeBytes;
    }

    public long getStartTime() {
      return startTime;
    }

    public void setStartTime(long startTime) {
      this.startTime = startTime;
    }

    public long getCallTimeMs() {
      return callTimeMs;
    }

    public void setCallTimeMs(long callTimeMs) {
      this.callTimeMs = callTimeMs;
    }

    public int getConcurrentCallsPerServer() {
      return concurrentCallsPerServer;
    }

    public void setConcurrentCallsPerServer(int callsPerServer) {
      this.concurrentCallsPerServer = callsPerServer;
    }

    public int getNumActionsPerServer() {
      return numActionsPerServer;
    }

    public void setNumActionsPerServer(int numActionsPerServer) {
      this.numActionsPerServer = numActionsPerServer;
    }
  }

  /**
   * Groups the call duration timer and the request/response size histograms of one RPC method
   * (optionally narrowed by a sub-type such as the mutation type).
   */
  protected static final class CallTracker {
    private final String name;
    final Timer callTimer;
    final Histogram reqHist;
    final Histogram respHist;

    private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
      // Resulting name is "<ClientService>_<name>" or "<ClientService>_<name>(<subName>)".
      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
      if (subName != null) {
        sb.append("(").append(subName).append(")");
      }
      this.name = sb.toString();
      this.callTimer = registry.timer(name(MetricsConnection.class, DRTN_BASE + this.name, scope));
      this.reqHist = registry.histogram(name(MetricsConnection.class, REQ_BASE + this.name, scope));
      this.respHist =
        registry.histogram(name(MetricsConnection.class, RESP_BASE + this.name, scope));
    }

    private CallTracker(MetricRegistry registry, String name, String scope) {
      this(registry, name, null, scope);
    }

    /** Records the call time and the request/response sizes carried by {@code stats}. */
    public void updateRpc(CallStats stats) {
      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
      this.reqHist.update(stats.getRequestSizeBytes());
      this.respHist.update(stats.getResponseSizeBytes());
    }

    @Override
    public String toString() {
      return "CallTracker:" + name;
    }
  }

  /** Memstore load and heap occupancy histograms of a single region. */
  protected static class RegionStats {
    final String name;
    final Histogram memstoreLoadHist;
    final Histogram heapOccupancyHist;

    public RegionStats(MetricRegistry registry, String name) {
      this.name = name;
      this.memstoreLoadHist =
        registry.histogram(name(MetricsConnection.class, MEMLOAD_BASE + this.name));
      this.heapOccupancyHist =
        registry.histogram(name(MetricsConnection.class, HEAP_BASE + this.name));
    }

    public void update(RegionLoadStats regionStatistics) {
      this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
    }
  }

  /** Counters for normal and delayed runners plus a histogram of the delay intervals. */
  protected static class RunnerStats {
    final Counter normalRunners;
    final Counter delayRunners;
    // The misspelled field name is kept as-is: it is package-visible and may be referenced
    // from other classes in this package.
    final Histogram delayIntevalHist;

    public RunnerStats(MetricRegistry registry) {
      this.normalRunners = registry.counter(name(MetricsConnection.class, "normalRunnersCount"));
      this.delayRunners = registry.counter(name(MetricsConnection.class, "delayRunnersCount"));
      this.delayIntevalHist =
        registry.histogram(name(MetricsConnection.class, "delayIntervalHist"));
    }

    public void incrNormalRunners() {
      this.normalRunners.inc();
    }

    public void incrDelayRunners() {
      this.delayRunners.inc();
    }

    public void updateDelayInterval(long interval) {
      this.delayIntevalHist.update(interval);
    }
  }

  /** Per-server, per-region statistics; the inner maps are keyed by region name bytes. */
  private final ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
    new ConcurrentHashMap<>();

  /**
   * Updates the region statistics from {@code r} if it is a {@link Result} that carries
   * {@link RegionLoadStats}; otherwise does nothing.
   */
  public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
    if (!(r instanceof Result)) {
      return;
    }
    Result result = (Result) r;
    RegionLoadStats stats = result.getStats();
    if (stats == null) {
      return;
    }
    updateRegionStats(serverName, regionName, stats);
  }

  @Override
  public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
    String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
    // byte[] has identity equals/hashCode, hence a sorted map with a content comparator.
    ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
      () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
    RegionStats regionStats =
      computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name));
    regionStats.update(stats);
  }

  /** A lambda for dispatching to the appropriate metric factory method */
  @FunctionalInterface
  private interface NewMetric<T> {
    T newMetric(Class<?> clazz, String name, String scope);
  }

  /** Anticipated number of metric entries */
  private static final int CAPACITY = 50;
  /** Default load factor, the same value {@link java.util.HashMap} uses by default */
  private static final float LOAD_FACTOR = 0.75f;
  /**
   * Anticipated number of concurrent accessor threads
   */
  private static final int CONCURRENCY_LEVEL = 256;

  private final MetricRegistry registry;
  private final JmxReporter reporter;
  private final String scope;

  // The factories below are intentionally anonymous classes rather than lambdas: a lambda in a
  // field initializer may not read the blank final field 'registry', which is only assigned in
  // the constructor.
  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
    @Override
    public Timer newMetric(Class<?> clazz, String name, String scope) {
      return registry.timer(name(clazz, name, scope));
    }
  };

  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
    @Override
    public Histogram newMetric(Class<?> clazz, String name, String scope) {
      return registry.histogram(name(clazz, name, scope));
    }
  };

  private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
    @Override
    public Counter newMetric(Class<?> clazz, String name, String scope) {
      return registry.counter(name(clazz, name, scope));
    }
  };

  // List of thread pool per connection of the metrics. Appended to by whichever thread opens a
  // connection and read by the gauges registered in the constructor, so every access happens
  // while holding the respective list's monitor.
  private final List<Supplier<ThreadPoolExecutor>> batchPools = new ArrayList<>();
  private final List<Supplier<ThreadPoolExecutor>> metaPools = new ArrayList<>();

  // static metrics

  private final Counter connectionCount;
  private final Counter metaCacheHits;
  private final Counter metaCacheMisses;
  private final CallTracker getTracker;
  private final CallTracker scanTracker;
  private final CallTracker appendTracker;
  private final CallTracker deleteTracker;
  private final CallTracker incrementTracker;
  private final CallTracker putTracker;
  private final CallTracker multiTracker;
  private final RunnerStats runnerStats;
  private final Counter metaCacheNumClearServer;
  private final Counter metaCacheNumClearRegion;
  private final Counter hedgedReadOps;
  private final Counter hedgedReadWin;
  private final Histogram concurrentCallsPerServerHist;
  private final Histogram numActionsPerServerHist;
  private final Counter nsLookups;
  private final Counter nsLookupsFailed;
  private final Timer overloadedBackoffTimer;

  // dynamic metrics

  // These maps are used to cache references to the metric instances that are managed by the
  // registry. I don't think their use perfectly removes redundant allocations, but it's
  // a big improvement over calling registry.newMetric each time.
  private final ConcurrentMap<String, Timer> rpcTimers =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
    CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> rpcCounters =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

  private MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
    Supplier<ThreadPoolExecutor> metaPool) {
    this.scope = scope;
    addThreadPools(batchPool, metaPool);
    this.registry = new MetricRegistry();
    /* The max thread usage ratio among batch pools of all connections */
    this.registry.register(getExecutorPoolName(), new RatioGauge() {
      @Override
      protected Ratio getRatio() {
        return maxUsageRatio(batchPools);
      }
    });
    /* The max thread usage ratio among meta lookup pools of all connections */
    this.registry.register(getMetaPoolName(), new RatioGauge() {
      @Override
      protected Ratio getRatio() {
        return maxUsageRatio(metaPools);
      }
    });
    this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope));
    this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
    this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
    this.metaCacheNumClearServer =
      registry.counter(name(this.getClass(), "metaCacheNumClearServer", scope));
    this.metaCacheNumClearRegion =
      registry.counter(name(this.getClass(), "metaCacheNumClearRegion", scope));
    this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
    this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
    this.getTracker = new CallTracker(this.registry, "Get", scope);
    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
    this.runnerStats = new RunnerStats(this.registry);
    this.concurrentCallsPerServerHist =
      registry.histogram(name(MetricsConnection.class, "concurrentCallsPerServer", scope));
    this.numActionsPerServerHist =
      registry.histogram(name(MetricsConnection.class, "numActionsPerServer", scope));
    this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
    this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));
    this.overloadedBackoffTimer =
      registry.timer(name(this.getClass(), "overloadedBackoffDurationMs", scope));

    this.reporter = JmxReporter.forRegistry(this.registry).build();
    this.reporter.start();
  }

  /**
   * Computes the highest {@code activeCount / maximumPoolSize} ratio among the pools currently
   * yielded by {@code pools}. Suppliers that yield {@code null} are skipped. If no pool is
   * available the result is {@code Ratio.of(0, 0)}.
   * @param pools the pool suppliers to inspect; iterated via a snapshot taken under the list's
   *              monitor so that concurrent registration of further pools cannot disturb the
   *              iteration
   * @return the ratio of the most heavily used pool
   */
  private static RatioGauge.Ratio maxUsageRatio(List<Supplier<ThreadPoolExecutor>> pools) {
    final List<Supplier<ThreadPoolExecutor>> snapshot;
    synchronized (pools) {
      snapshot = new ArrayList<>(pools);
    }
    int numerator = 0;
    int denominator = 0;
    // Suppliers are invoked outside of the lock.
    for (Supplier<ThreadPoolExecutor> poolSupplier : snapshot) {
      ThreadPoolExecutor pool = poolSupplier.get();
      if (pool != null) {
        int activeCount = pool.getActiveCount();
        int maxPoolSize = pool.getMaximumPoolSize();
        // Compare numerator/denominator < activeCount/maxPoolSize by cross-multiplication. The
        // products are computed as long: the maximum pool size may be as large as
        // Integer.MAX_VALUE, which would overflow an int product.
        if (
          numerator == 0 || ((long) numerator * maxPoolSize) < ((long) activeCount * denominator)
        ) {
          numerator = activeCount;
          denominator = maxPoolSize;
        }
      }
    }
    return RatioGauge.Ratio.of(numerator, denominator);
  }

  final String getExecutorPoolName() {
    return name(getClass(), "executorPoolActiveThreads", scope);
  }

  final String getMetaPoolName() {
    return name(getClass(), "metaPoolActiveThreads", scope);
  }

  MetricRegistry getMetricRegistry() {
    return registry;
  }

  /** scope of the metrics object */
  public String getMetricScope() {
    return scope;
  }

  /** serverStats metric */
  public ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> getServerStats() {
    return serverStats;
  }

  /** runnerStats metric */
  public RunnerStats getRunnerStats() {
    return runnerStats;
  }

  /** metaCacheNumClearServer metric */
  public Counter getMetaCacheNumClearServer() {
    return metaCacheNumClearServer;
  }

  /** metaCacheNumClearRegion metric */
  public Counter getMetaCacheNumClearRegion() {
    return metaCacheNumClearRegion;
  }

  /** hedgedReadOps metric */
  public Counter getHedgedReadOps() {
    return hedgedReadOps;
  }

  /** hedgedReadWin metric */
  public Counter getHedgedReadWin() {
    return hedgedReadWin;
  }

  /** numActionsPerServerHist metric */
  public Histogram getNumActionsPerServerHist() {
    return numActionsPerServerHist;
  }

  /** rpcCounters metric */
  public ConcurrentMap<String, Counter> getRpcCounters() {
    return rpcCounters;
  }

  /** getTracker metric */
  public CallTracker getGetTracker() {
    return getTracker;
  }

  /** scanTracker metric */
  public CallTracker getScanTracker() {
    return scanTracker;
  }

  /** multiTracker metric */
  public CallTracker getMultiTracker() {
    return multiTracker;
  }

  /** appendTracker metric */
  public CallTracker getAppendTracker() {
    return appendTracker;
  }

  /** deleteTracker metric */
  public CallTracker getDeleteTracker() {
    return deleteTracker;
  }

  /** incrementTracker metric */
  public CallTracker getIncrementTracker() {
    return incrementTracker;
  }

  /** putTracker metric */
  public CallTracker getPutTracker() {
    return putTracker;
  }

  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
  public static CallStats newCallStats() {
    // TODO: instance pool to reduce GC?
    return new CallStats();
  }

  /** Increment the number of meta cache hits. */
  public void incrMetaCacheHit() {
    metaCacheHits.inc();
  }

  /** Increment the number of meta cache misses. */
  public void incrMetaCacheMiss() {
    metaCacheMisses.inc();
  }

  /** Return the number of meta cache misses counted so far. */
  public long getMetaCacheMisses() {
    return metaCacheMisses.getCount();
  }

  /** Increment the number of meta cache drops requested for entire RegionServer. */
  public void incrMetaCacheNumClearServer() {
    metaCacheNumClearServer.inc();
  }

  /** Increment the number of meta cache drops requested for individual region. */
  public void incrMetaCacheNumClearRegion() {
    metaCacheNumClearRegion.inc();
  }

  /** Increase the number of meta cache drops requested for individual regions by {@code count}. */
  public void incrMetaCacheNumClearRegion(int count) {
    metaCacheNumClearRegion.inc(count);
  }

  /** Increment the number of hedged read that have occurred. */
  public void incrHedgedReadOps() {
    hedgedReadOps.inc();
  }

  /** Increment the number of hedged read returned faster than the original read. */
  public void incrHedgedReadWin() {
    hedgedReadWin.inc();
  }

  /** Increment the number of normal runner counts. */
  public void incrNormalRunners() {
    this.runnerStats.incrNormalRunners();
  }

  /** Increment the number of delay runner counts and update delay interval of delay runner. */
  public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
    this.runnerStats.incrDelayRunners();
    this.runnerStats.updateDelayInterval(interval);
  }

  /** Update the overloaded backoff time */
  public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
    overloadedBackoffTimer.update(time, timeUnit);
  }

  /** Return the connection count of the metrics within a scope */
  public long getConnectionCount() {
    return connectionCount.getCount();
  }

  /** Increment the connection count of the metrics within a scope */
  private void incrConnectionCount() {
    connectionCount.inc();
  }

  /** Decrement the connection count of the metrics within a scope */
  private void decrConnectionCount() {
    connectionCount.dec();
  }

  /** Add thread pools of additional connections to the metrics */
  private void addThreadPools(Supplier<ThreadPoolExecutor> batchPool,
    Supplier<ThreadPoolExecutor> metaPool) {
    // Guarded by the lists' monitors because the pool gauges read these lists from other threads.
    synchronized (batchPools) {
      batchPools.add(batchPool);
    }
    synchronized (metaPools) {
      metaPools.add(metaPool);
    }
  }

  /**
   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
   */
  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
    return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
  }

  /** Update call stats for non-critical-path methods */
  private void updateRpcGeneric(String methodName, CallStats stats) {
    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory).update(stats.getCallTimeMs(),
      TimeUnit.MILLISECONDS);
    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getRequestSizeBytes());
    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getResponseSizeBytes());
  }

  /** Stops the JMX reporter of this instance. */
  private void shutdown() {
    this.reporter.stop();
  }

  /**
   * Report RPC context to metrics system.
   * @param method descriptor of the invoked RPC method
   * @param param  the request message; must be a {@link MutateRequest} for the ClientService
   *               "Mutate" method
   * @param stats  the statistics collected for the call
   * @param e      the failure of the call, or {@code null} if it succeeded
   * @throws RuntimeException if {@code method} belongs to ClientService but its index or, for a
   *                          mutation, its mutation type is not one of the handled ones
   */
  public void updateRpc(MethodDescriptor method, Message param, CallStats stats, Throwable e) {
    int callsPerServer = stats.getConcurrentCallsPerServer();
    if (callsPerServer > 0) {
      concurrentCallsPerServerHist.update(callsPerServer);
    }
    // Update the counter that tracks RPCs by type.
    final String methodName = method.getService().getName() + "_" + method.getName();
    getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
    if (e != null) {
      getMetric(FAILURE_CNT_BASE + methodName, rpcCounters, counterFactory).inc();
      getMetric(TOTAL_EXCEPTION_CNT, rpcCounters, counterFactory).inc();
      if (e instanceof RemoteException) {
        // Count remote failures under the simple name of the exception class reported remotely.
        String fullClassName = ((RemoteException) e).getClassName();
        String simpleClassName = (fullClassName != null)
          ? fullClassName.substring(fullClassName.lastIndexOf(".") + 1)
          : "unknown";
        getMetric(REMOTE_EXCEPTION_CNT_BASE + simpleClassName, rpcCounters, counterFactory).inc();
      } else {
        getMetric(LOCAL_EXCEPTION_CNT_BASE + e.getClass().getSimpleName(), rpcCounters,
          counterFactory).inc();
      }
    }
    // this implementation is tied directly to protobuf implementation details. would be better
    // if we could dispatch based on something static, ie, request Message type.
    if (method.getService() == ClientService.getDescriptor()) {
      switch (method.getIndex()) {
        case 0:
          assert "Get".equals(method.getName());
          getTracker.updateRpc(stats);
          return;
        case 1:
          assert "Mutate".equals(method.getName());
          final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
          // Every branch of this inner switch returns or throws, so there is no fall-through
          // into case 2.
          switch (mutationType) {
            case APPEND:
              appendTracker.updateRpc(stats);
              return;
            case DELETE:
              deleteTracker.updateRpc(stats);
              return;
            case INCREMENT:
              incrementTracker.updateRpc(stats);
              return;
            case PUT:
              putTracker.updateRpc(stats);
              return;
            default:
              throw new RuntimeException("Unrecognized mutation type " + mutationType);
          }
        case 2:
          assert "Scan".equals(method.getName());
          scanTracker.updateRpc(stats);
          return;
        case 3:
          assert "BulkLoadHFile".equals(method.getName());
          // use generic implementation
          break;
        case 4:
          assert "PrepareBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 5:
          assert "CleanupBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 6:
          assert "ExecService".equals(method.getName());
          // use generic implementation
          break;
        case 7:
          assert "ExecRegionServerService".equals(method.getName());
          // use generic implementation
          break;
        case 8:
          assert "Multi".equals(method.getName());
          numActionsPerServerHist.update(stats.getNumActionsPerServer());
          multiTracker.updateRpc(stats);
          return;
        default:
          throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
      }
    }
    // Fallback to dynamic registry lookup for DDL methods.
    updateRpcGeneric(methodName, stats);
  }

  /**
   * Increment the counter of cache dropping exceptions for the class of {@code exception}, or for
   * {@code "UnknownException"} if {@code exception} is {@code null}.
   */
  public void incrCacheDroppingExceptions(Object exception) {
    getMetric(
      CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
      cacheDroppingExceptions, counterFactory).inc();
  }

  /** Increment the nsLookups counter. */
  public void incrNsLookups() {
    this.nsLookups.inc();
  }

  /** Increment the nsLookupsFailed counter. */
  public void incrNsLookupsFailed() {
    this.nsLookupsFailed.inc();
  }
}