001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static com.codahale.metrics.MetricRegistry.name; 021import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 022 023import com.codahale.metrics.Counter; 024import com.codahale.metrics.Histogram; 025import com.codahale.metrics.JmxReporter; 026import com.codahale.metrics.MetricRegistry; 027import com.codahale.metrics.RatioGauge; 028import com.codahale.metrics.Timer; 029import java.util.ArrayList; 030import java.util.List; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.ConcurrentMap; 033import java.util.concurrent.ConcurrentSkipListMap; 034import java.util.concurrent.ThreadPoolExecutor; 035import java.util.concurrent.TimeUnit; 036import java.util.function.Supplier; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.yetus.audience.InterfaceAudience; 041 042import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 043import org.apache.hbase.thirdparty.com.google.protobuf.Message; 044 045import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 048 049/** 050 * This class is for maintaining the various connection statistics and publishing them through the 051 * metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so 052 * as to not conflict with other uses of Yammer Metrics within the client application. Calling 053 * {@link #getMetricsConnection(String, Supplier, Supplier)} implicitly creates and "starts" 054 * instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to terminate 055 * the thread pools they allocate. The metrics reporter will be shutdown {@link #shutdown()} when 056 * all connections within this metrics instances are closed. 057 */ 058@InterfaceAudience.Private 059public final class MetricsConnection implements StatisticTrackable { 060 061 private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES = 062 new ConcurrentHashMap<>(); 063 064 static MetricsConnection getMetricsConnection(final String scope, 065 Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) { 066 return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> { 067 if (metricsConnection == null) { 068 MetricsConnection newMetricsConn = new MetricsConnection(scope, batchPool, metaPool); 069 newMetricsConn.incrConnectionCount(); 070 return newMetricsConn; 071 } else { 072 metricsConnection.addThreadPools(batchPool, metaPool); 073 metricsConnection.incrConnectionCount(); 074 return metricsConnection; 075 } 076 }); 077 } 078 079 static void deleteMetricsConnection(final String scope) { 080 METRICS_INSTANCES.computeIfPresent(scope, (s, metricsConnection) -> { 081 metricsConnection.decrConnectionCount(); 082 if (metricsConnection.getConnectionCount() == 0) { 083 metricsConnection.shutdown(); 084 return null; 085 } 086 return metricsConnection; 087 }); 088 } 089 090 /** Set this key to {@code true} to enable metrics collection of client requests. */ 091 public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable"; 092 093 /** 094 * Set to specify a custom scope for the metrics published through {@link MetricsConnection}. The 095 * scope is added to JMX MBean objectName, and defaults to a combination of the Connection's 096 * clusterId and hashCode. For example, a default value for a connection to cluster "foo" might be 097 * "foo-7d9d0818", where "7d9d0818" is the hashCode of the underlying AsyncConnectionImpl. Users 098 * may set this key to give a more contextual name for this scope. For example, one might want to 099 * differentiate a read connection from a write connection by setting the scopes to "foo-read" and 100 * "foo-write" respectively. Scope is the only thing that lends any uniqueness to the metrics. 101 * Care should be taken to avoid using the same scope for multiple Connections, otherwise the 102 * metrics may aggregate in unforeseen ways. 103 */ 104 public static final String METRICS_SCOPE_KEY = "hbase.client.metrics.scope"; 105 106 /** 107 * Returns the scope for a MetricsConnection based on the configured {@link #METRICS_SCOPE_KEY} or 108 * by generating a default from the passed clusterId and connectionObj's hashCode. 109 * @param conf configuration for the connection 110 * @param clusterId clusterId for the connection 111 * @param connectionObj either a Connection or AsyncConnectionImpl, the instance creating this 112 * MetricsConnection. 113 */ 114 static String getScope(Configuration conf, String clusterId, Object connectionObj) { 115 return conf.get(METRICS_SCOPE_KEY, 116 clusterId + "@" + Integer.toHexString(connectionObj.hashCode())); 117 } 118 119 private static final String CNT_BASE = "rpcCount_"; 120 private static final String FAILURE_CNT_BASE = "rpcFailureCount_"; 121 private static final String DRTN_BASE = "rpcCallDurationMs_"; 122 private static final String REQ_BASE = "rpcCallRequestSizeBytes_"; 123 private static final String RESP_BASE = "rpcCallResponseSizeBytes_"; 124 private static final String MEMLOAD_BASE = "memstoreLoad_"; 125 private static final String HEAP_BASE = "heapOccupancy_"; 126 private static final String CACHE_BASE = "cacheDroppingExceptions_"; 127 private static final String UNKNOWN_EXCEPTION = "UnknownException"; 128 private static final String NS_LOOKUPS = "nsLookups"; 129 private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed"; 130 private static final String CLIENT_SVC = ClientService.getDescriptor().getName(); 131 132 /** A container class for collecting details about the RPC call as it percolates. */ 133 public static class CallStats { 134 private long requestSizeBytes = 0; 135 private long responseSizeBytes = 0; 136 private long startTime = 0; 137 private long callTimeMs = 0; 138 private int concurrentCallsPerServer = 0; 139 private int numActionsPerServer = 0; 140 141 public long getRequestSizeBytes() { 142 return requestSizeBytes; 143 } 144 145 public void setRequestSizeBytes(long requestSizeBytes) { 146 this.requestSizeBytes = requestSizeBytes; 147 } 148 149 public long getResponseSizeBytes() { 150 return responseSizeBytes; 151 } 152 153 public void setResponseSizeBytes(long responseSizeBytes) { 154 this.responseSizeBytes = responseSizeBytes; 155 } 156 157 public long getStartTime() { 158 return startTime; 159 } 160 161 public void setStartTime(long startTime) { 162 this.startTime = startTime; 163 } 164 165 public long getCallTimeMs() { 166 return callTimeMs; 167 } 168 169 public void setCallTimeMs(long callTimeMs) { 170 this.callTimeMs = callTimeMs; 171 } 172 173 public int getConcurrentCallsPerServer() { 174 return concurrentCallsPerServer; 175 } 176 177 public void setConcurrentCallsPerServer(int callsPerServer) { 178 this.concurrentCallsPerServer = callsPerServer; 179 } 180 181 public int getNumActionsPerServer() { 182 return numActionsPerServer; 183 } 184 185 public void setNumActionsPerServer(int numActionsPerServer) { 186 this.numActionsPerServer = numActionsPerServer; 187 } 188 } 189 190 protected static final class CallTracker { 191 private final String name; 192 final Timer callTimer; 193 final Histogram reqHist; 194 final Histogram respHist; 195 196 private CallTracker(MetricRegistry registry, String name, String subName, String scope) { 197 StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name); 198 if (subName != null) { 199 sb.append("(").append(subName).append(")"); 200 } 201 this.name = sb.toString(); 202 this.callTimer = registry.timer(name(MetricsConnection.class, DRTN_BASE + this.name, scope)); 203 this.reqHist = registry.histogram(name(MetricsConnection.class, REQ_BASE + this.name, scope)); 204 this.respHist = 205 registry.histogram(name(MetricsConnection.class, RESP_BASE + this.name, scope)); 206 } 207 208 private CallTracker(MetricRegistry registry, String name, String scope) { 209 this(registry, name, null, scope); 210 } 211 212 public void updateRpc(CallStats stats) { 213 this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS); 214 this.reqHist.update(stats.getRequestSizeBytes()); 215 this.respHist.update(stats.getResponseSizeBytes()); 216 } 217 218 @Override 219 public String toString() { 220 return "CallTracker:" + name; 221 } 222 } 223 224 protected static class RegionStats { 225 final String name; 226 final Histogram memstoreLoadHist; 227 final Histogram heapOccupancyHist; 228 229 public RegionStats(MetricRegistry registry, String name) { 230 this.name = name; 231 this.memstoreLoadHist = 232 registry.histogram(name(MetricsConnection.class, MEMLOAD_BASE + this.name)); 233 this.heapOccupancyHist = 234 registry.histogram(name(MetricsConnection.class, HEAP_BASE + this.name)); 235 } 236 237 public void update(RegionLoadStats regionStatistics) { 238 this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad()); 239 this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy()); 240 } 241 } 242 243 protected static class RunnerStats { 244 final Counter normalRunners; 245 final Counter delayRunners; 246 final Histogram delayIntevalHist; 247 248 public RunnerStats(MetricRegistry registry) { 249 this.normalRunners = registry.counter(name(MetricsConnection.class, "normalRunnersCount")); 250 this.delayRunners = registry.counter(name(MetricsConnection.class, "delayRunnersCount")); 251 this.delayIntevalHist = 252 registry.histogram(name(MetricsConnection.class, "delayIntervalHist")); 253 } 254 255 public void incrNormalRunners() { 256 this.normalRunners.inc(); 257 } 258 259 public void incrDelayRunners() { 260 this.delayRunners.inc(); 261 } 262 263 public void updateDelayInterval(long interval) { 264 this.delayIntevalHist.update(interval); 265 } 266 } 267 268 private ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats = 269 new ConcurrentHashMap<>(); 270 271 public void updateServerStats(ServerName serverName, byte[] regionName, Object r) { 272 if (!(r instanceof Result)) { 273 return; 274 } 275 Result result = (Result) r; 276 RegionLoadStats stats = result.getStats(); 277 if (stats == null) { 278 return; 279 } 280 updateRegionStats(serverName, regionName, stats); 281 } 282 283 @Override 284 public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) { 285 String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName); 286 ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName, 287 () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR)); 288 RegionStats regionStats = 289 computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name)); 290 regionStats.update(stats); 291 } 292 293 /** A lambda for dispatching to the appropriate metric factory method */ 294 private static interface NewMetric<T> { 295 T newMetric(Class<?> clazz, String name, String scope); 296 } 297 298 /** Anticipated number of metric entries */ 299 private static final int CAPACITY = 50; 300 /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */ 301 private static final float LOAD_FACTOR = 0.75f; 302 /** 303 * Anticipated number of concurrent accessor threads 304 */ 305 private static final int CONCURRENCY_LEVEL = 256; 306 307 private final MetricRegistry registry; 308 private final JmxReporter reporter; 309 private final String scope; 310 311 private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() { 312 @Override 313 public Timer newMetric(Class<?> clazz, String name, String scope) { 314 return registry.timer(name(clazz, name, scope)); 315 } 316 }; 317 318 private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() { 319 @Override 320 public Histogram newMetric(Class<?> clazz, String name, String scope) { 321 return registry.histogram(name(clazz, name, scope)); 322 } 323 }; 324 325 private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() { 326 @Override 327 public Counter newMetric(Class<?> clazz, String name, String scope) { 328 return registry.counter(name(clazz, name, scope)); 329 } 330 }; 331 332 // List of thread pool per connection of the metrics. 333 private final List<Supplier<ThreadPoolExecutor>> batchPools = new ArrayList<>(); 334 private final List<Supplier<ThreadPoolExecutor>> metaPools = new ArrayList<>(); 335 336 // static metrics 337 338 private final Counter connectionCount; 339 private final Counter metaCacheHits; 340 private final Counter metaCacheMisses; 341 private final CallTracker getTracker; 342 private final CallTracker scanTracker; 343 private final CallTracker appendTracker; 344 private final CallTracker deleteTracker; 345 private final CallTracker incrementTracker; 346 private final CallTracker putTracker; 347 private final CallTracker multiTracker; 348 private final RunnerStats runnerStats; 349 private final Counter metaCacheNumClearServer; 350 private final Counter metaCacheNumClearRegion; 351 private final Counter hedgedReadOps; 352 private final Counter hedgedReadWin; 353 private final Histogram concurrentCallsPerServerHist; 354 private final Histogram numActionsPerServerHist; 355 private final Counter nsLookups; 356 private final Counter nsLookupsFailed; 357 private final Timer overloadedBackoffTimer; 358 359 // dynamic metrics 360 361 // These maps are used to cache references to the metric instances that are managed by the 362 // registry. I don't think their use perfectly removes redundant allocations, but it's 363 // a big improvement over calling registry.newMetric each time. 364 private final ConcurrentMap<String, Timer> rpcTimers = 365 new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); 366 private final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>( 367 CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL); 368 private final ConcurrentMap<String, Counter> cacheDroppingExceptions = 369 new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); 370 private final ConcurrentMap<String, Counter> rpcCounters = 371 new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); 372 373 private MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool, 374 Supplier<ThreadPoolExecutor> metaPool) { 375 this.scope = scope; 376 addThreadPools(batchPool, metaPool); 377 this.registry = new MetricRegistry(); 378 this.registry.register(getExecutorPoolName(), new RatioGauge() { 379 @Override 380 protected Ratio getRatio() { 381 int numerator = 0; 382 int denominator = 0; 383 for (Supplier<ThreadPoolExecutor> poolSupplier : batchPools) { 384 ThreadPoolExecutor pool = poolSupplier.get(); 385 if (pool != null) { 386 int activeCount = pool.getActiveCount(); 387 int maxPoolSize = pool.getMaximumPoolSize(); 388 /* The max thread usage ratio among batch pools of all connections */ 389 if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) { 390 numerator = activeCount; 391 denominator = maxPoolSize; 392 } 393 } 394 } 395 return Ratio.of(numerator, denominator); 396 } 397 }); 398 this.registry.register(getMetaPoolName(), new RatioGauge() { 399 @Override 400 protected Ratio getRatio() { 401 int numerator = 0; 402 int denominator = 0; 403 for (Supplier<ThreadPoolExecutor> poolSupplier : metaPools) { 404 ThreadPoolExecutor pool = poolSupplier.get(); 405 if (pool != null) { 406 int activeCount = pool.getActiveCount(); 407 int maxPoolSize = pool.getMaximumPoolSize(); 408 /* The max thread usage ratio among meta lookup pools of all connections */ 409 if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) { 410 numerator = activeCount; 411 denominator = maxPoolSize; 412 } 413 } 414 } 415 return Ratio.of(numerator, denominator); 416 } 417 }); 418 this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope)); 419 this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope)); 420 this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope)); 421 this.metaCacheNumClearServer = 422 registry.counter(name(this.getClass(), "metaCacheNumClearServer", scope)); 423 this.metaCacheNumClearRegion = 424 registry.counter(name(this.getClass(), "metaCacheNumClearRegion", scope)); 425 this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope)); 426 this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope)); 427 this.getTracker = new CallTracker(this.registry, "Get", scope); 428 this.scanTracker = new CallTracker(this.registry, "Scan", scope); 429 this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope); 430 this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope); 431 this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope); 432 this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope); 433 this.multiTracker = new CallTracker(this.registry, "Multi", scope); 434 this.runnerStats = new RunnerStats(this.registry); 435 this.concurrentCallsPerServerHist = 436 registry.histogram(name(MetricsConnection.class, "concurrentCallsPerServer", scope)); 437 this.numActionsPerServerHist = 438 registry.histogram(name(MetricsConnection.class, "numActionsPerServer", scope)); 439 this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope)); 440 this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope)); 441 this.overloadedBackoffTimer = 442 registry.timer(name(this.getClass(), "overloadedBackoffDurationMs", scope)); 443 444 this.reporter = JmxReporter.forRegistry(this.registry).build(); 445 this.reporter.start(); 446 } 447 448 final String getExecutorPoolName() { 449 return name(getClass(), "executorPoolActiveThreads", scope); 450 } 451 452 final String getMetaPoolName() { 453 return name(getClass(), "metaPoolActiveThreads", scope); 454 } 455 456 MetricRegistry getMetricRegistry() { 457 return registry; 458 } 459 460 /** scope of the metrics object */ 461 public String getMetricScope() { 462 return scope; 463 } 464 465 /** serverStats metric */ 466 public ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> getServerStats() { 467 return serverStats; 468 } 469 470 /** runnerStats metric */ 471 public RunnerStats getRunnerStats() { 472 return runnerStats; 473 } 474 475 /** metaCacheNumClearServer metric */ 476 public Counter getMetaCacheNumClearServer() { 477 return metaCacheNumClearServer; 478 } 479 480 /** metaCacheNumClearRegion metric */ 481 public Counter getMetaCacheNumClearRegion() { 482 return metaCacheNumClearRegion; 483 } 484 485 /** hedgedReadOps metric */ 486 public Counter getHedgedReadOps() { 487 return hedgedReadOps; 488 } 489 490 /** hedgedReadWin metric */ 491 public Counter getHedgedReadWin() { 492 return hedgedReadWin; 493 } 494 495 /** numActionsPerServerHist metric */ 496 public Histogram getNumActionsPerServerHist() { 497 return numActionsPerServerHist; 498 } 499 500 /** rpcCounters metric */ 501 public ConcurrentMap<String, Counter> getRpcCounters() { 502 return rpcCounters; 503 } 504 505 /** getTracker metric */ 506 public CallTracker getGetTracker() { 507 return getTracker; 508 } 509 510 /** scanTracker metric */ 511 public CallTracker getScanTracker() { 512 return scanTracker; 513 } 514 515 /** multiTracker metric */ 516 public CallTracker getMultiTracker() { 517 return multiTracker; 518 } 519 520 /** appendTracker metric */ 521 public CallTracker getAppendTracker() { 522 return appendTracker; 523 } 524 525 /** deleteTracker metric */ 526 public CallTracker getDeleteTracker() { 527 return deleteTracker; 528 } 529 530 /** incrementTracker metric */ 531 public CallTracker getIncrementTracker() { 532 return incrementTracker; 533 } 534 535 /** putTracker metric */ 536 public CallTracker getPutTracker() { 537 return putTracker; 538 } 539 540 /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */ 541 public static CallStats newCallStats() { 542 // TODO: instance pool to reduce GC? 543 return new CallStats(); 544 } 545 546 /** Increment the number of meta cache hits. */ 547 public void incrMetaCacheHit() { 548 metaCacheHits.inc(); 549 } 550 551 /** Increment the number of meta cache misses. */ 552 public void incrMetaCacheMiss() { 553 metaCacheMisses.inc(); 554 } 555 556 /** Increment the number of meta cache drops requested for entire RegionServer. */ 557 public void incrMetaCacheNumClearServer() { 558 metaCacheNumClearServer.inc(); 559 } 560 561 /** Increment the number of meta cache drops requested for individual region. */ 562 public void incrMetaCacheNumClearRegion() { 563 metaCacheNumClearRegion.inc(); 564 } 565 566 /** Increment the number of meta cache drops requested for individual region. */ 567 public void incrMetaCacheNumClearRegion(int count) { 568 metaCacheNumClearRegion.inc(count); 569 } 570 571 /** Increment the number of hedged read that have occurred. */ 572 public void incrHedgedReadOps() { 573 hedgedReadOps.inc(); 574 } 575 576 /** Increment the number of hedged read returned faster than the original read. */ 577 public void incrHedgedReadWin() { 578 hedgedReadWin.inc(); 579 } 580 581 /** Increment the number of normal runner counts. */ 582 public void incrNormalRunners() { 583 this.runnerStats.incrNormalRunners(); 584 } 585 586 /** Increment the number of delay runner counts and update delay interval of delay runner. */ 587 public void incrDelayRunnersAndUpdateDelayInterval(long interval) { 588 this.runnerStats.incrDelayRunners(); 589 this.runnerStats.updateDelayInterval(interval); 590 } 591 592 /** Update the overloaded backoff time **/ 593 public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) { 594 overloadedBackoffTimer.update(time, timeUnit); 595 } 596 597 /** Return the connection count of the metrics within a scope */ 598 public long getConnectionCount() { 599 return connectionCount.getCount(); 600 } 601 602 /** Increment the connection count of the metrics within a scope */ 603 private void incrConnectionCount() { 604 connectionCount.inc(); 605 } 606 607 /** Decrement the connection count of the metrics within a scope */ 608 private void decrConnectionCount() { 609 connectionCount.dec(); 610 } 611 612 /** Add thread pools of additional connections to the metrics */ 613 private void addThreadPools(Supplier<ThreadPoolExecutor> batchPool, 614 Supplier<ThreadPoolExecutor> metaPool) { 615 batchPools.add(batchPool); 616 metaPools.add(metaPool); 617 } 618 619 /** 620 * Get a metric for {@code key} from {@code map}, or create it with {@code factory}. 621 */ 622 private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) { 623 return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope)); 624 } 625 626 /** Update call stats for non-critical-path methods */ 627 private void updateRpcGeneric(String methodName, CallStats stats) { 628 getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory).update(stats.getCallTimeMs(), 629 TimeUnit.MILLISECONDS); 630 getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory) 631 .update(stats.getRequestSizeBytes()); 632 getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory) 633 .update(stats.getResponseSizeBytes()); 634 } 635 636 private void shutdown() { 637 this.reporter.stop(); 638 } 639 640 /** Report RPC context to metrics system. */ 641 public void updateRpc(MethodDescriptor method, Message param, CallStats stats, boolean failed) { 642 int callsPerServer = stats.getConcurrentCallsPerServer(); 643 if (callsPerServer > 0) { 644 concurrentCallsPerServerHist.update(callsPerServer); 645 } 646 // Update the counter that tracks RPCs by type. 647 final String methodName = method.getService().getName() + "_" + method.getName(); 648 getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc(); 649 if (failed) { 650 getMetric(FAILURE_CNT_BASE + methodName, rpcCounters, counterFactory).inc(); 651 } 652 // this implementation is tied directly to protobuf implementation details. would be better 653 // if we could dispatch based on something static, ie, request Message type. 654 if (method.getService() == ClientService.getDescriptor()) { 655 switch (method.getIndex()) { 656 case 0: 657 assert "Get".equals(method.getName()); 658 getTracker.updateRpc(stats); 659 return; 660 case 1: 661 assert "Mutate".equals(method.getName()); 662 final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType(); 663 switch (mutationType) { 664 case APPEND: 665 appendTracker.updateRpc(stats); 666 return; 667 case DELETE: 668 deleteTracker.updateRpc(stats); 669 return; 670 case INCREMENT: 671 incrementTracker.updateRpc(stats); 672 return; 673 case PUT: 674 putTracker.updateRpc(stats); 675 return; 676 default: 677 throw new RuntimeException("Unrecognized mutation type " + mutationType); 678 } 679 case 2: 680 assert "Scan".equals(method.getName()); 681 scanTracker.updateRpc(stats); 682 return; 683 case 3: 684 assert "BulkLoadHFile".equals(method.getName()); 685 // use generic implementation 686 break; 687 case 4: 688 assert "PrepareBulkLoad".equals(method.getName()); 689 // use generic implementation 690 break; 691 case 5: 692 assert "CleanupBulkLoad".equals(method.getName()); 693 // use generic implementation 694 break; 695 case 6: 696 assert "ExecService".equals(method.getName()); 697 // use generic implementation 698 break; 699 case 7: 700 assert "ExecRegionServerService".equals(method.getName()); 701 // use generic implementation 702 break; 703 case 8: 704 assert "Multi".equals(method.getName()); 705 numActionsPerServerHist.update(stats.getNumActionsPerServer()); 706 multiTracker.updateRpc(stats); 707 return; 708 default: 709 throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName()); 710 } 711 } 712 // Fallback to dynamic registry lookup for DDL methods. 713 updateRpcGeneric(methodName, stats); 714 } 715 716 public void incrCacheDroppingExceptions(Object exception) { 717 getMetric( 718 CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()), 719 cacheDroppingExceptions, counterFactory).inc(); 720 } 721 722 public void incrNsLookups() { 723 this.nsLookups.inc(); 724 } 725 726 public void incrNsLookupsFailed() { 727 this.nsLookupsFailed.inc(); 728 } 729}