/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static com.codahale.metrics.MetricRegistry.name;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * This class is for maintaining the various connection statistics and publishing them through the
 * metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
 * as to not conflict with other uses of Yammer Metrics within the client application. Calling
 * {@link #getMetricsConnection(Configuration, String, Supplier, Supplier)} implicitly creates and
 * "starts" instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to
 * terminate the thread pools they allocate. The metrics reporter is shut down via
 * {@link #shutdown()} once all connections sharing this metrics instance are closed.
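 * <p>
 * As an illustrative sketch only (not a public API contract; the exact wiring may differ by
 * client version), collection can be enabled and a custom scope pinned through the configuration
 * keys defined in this class before the connection is created:
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
 * conf.set(MetricsConnection.METRICS_SCOPE_KEY, "foo-read");
 * try (Connection connection = ConnectionFactory.createConnection(conf)) {
 *   // Metrics for RPCs made through this connection are published over JMX under "foo-read".
 * }
 * }</pre>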
 */
@InterfaceAudience.Private
public final class MetricsConnection implements StatisticTrackable {

  private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES =
    new ConcurrentHashMap<>();

  static MetricsConnection getMetricsConnection(final Configuration conf, final String scope,
    Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
    return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> {
      if (metricsConnection == null) {
        MetricsConnection newMetricsConn = new MetricsConnection(conf, scope, batchPool, metaPool);
        newMetricsConn.incrConnectionCount();
        return newMetricsConn;
      } else {
        metricsConnection.addThreadPools(batchPool, metaPool);
        metricsConnection.incrConnectionCount();
        return metricsConnection;
      }
    });
  }

  static void deleteMetricsConnection(final String scope) {
    METRICS_INSTANCES.computeIfPresent(scope, (s, metricsConnection) -> {
      metricsConnection.decrConnectionCount();
      if (metricsConnection.getConnectionCount() == 0) {
        metricsConnection.shutdown();
        return null;
      }
      return metricsConnection;
    });
  }

  /** Set this key to {@code true} to enable metrics collection of client requests. */
  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

  /** Set this key to {@code true} to enable table metrics collection of client requests. */
  public static final String CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY =
    "hbase.client.table.metrics.enable";

  /**
   * Set to specify a custom scope for the metrics published through {@link MetricsConnection}. The
   * scope is added to the JMX MBean objectName, and defaults to a combination of the Connection's
   * clusterId and hashCode. For example, a default value for a connection to cluster "foo" might be
   * "foo@7d9d0818", where "7d9d0818" is the hashCode of the underlying AsyncConnectionImpl. Users
   * may set this key to give a more contextual name for this scope. For example, one might want to
   * differentiate a read connection from a write connection by setting the scopes to "foo-read" and
   * "foo-write" respectively. Scope is the only thing that lends any uniqueness to the metrics.
   * Care should be taken to avoid using the same scope for multiple Connections, otherwise the
   * metrics may aggregate in unforeseen ways.
   */
  public static final String METRICS_SCOPE_KEY = "hbase.client.metrics.scope";

  /**
   * Returns the scope for a MetricsConnection based on the configured {@link #METRICS_SCOPE_KEY} or
   * by generating a default from the passed clusterId and connectionObj's hashCode.
   * @param conf          configuration for the connection
   * @param clusterId     clusterId for the connection
   * @param connectionObj either a Connection or AsyncConnectionImpl, the instance creating this
   *                      MetricsConnection.
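   * @return the scope configured via {@link #METRICS_SCOPE_KEY}, or the generated
   *         "clusterId@hexHashCode" default when no value is set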
   */
  static String getScope(Configuration conf, String clusterId, Object connectionObj) {
    return conf.get(METRICS_SCOPE_KEY,
      clusterId + "@" + Integer.toHexString(connectionObj.hashCode()));
  }

  private static final String CNT_BASE = "rpcCount_";
  private static final String FAILURE_CNT_BASE = "rpcFailureCount_";
  private static final String TOTAL_EXCEPTION_CNT = "rpcTotalExceptions";
  private static final String LOCAL_EXCEPTION_CNT_BASE = "rpcLocalExceptions_";
  private static final String REMOTE_EXCEPTION_CNT_BASE = "rpcRemoteExceptions_";
  private static final String DRTN_BASE = "rpcCallDurationMs_";
  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
  private static final String MEMLOAD_BASE = "memstoreLoad_";
  private static final String HEAP_BASE = "heapOccupancy_";
  private static final String CACHE_BASE = "cacheDroppingExceptions_";
  private static final String UNKNOWN_EXCEPTION = "UnknownException";
  private static final String NS_LOOKUPS = "nsLookups";
  private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();

  /** A container class for collecting details about the RPC call as it percolates. */
  public static class CallStats {
    private long requestSizeBytes = 0;
    private long responseSizeBytes = 0;
    private long startTime = 0;
    private long callTimeMs = 0;
    private int concurrentCallsPerServer = 0;
    private int numActionsPerServer = 0;

    public long getRequestSizeBytes() {
      return requestSizeBytes;
    }

    public void setRequestSizeBytes(long requestSizeBytes) {
      this.requestSizeBytes = requestSizeBytes;
    }

    public long getResponseSizeBytes() {
      return responseSizeBytes;
    }

    public void setResponseSizeBytes(long responseSizeBytes) {
      this.responseSizeBytes = responseSizeBytes;
    }

    public long getStartTime() {
      return startTime;
    }

    public void setStartTime(long startTime) {
      this.startTime = startTime;
    }

    public long getCallTimeMs() {
      return callTimeMs;
    }

    public void setCallTimeMs(long callTimeMs) {
      this.callTimeMs = callTimeMs;
    }

    public int getConcurrentCallsPerServer() {
      return concurrentCallsPerServer;
    }

    public void setConcurrentCallsPerServer(int callsPerServer) {
      this.concurrentCallsPerServer = callsPerServer;
    }

    public int getNumActionsPerServer() {
      return numActionsPerServer;
    }

    public void setNumActionsPerServer(int numActionsPerServer) {
      this.numActionsPerServer = numActionsPerServer;
    }
  }

  protected static final class CallTracker {
    private final String name;
    final Timer callTimer;
    final Histogram reqHist;
    final Histogram respHist;

    private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
      if (subName != null) {
        sb.append("(").append(subName).append(")");
      }
      this.name = sb.toString();
      this.callTimer = registry.timer(name(MetricsConnection.class, DRTN_BASE + this.name, scope));
      this.reqHist = registry.histogram(name(MetricsConnection.class, REQ_BASE + this.name, scope));
      this.respHist =
        registry.histogram(name(MetricsConnection.class, RESP_BASE + this.name, scope));
    }
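
    // Convenience constructor for calls without a sub-operation qualifier (e.g. Get, Scan, Multi).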
    private CallTracker(MetricRegistry registry, String name, String scope) {
      this(registry, name, null, scope);
    }

    public void updateRpc(CallStats stats) {
      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
      this.reqHist.update(stats.getRequestSizeBytes());
      this.respHist.update(stats.getResponseSizeBytes());
    }

    @Override
    public String toString() {
      return "CallTracker:" + name;
    }
  }

  protected static class RegionStats {
    final String name;
    final Histogram memstoreLoadHist;
    final Histogram heapOccupancyHist;

    public RegionStats(MetricRegistry registry, String name) {
      this.name = name;
      this.memstoreLoadHist =
        registry.histogram(name(MetricsConnection.class, MEMLOAD_BASE + this.name));
      this.heapOccupancyHist =
        registry.histogram(name(MetricsConnection.class, HEAP_BASE + this.name));
    }

    public void update(RegionLoadStats regionStatistics) {
      this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
    }
  }

  protected static class RunnerStats {
    final Counter normalRunners;
    final Counter delayRunners;
    final Histogram delayIntevalHist;

    public RunnerStats(MetricRegistry registry) {
      this.normalRunners = registry.counter(name(MetricsConnection.class, "normalRunnersCount"));
      this.delayRunners = registry.counter(name(MetricsConnection.class, "delayRunnersCount"));
      this.delayIntevalHist =
        registry.histogram(name(MetricsConnection.class, "delayIntervalHist"));
    }

    public void incrNormalRunners() {
      this.normalRunners.inc();
    }

    public void incrDelayRunners() {
      this.delayRunners.inc();
    }

    public void updateDelayInterval(long interval) {
      this.delayIntevalHist.update(interval);
    }
  }

  private ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
    new ConcurrentHashMap<>();

  public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
    if (!(r instanceof Result)) {
      return;
    }
    Result result = (Result) r;
    RegionLoadStats stats = result.getStats();
    if (stats == null) {
      return;
    }
    updateRegionStats(serverName, regionName, stats);
  }

  @Override
  public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
    String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
    ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
      () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
    RegionStats regionStats =
      computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name));
    regionStats.update(stats);
  }

  /** A lambda for dispatching to the appropriate metric factory method */
  private static interface NewMetric<T> {
    T newMetric(Class<?> clazz, String name, String scope);
  }

  /** Anticipated number of metric entries */
  private static final int CAPACITY = 50;
  /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
  private static final float LOAD_FACTOR = 0.75f;
  /**
   * Anticipated number of concurrent accessor threads
   */
  private static final int CONCURRENCY_LEVEL = 256;

  private final MetricRegistry registry;
  private final JmxReporter reporter;
  private final String scope;
  private final boolean tableMetricsEnabled;
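
  // Factories used by getMetric(...) to lazily create dynamically named metrics on first use.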
  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
    @Override
    public Timer newMetric(Class<?> clazz, String name, String scope) {
      return registry.timer(name(clazz, name, scope));
    }
  };

  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
    @Override
    public Histogram newMetric(Class<?> clazz, String name, String scope) {
      return registry.histogram(name(clazz, name, scope));
    }
  };

  private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
    @Override
    public Counter newMetric(Class<?> clazz, String name, String scope) {
      return registry.counter(name(clazz, name, scope));
    }
  };

  // Lists of thread pools, one entry per connection that shares this metrics instance.
  private final List<Supplier<ThreadPoolExecutor>> batchPools = new ArrayList<>();
  private final List<Supplier<ThreadPoolExecutor>> metaPools = new ArrayList<>();

  // static metrics

  private final Counter connectionCount;
  private final Counter metaCacheHits;
  private final Counter metaCacheMisses;
  private final CallTracker getTracker;
  private final CallTracker scanTracker;
  private final CallTracker appendTracker;
  private final CallTracker deleteTracker;
  private final CallTracker incrementTracker;
  private final CallTracker putTracker;
  private final CallTracker multiTracker;
  private final RunnerStats runnerStats;
  private final Counter metaCacheNumClearServer;
  private final Counter metaCacheNumClearRegion;
  private final Counter hedgedReadOps;
  private final Counter hedgedReadWin;
  private final Histogram concurrentCallsPerServerHist;
  private final Histogram numActionsPerServerHist;
  private final Counter nsLookups;
  private final Counter nsLookupsFailed;
  private final Timer overloadedBackoffTimer;

  // dynamic metrics

  // These maps are used to cache references to the metric instances that are managed by the
  // registry. I don't think their use perfectly removes redundant allocations, but it's
  // a big improvement over calling registry.newMetric each time.
  private final ConcurrentMap<String, Timer> rpcTimers =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
    CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> rpcCounters =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

  private MetricsConnection(Configuration conf, String scope,
    Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
    this.scope = scope;
    this.tableMetricsEnabled = conf.getBoolean(CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, false);
    addThreadPools(batchPool, metaPool);
    this.registry = new MetricRegistry();
    this.registry.register(getExecutorPoolName(), new RatioGauge() {
      @Override
      protected Ratio getRatio() {
        int numerator = 0;
        int denominator = 0;
        for (Supplier<ThreadPoolExecutor> poolSupplier : batchPools) {
          ThreadPoolExecutor pool = poolSupplier.get();
          if (pool != null) {
            int activeCount = pool.getActiveCount();
            int maxPoolSize = pool.getMaximumPoolSize();
            /* The max thread usage ratio among batch pools of all connections */
            if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
              numerator = activeCount;
              denominator = maxPoolSize;
            }
          }
        }
        return Ratio.of(numerator, denominator);
      }
    });
    this.registry.register(getMetaPoolName(), new RatioGauge() {
      @Override
      protected Ratio getRatio() {
        int numerator = 0;
        int denominator = 0;
        for (Supplier<ThreadPoolExecutor> poolSupplier : metaPools) {
          ThreadPoolExecutor pool = poolSupplier.get();
          if (pool != null) {
            int activeCount = pool.getActiveCount();
            int maxPoolSize = pool.getMaximumPoolSize();
            /* The max thread usage ratio among meta lookup pools of all connections */
            if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
              numerator = activeCount;
              denominator = maxPoolSize;
            }
          }
        }
        return Ratio.of(numerator, denominator);
      }
    });
    this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope));
    this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
    this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
    this.metaCacheNumClearServer =
      registry.counter(name(this.getClass(), "metaCacheNumClearServer", scope));
    this.metaCacheNumClearRegion =
      registry.counter(name(this.getClass(), "metaCacheNumClearRegion", scope));
    this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
    this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
    this.getTracker = new CallTracker(this.registry, "Get", scope);
    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
    this.runnerStats = new RunnerStats(this.registry);
    this.concurrentCallsPerServerHist =
      registry.histogram(name(MetricsConnection.class, "concurrentCallsPerServer", scope));
    this.numActionsPerServerHist =
      registry.histogram(name(MetricsConnection.class, "numActionsPerServer", scope));
    this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
    this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));
    this.overloadedBackoffTimer =
      registry.timer(name(this.getClass(), "overloadedBackoffDurationMs", scope));

    this.reporter = JmxReporter.forRegistry(this.registry).build();
    this.reporter.start();
  }

  final String getExecutorPoolName() {
    return name(getClass(), "executorPoolActiveThreads", scope);
  }

  final String getMetaPoolName() {
    return name(getClass(), "metaPoolActiveThreads", scope);
  }

  MetricRegistry getMetricRegistry() {
    return registry;
  }

  /** scope of the metrics object */
  public String getMetricScope() {
    return scope;
  }

  /** serverStats metric */
  public ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> getServerStats() {
    return serverStats;
  }

  /** runnerStats metric */
  public RunnerStats getRunnerStats() {
    return runnerStats;
  }

  /** metaCacheNumClearServer metric */
  public Counter getMetaCacheNumClearServer() {
    return metaCacheNumClearServer;
  }

  /** metaCacheNumClearRegion metric */
  public Counter getMetaCacheNumClearRegion() {
    return metaCacheNumClearRegion;
  }

  /** hedgedReadOps metric */
  public Counter getHedgedReadOps() {
    return hedgedReadOps;
  }

  /** hedgedReadWin metric */
  public Counter getHedgedReadWin() {
    return hedgedReadWin;
  }

  /** numActionsPerServerHist metric */
  public Histogram getNumActionsPerServerHist() {
    return numActionsPerServerHist;
  }

  /** rpcCounters metric */
  public ConcurrentMap<String, Counter> getRpcCounters() {
    return rpcCounters;
  }

  /** rpcTimers metric */
  public ConcurrentMap<String, Timer> getRpcTimers() {
    return rpcTimers;
  }

  /** rpcHistograms metric */
  public ConcurrentMap<String, Histogram> getRpcHistograms() {
    return rpcHistograms;
  }

  /** getTracker metric */
  public CallTracker getGetTracker() {
    return getTracker;
  }

  /** scanTracker metric */
  public CallTracker getScanTracker() {
    return scanTracker;
  }

  /** multiTracker metric */
  public CallTracker getMultiTracker() {
    return multiTracker;
  }

  /** appendTracker metric */
  public CallTracker getAppendTracker() {
    return appendTracker;
  }

  /** deleteTracker metric */
  public CallTracker getDeleteTracker() {
    return deleteTracker;
  }

  /** incrementTracker metric */
  public CallTracker getIncrementTracker() {
    return incrementTracker;
  }

  /** putTracker metric */
  public CallTracker getPutTracker() {
    return putTracker;
  }

  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
  public static CallStats newCallStats() {
    // TODO: instance pool to reduce GC?
    return new CallStats();
  }

  /** Increment the number of meta cache hits. */
  public void incrMetaCacheHit() {
    metaCacheHits.inc();
  }

  /** Increment the number of meta cache misses. */
  public void incrMetaCacheMiss() {
    metaCacheMisses.inc();
  }

  public long getMetaCacheMisses() {
    return metaCacheMisses.getCount();
  }

  /** Increment the number of meta cache drops requested for an entire RegionServer. */
  public void incrMetaCacheNumClearServer() {
    metaCacheNumClearServer.inc();
  }

  /** Increment the number of meta cache drops requested for an individual region. */
  public void incrMetaCacheNumClearRegion() {
    metaCacheNumClearRegion.inc();
  }

  /** Increment the number of meta cache drops requested for individual regions. */
  public void incrMetaCacheNumClearRegion(int count) {
    metaCacheNumClearRegion.inc(count);
  }

  /** Increment the number of hedged reads that have occurred. */
  public void incrHedgedReadOps() {
    hedgedReadOps.inc();
  }

  /** Increment the number of hedged reads that returned faster than the original read. */
  public void incrHedgedReadWin() {
    hedgedReadWin.inc();
  }

  /** Increment the count of normal runners. */
  public void incrNormalRunners() {
    this.runnerStats.incrNormalRunners();
  }

  /** Increment the count of delay runners and update the delay interval of the delay runner. */
  public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
    this.runnerStats.incrDelayRunners();
    this.runnerStats.updateDelayInterval(interval);
  }

  /** Update the overloaded backoff time. */
  public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
    overloadedBackoffTimer.update(time, timeUnit);
  }

  /** Return the connection count of the metrics within a scope. */
  public long getConnectionCount() {
    return connectionCount.getCount();
  }

  /** Increment the connection count of the metrics within a scope. */
  private void incrConnectionCount() {
    connectionCount.inc();
  }

  /** Decrement the connection count of the metrics within a scope. */
  private void decrConnectionCount() {
    connectionCount.dec();
  }

  /** Add thread pools of additional connections to the metrics. */
  private void addThreadPools(Supplier<ThreadPoolExecutor> batchPool,
    Supplier<ThreadPoolExecutor> metaPool) {
    batchPools.add(batchPool);
    metaPools.add(metaPool);
  }

  /**
   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
   */
  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
    return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
  }

  /** Update call stats for non-critical-path methods */
  private void updateRpcGeneric(String methodName, CallStats stats) {
    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory).update(stats.getCallTimeMs(),
      TimeUnit.MILLISECONDS);
    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getRequestSizeBytes());
    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getResponseSizeBytes());
  }

  private void shutdown() {
    this.reporter.stop();
  }

  /**
   * Report RPC context to metrics system.
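   * @param method    descriptor of the RPC method that was invoked
   * @param tableName table the RPC was issued against, if known
   * @param param     request message sent for the RPC
   * @param stats     collected call statistics (duration, request/response sizes)
   * @param e         exception raised by the call, or {@code null} if it succeeded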
   */
  public void updateRpc(MethodDescriptor method, TableName tableName, Message param,
    CallStats stats, Throwable e) {
    int callsPerServer = stats.getConcurrentCallsPerServer();
    if (callsPerServer > 0) {
      concurrentCallsPerServerHist.update(callsPerServer);
    }
    // Update the counter that tracks RPCs by type.
    StringBuilder methodName = new StringBuilder();
    methodName.append(method.getService().getName()).append("_").append(method.getName());
    // Distinguish mutate types.
    if ("Mutate".equals(method.getName())) {
      final MutationType type = ((MutateRequest) param).getMutation().getMutateType();
      switch (type) {
        case APPEND:
          methodName.append("(Append)");
          break;
        case DELETE:
          methodName.append("(Delete)");
          break;
        case INCREMENT:
          methodName.append("(Increment)");
          break;
        case PUT:
          methodName.append("(Put)");
          break;
        default:
          methodName.append("(Unknown)");
      }
    }
    getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
    if (e != null) {
      getMetric(FAILURE_CNT_BASE + methodName, rpcCounters, counterFactory).inc();
      getMetric(TOTAL_EXCEPTION_CNT, rpcCounters, counterFactory).inc();
      if (e instanceof RemoteException) {
        String fullClassName = ((RemoteException) e).getClassName();
        String simpleClassName = (fullClassName != null)
          ? fullClassName.substring(fullClassName.lastIndexOf(".") + 1)
          : "unknown";
        getMetric(REMOTE_EXCEPTION_CNT_BASE + simpleClassName, rpcCounters, counterFactory).inc();
      } else {
        getMetric(LOCAL_EXCEPTION_CNT_BASE + e.getClass().getSimpleName(), rpcCounters,
          counterFactory).inc();
      }
    }
    // This implementation is tied directly to protobuf implementation details. It would be better
    // if we could dispatch based on something static, i.e., the request Message type.
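    // The case labels below are the indices of the rpc declarations in the ClientService protobuf
    // definition; the asserts double-check that each index still maps to the expected method name.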
    if (method.getService() == ClientService.getDescriptor()) {
      switch (method.getIndex()) {
        case 0:
          assert "Get".equals(method.getName());
          getTracker.updateRpc(stats);
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        case 1:
          assert "Mutate".equals(method.getName());
          final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
          switch (mutationType) {
            case APPEND:
              appendTracker.updateRpc(stats);
              break;
            case DELETE:
              deleteTracker.updateRpc(stats);
              break;
            case INCREMENT:
              incrementTracker.updateRpc(stats);
              break;
            case PUT:
              putTracker.updateRpc(stats);
              break;
            default:
              throw new RuntimeException("Unrecognized mutation type " + mutationType);
          }
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        case 2:
          assert "Scan".equals(method.getName());
          scanTracker.updateRpc(stats);
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        case 3:
          assert "BulkLoadHFile".equals(method.getName());
          // use generic implementation
          break;
        case 4:
          assert "PrepareBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 5:
          assert "CleanupBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 6:
          assert "ExecService".equals(method.getName());
          // use generic implementation
          break;
        case 7:
          assert "ExecRegionServerService".equals(method.getName());
          // use generic implementation
          break;
        case 8:
          assert "Multi".equals(method.getName());
          numActionsPerServerHist.update(stats.getNumActionsPerServer());
          multiTracker.updateRpc(stats);
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        default:
          throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
      }
    }
    // Fallback to dynamic registry lookup for DDL methods.
    updateRpcGeneric(methodName.toString(), stats);
  }

  /** Report table rpc context to metrics system. */
  private void updateTableMetric(String methodName, TableName tableName, CallStats stats,
    Throwable e) {
    if (tableMetricsEnabled) {
      if (methodName != null) {
        String table = tableName != null && StringUtils.isNotEmpty(tableName.getNameAsString())
          ? tableName.getNameAsString()
          : "unknown";
        String metricKey = methodName + "_" + table;
        // Update table rpc metrics: call duration and request/response sizes (bytes).
        updateRpcGeneric(metricKey, stats);
        if (e != null) {
          // Count failed rpc calls per table.
          getMetric(FAILURE_CNT_BASE + metricKey, rpcCounters, counterFactory).inc();
        }
      }
    }
  }

  public void incrCacheDroppingExceptions(Object exception) {
    getMetric(
      CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
      cacheDroppingExceptions, counterFactory).inc();
  }

  public void incrNsLookups() {
    this.nsLookups.inc();
  }

  public void incrNsLookupsFailed() {
    this.nsLookupsFailed.inc();
  }
}