001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static com.codahale.metrics.MetricRegistry.name;
021import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
022
023import com.codahale.metrics.Counter;
024import com.codahale.metrics.Histogram;
025import com.codahale.metrics.JmxReporter;
026import com.codahale.metrics.MetricRegistry;
027import com.codahale.metrics.RatioGauge;
028import com.codahale.metrics.Timer;
029import java.util.ArrayList;
030import java.util.List;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentMap;
033import java.util.concurrent.ConcurrentSkipListMap;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036import java.util.function.Supplier;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.yetus.audience.InterfaceAudience;
041
042import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
043import org.apache.hbase.thirdparty.com.google.protobuf.Message;
044
045import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
048
049/**
050 * This class is for maintaining the various connection statistics and publishing them through the
051 * metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
052 * as to not conflict with other uses of Yammer Metrics within the client application. Calling
053 * {@link #getMetricsConnection(String, Supplier, Supplier)} implicitly creates and "starts"
054 * instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to terminate
055 * the thread pools they allocate. The metrics reporter will be shutdown {@link #shutdown()} when
056 * all connections within this metrics instances are closed.
057 */
058@InterfaceAudience.Private
059public final class MetricsConnection implements StatisticTrackable {
060
061  private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES =
062    new ConcurrentHashMap<>();
063
064  static MetricsConnection getMetricsConnection(final String scope,
065    Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
066    return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> {
067      if (metricsConnection == null) {
068        MetricsConnection newMetricsConn = new MetricsConnection(scope, batchPool, metaPool);
069        newMetricsConn.incrConnectionCount();
070        return newMetricsConn;
071      } else {
072        metricsConnection.addThreadPools(batchPool, metaPool);
073        metricsConnection.incrConnectionCount();
074        return metricsConnection;
075      }
076    });
077  }
078
079  static void deleteMetricsConnection(final String scope) {
080    METRICS_INSTANCES.computeIfPresent(scope, (s, metricsConnection) -> {
081      metricsConnection.decrConnectionCount();
082      if (metricsConnection.getConnectionCount() == 0) {
083        metricsConnection.shutdown();
084        return null;
085      }
086      return metricsConnection;
087    });
088  }
089
090  /** Set this key to {@code true} to enable metrics collection of client requests. */
091  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
092
093  /**
094   * Set to specify a custom scope for the metrics published through {@link MetricsConnection}. The
095   * scope is added to JMX MBean objectName, and defaults to a combination of the Connection's
096   * clusterId and hashCode. For example, a default value for a connection to cluster "foo" might be
097   * "foo-7d9d0818", where "7d9d0818" is the hashCode of the underlying AsyncConnectionImpl. Users
098   * may set this key to give a more contextual name for this scope. For example, one might want to
099   * differentiate a read connection from a write connection by setting the scopes to "foo-read" and
100   * "foo-write" respectively. Scope is the only thing that lends any uniqueness to the metrics.
101   * Care should be taken to avoid using the same scope for multiple Connections, otherwise the
102   * metrics may aggregate in unforeseen ways.
103   */
104  public static final String METRICS_SCOPE_KEY = "hbase.client.metrics.scope";
105
106  /**
107   * Returns the scope for a MetricsConnection based on the configured {@link #METRICS_SCOPE_KEY} or
108   * by generating a default from the passed clusterId and connectionObj's hashCode.
109   * @param conf          configuration for the connection
110   * @param clusterId     clusterId for the connection
111   * @param connectionObj either a Connection or AsyncConnectionImpl, the instance creating this
112   *                      MetricsConnection.
113   */
114  static String getScope(Configuration conf, String clusterId, Object connectionObj) {
115    return conf.get(METRICS_SCOPE_KEY,
116      clusterId + "@" + Integer.toHexString(connectionObj.hashCode()));
117  }
118
119  private static final String CNT_BASE = "rpcCount_";
120  private static final String FAILURE_CNT_BASE = "rpcFailureCount_";
121  private static final String DRTN_BASE = "rpcCallDurationMs_";
122  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
123  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
124  private static final String MEMLOAD_BASE = "memstoreLoad_";
125  private static final String HEAP_BASE = "heapOccupancy_";
126  private static final String CACHE_BASE = "cacheDroppingExceptions_";
127  private static final String UNKNOWN_EXCEPTION = "UnknownException";
128  private static final String NS_LOOKUPS = "nsLookups";
129  private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
130  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
131
132  /** A container class for collecting details about the RPC call as it percolates. */
133  public static class CallStats {
134    private long requestSizeBytes = 0;
135    private long responseSizeBytes = 0;
136    private long startTime = 0;
137    private long callTimeMs = 0;
138    private int concurrentCallsPerServer = 0;
139    private int numActionsPerServer = 0;
140
141    public long getRequestSizeBytes() {
142      return requestSizeBytes;
143    }
144
145    public void setRequestSizeBytes(long requestSizeBytes) {
146      this.requestSizeBytes = requestSizeBytes;
147    }
148
149    public long getResponseSizeBytes() {
150      return responseSizeBytes;
151    }
152
153    public void setResponseSizeBytes(long responseSizeBytes) {
154      this.responseSizeBytes = responseSizeBytes;
155    }
156
157    public long getStartTime() {
158      return startTime;
159    }
160
161    public void setStartTime(long startTime) {
162      this.startTime = startTime;
163    }
164
165    public long getCallTimeMs() {
166      return callTimeMs;
167    }
168
169    public void setCallTimeMs(long callTimeMs) {
170      this.callTimeMs = callTimeMs;
171    }
172
173    public int getConcurrentCallsPerServer() {
174      return concurrentCallsPerServer;
175    }
176
177    public void setConcurrentCallsPerServer(int callsPerServer) {
178      this.concurrentCallsPerServer = callsPerServer;
179    }
180
181    public int getNumActionsPerServer() {
182      return numActionsPerServer;
183    }
184
185    public void setNumActionsPerServer(int numActionsPerServer) {
186      this.numActionsPerServer = numActionsPerServer;
187    }
188  }
189
190  protected static final class CallTracker {
191    private final String name;
192    final Timer callTimer;
193    final Histogram reqHist;
194    final Histogram respHist;
195
196    private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
197      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
198      if (subName != null) {
199        sb.append("(").append(subName).append(")");
200      }
201      this.name = sb.toString();
202      this.callTimer = registry.timer(name(MetricsConnection.class, DRTN_BASE + this.name, scope));
203      this.reqHist = registry.histogram(name(MetricsConnection.class, REQ_BASE + this.name, scope));
204      this.respHist =
205        registry.histogram(name(MetricsConnection.class, RESP_BASE + this.name, scope));
206    }
207
208    private CallTracker(MetricRegistry registry, String name, String scope) {
209      this(registry, name, null, scope);
210    }
211
212    public void updateRpc(CallStats stats) {
213      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
214      this.reqHist.update(stats.getRequestSizeBytes());
215      this.respHist.update(stats.getResponseSizeBytes());
216    }
217
218    @Override
219    public String toString() {
220      return "CallTracker:" + name;
221    }
222  }
223
224  protected static class RegionStats {
225    final String name;
226    final Histogram memstoreLoadHist;
227    final Histogram heapOccupancyHist;
228
229    public RegionStats(MetricRegistry registry, String name) {
230      this.name = name;
231      this.memstoreLoadHist =
232        registry.histogram(name(MetricsConnection.class, MEMLOAD_BASE + this.name));
233      this.heapOccupancyHist =
234        registry.histogram(name(MetricsConnection.class, HEAP_BASE + this.name));
235    }
236
237    public void update(RegionLoadStats regionStatistics) {
238      this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
239      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
240    }
241  }
242
243  protected static class RunnerStats {
244    final Counter normalRunners;
245    final Counter delayRunners;
246    final Histogram delayIntevalHist;
247
248    public RunnerStats(MetricRegistry registry) {
249      this.normalRunners = registry.counter(name(MetricsConnection.class, "normalRunnersCount"));
250      this.delayRunners = registry.counter(name(MetricsConnection.class, "delayRunnersCount"));
251      this.delayIntevalHist =
252        registry.histogram(name(MetricsConnection.class, "delayIntervalHist"));
253    }
254
255    public void incrNormalRunners() {
256      this.normalRunners.inc();
257    }
258
259    public void incrDelayRunners() {
260      this.delayRunners.inc();
261    }
262
263    public void updateDelayInterval(long interval) {
264      this.delayIntevalHist.update(interval);
265    }
266  }
267
268  private ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
269    new ConcurrentHashMap<>();
270
271  public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
272    if (!(r instanceof Result)) {
273      return;
274    }
275    Result result = (Result) r;
276    RegionLoadStats stats = result.getStats();
277    if (stats == null) {
278      return;
279    }
280    updateRegionStats(serverName, regionName, stats);
281  }
282
283  @Override
284  public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
285    String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
286    ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
287      () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
288    RegionStats regionStats =
289      computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name));
290    regionStats.update(stats);
291  }
292
293  /** A lambda for dispatching to the appropriate metric factory method */
294  private static interface NewMetric<T> {
295    T newMetric(Class<?> clazz, String name, String scope);
296  }
297
298  /** Anticipated number of metric entries */
299  private static final int CAPACITY = 50;
300  /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
301  private static final float LOAD_FACTOR = 0.75f;
302  /**
303   * Anticipated number of concurrent accessor threads
304   */
305  private static final int CONCURRENCY_LEVEL = 256;
306
307  private final MetricRegistry registry;
308  private final JmxReporter reporter;
309  private final String scope;
310
311  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
312    @Override
313    public Timer newMetric(Class<?> clazz, String name, String scope) {
314      return registry.timer(name(clazz, name, scope));
315    }
316  };
317
318  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
319    @Override
320    public Histogram newMetric(Class<?> clazz, String name, String scope) {
321      return registry.histogram(name(clazz, name, scope));
322    }
323  };
324
325  private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
326    @Override
327    public Counter newMetric(Class<?> clazz, String name, String scope) {
328      return registry.counter(name(clazz, name, scope));
329    }
330  };
331
332  // List of thread pool per connection of the metrics.
333  private final List<Supplier<ThreadPoolExecutor>> batchPools = new ArrayList<>();
334  private final List<Supplier<ThreadPoolExecutor>> metaPools = new ArrayList<>();
335
336  // static metrics
337
338  private final Counter connectionCount;
339  private final Counter metaCacheHits;
340  private final Counter metaCacheMisses;
341  private final CallTracker getTracker;
342  private final CallTracker scanTracker;
343  private final CallTracker appendTracker;
344  private final CallTracker deleteTracker;
345  private final CallTracker incrementTracker;
346  private final CallTracker putTracker;
347  private final CallTracker multiTracker;
348  private final RunnerStats runnerStats;
349  private final Counter metaCacheNumClearServer;
350  private final Counter metaCacheNumClearRegion;
351  private final Counter hedgedReadOps;
352  private final Counter hedgedReadWin;
353  private final Histogram concurrentCallsPerServerHist;
354  private final Histogram numActionsPerServerHist;
355  private final Counter nsLookups;
356  private final Counter nsLookupsFailed;
357  private final Timer overloadedBackoffTimer;
358
359  // dynamic metrics
360
361  // These maps are used to cache references to the metric instances that are managed by the
362  // registry. I don't think their use perfectly removes redundant allocations, but it's
363  // a big improvement over calling registry.newMetric each time.
364  private final ConcurrentMap<String, Timer> rpcTimers =
365    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
366  private final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
367    CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL);
368  private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
369    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
370  private final ConcurrentMap<String, Counter> rpcCounters =
371    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
372
373  private MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
374    Supplier<ThreadPoolExecutor> metaPool) {
375    this.scope = scope;
376    addThreadPools(batchPool, metaPool);
377    this.registry = new MetricRegistry();
378    this.registry.register(getExecutorPoolName(), new RatioGauge() {
379      @Override
380      protected Ratio getRatio() {
381        int numerator = 0;
382        int denominator = 0;
383        for (Supplier<ThreadPoolExecutor> poolSupplier : batchPools) {
384          ThreadPoolExecutor pool = poolSupplier.get();
385          if (pool != null) {
386            int activeCount = pool.getActiveCount();
387            int maxPoolSize = pool.getMaximumPoolSize();
388            /* The max thread usage ratio among batch pools of all connections */
389            if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
390              numerator = activeCount;
391              denominator = maxPoolSize;
392            }
393          }
394        }
395        return Ratio.of(numerator, denominator);
396      }
397    });
398    this.registry.register(getMetaPoolName(), new RatioGauge() {
399      @Override
400      protected Ratio getRatio() {
401        int numerator = 0;
402        int denominator = 0;
403        for (Supplier<ThreadPoolExecutor> poolSupplier : metaPools) {
404          ThreadPoolExecutor pool = poolSupplier.get();
405          if (pool != null) {
406            int activeCount = pool.getActiveCount();
407            int maxPoolSize = pool.getMaximumPoolSize();
408            /* The max thread usage ratio among meta lookup pools of all connections */
409            if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
410              numerator = activeCount;
411              denominator = maxPoolSize;
412            }
413          }
414        }
415        return Ratio.of(numerator, denominator);
416      }
417    });
418    this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope));
419    this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
420    this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
421    this.metaCacheNumClearServer =
422      registry.counter(name(this.getClass(), "metaCacheNumClearServer", scope));
423    this.metaCacheNumClearRegion =
424      registry.counter(name(this.getClass(), "metaCacheNumClearRegion", scope));
425    this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
426    this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
427    this.getTracker = new CallTracker(this.registry, "Get", scope);
428    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
429    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
430    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
431    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
432    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
433    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
434    this.runnerStats = new RunnerStats(this.registry);
435    this.concurrentCallsPerServerHist =
436      registry.histogram(name(MetricsConnection.class, "concurrentCallsPerServer", scope));
437    this.numActionsPerServerHist =
438      registry.histogram(name(MetricsConnection.class, "numActionsPerServer", scope));
439    this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
440    this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));
441    this.overloadedBackoffTimer =
442      registry.timer(name(this.getClass(), "overloadedBackoffDurationMs", scope));
443
444    this.reporter = JmxReporter.forRegistry(this.registry).build();
445    this.reporter.start();
446  }
447
448  final String getExecutorPoolName() {
449    return name(getClass(), "executorPoolActiveThreads", scope);
450  }
451
452  final String getMetaPoolName() {
453    return name(getClass(), "metaPoolActiveThreads", scope);
454  }
455
456  MetricRegistry getMetricRegistry() {
457    return registry;
458  }
459
460  /** scope of the metrics object */
461  public String getMetricScope() {
462    return scope;
463  }
464
465  /** serverStats metric */
466  public ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> getServerStats() {
467    return serverStats;
468  }
469
470  /** runnerStats metric */
471  public RunnerStats getRunnerStats() {
472    return runnerStats;
473  }
474
475  /** metaCacheNumClearServer metric */
476  public Counter getMetaCacheNumClearServer() {
477    return metaCacheNumClearServer;
478  }
479
480  /** metaCacheNumClearRegion metric */
481  public Counter getMetaCacheNumClearRegion() {
482    return metaCacheNumClearRegion;
483  }
484
485  /** hedgedReadOps metric */
486  public Counter getHedgedReadOps() {
487    return hedgedReadOps;
488  }
489
490  /** hedgedReadWin metric */
491  public Counter getHedgedReadWin() {
492    return hedgedReadWin;
493  }
494
495  /** numActionsPerServerHist metric */
496  public Histogram getNumActionsPerServerHist() {
497    return numActionsPerServerHist;
498  }
499
500  /** rpcCounters metric */
501  public ConcurrentMap<String, Counter> getRpcCounters() {
502    return rpcCounters;
503  }
504
505  /** getTracker metric */
506  public CallTracker getGetTracker() {
507    return getTracker;
508  }
509
510  /** scanTracker metric */
511  public CallTracker getScanTracker() {
512    return scanTracker;
513  }
514
515  /** multiTracker metric */
516  public CallTracker getMultiTracker() {
517    return multiTracker;
518  }
519
520  /** appendTracker metric */
521  public CallTracker getAppendTracker() {
522    return appendTracker;
523  }
524
525  /** deleteTracker metric */
526  public CallTracker getDeleteTracker() {
527    return deleteTracker;
528  }
529
530  /** incrementTracker metric */
531  public CallTracker getIncrementTracker() {
532    return incrementTracker;
533  }
534
535  /** putTracker metric */
536  public CallTracker getPutTracker() {
537    return putTracker;
538  }
539
540  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
541  public static CallStats newCallStats() {
542    // TODO: instance pool to reduce GC?
543    return new CallStats();
544  }
545
546  /** Increment the number of meta cache hits. */
547  public void incrMetaCacheHit() {
548    metaCacheHits.inc();
549  }
550
551  /** Increment the number of meta cache misses. */
552  public void incrMetaCacheMiss() {
553    metaCacheMisses.inc();
554  }
555
556  /** Increment the number of meta cache drops requested for entire RegionServer. */
557  public void incrMetaCacheNumClearServer() {
558    metaCacheNumClearServer.inc();
559  }
560
561  /** Increment the number of meta cache drops requested for individual region. */
562  public void incrMetaCacheNumClearRegion() {
563    metaCacheNumClearRegion.inc();
564  }
565
566  /** Increment the number of meta cache drops requested for individual region. */
567  public void incrMetaCacheNumClearRegion(int count) {
568    metaCacheNumClearRegion.inc(count);
569  }
570
571  /** Increment the number of hedged read that have occurred. */
572  public void incrHedgedReadOps() {
573    hedgedReadOps.inc();
574  }
575
576  /** Increment the number of hedged read returned faster than the original read. */
577  public void incrHedgedReadWin() {
578    hedgedReadWin.inc();
579  }
580
581  /** Increment the number of normal runner counts. */
582  public void incrNormalRunners() {
583    this.runnerStats.incrNormalRunners();
584  }
585
586  /** Increment the number of delay runner counts and update delay interval of delay runner. */
587  public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
588    this.runnerStats.incrDelayRunners();
589    this.runnerStats.updateDelayInterval(interval);
590  }
591
592  /** Update the overloaded backoff time **/
593  public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
594    overloadedBackoffTimer.update(time, timeUnit);
595  }
596
597  /** Return the connection count of the metrics within a scope */
598  public long getConnectionCount() {
599    return connectionCount.getCount();
600  }
601
602  /** Increment the connection count of the metrics within a scope */
603  private void incrConnectionCount() {
604    connectionCount.inc();
605  }
606
607  /** Decrement the connection count of the metrics within a scope */
608  private void decrConnectionCount() {
609    connectionCount.dec();
610  }
611
612  /** Add thread pools of additional connections to the metrics */
613  private void addThreadPools(Supplier<ThreadPoolExecutor> batchPool,
614    Supplier<ThreadPoolExecutor> metaPool) {
615    batchPools.add(batchPool);
616    metaPools.add(metaPool);
617  }
618
619  /**
620   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
621   */
622  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
623    return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
624  }
625
626  /** Update call stats for non-critical-path methods */
627  private void updateRpcGeneric(String methodName, CallStats stats) {
628    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory).update(stats.getCallTimeMs(),
629      TimeUnit.MILLISECONDS);
630    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
631      .update(stats.getRequestSizeBytes());
632    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
633      .update(stats.getResponseSizeBytes());
634  }
635
636  private void shutdown() {
637    this.reporter.stop();
638  }
639
640  /** Report RPC context to metrics system. */
641  public void updateRpc(MethodDescriptor method, Message param, CallStats stats, boolean failed) {
642    int callsPerServer = stats.getConcurrentCallsPerServer();
643    if (callsPerServer > 0) {
644      concurrentCallsPerServerHist.update(callsPerServer);
645    }
646    // Update the counter that tracks RPCs by type.
647    final String methodName = method.getService().getName() + "_" + method.getName();
648    getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
649    if (failed) {
650      getMetric(FAILURE_CNT_BASE + methodName, rpcCounters, counterFactory).inc();
651    }
652    // this implementation is tied directly to protobuf implementation details. would be better
653    // if we could dispatch based on something static, ie, request Message type.
654    if (method.getService() == ClientService.getDescriptor()) {
655      switch (method.getIndex()) {
656        case 0:
657          assert "Get".equals(method.getName());
658          getTracker.updateRpc(stats);
659          return;
660        case 1:
661          assert "Mutate".equals(method.getName());
662          final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
663          switch (mutationType) {
664            case APPEND:
665              appendTracker.updateRpc(stats);
666              return;
667            case DELETE:
668              deleteTracker.updateRpc(stats);
669              return;
670            case INCREMENT:
671              incrementTracker.updateRpc(stats);
672              return;
673            case PUT:
674              putTracker.updateRpc(stats);
675              return;
676            default:
677              throw new RuntimeException("Unrecognized mutation type " + mutationType);
678          }
679        case 2:
680          assert "Scan".equals(method.getName());
681          scanTracker.updateRpc(stats);
682          return;
683        case 3:
684          assert "BulkLoadHFile".equals(method.getName());
685          // use generic implementation
686          break;
687        case 4:
688          assert "PrepareBulkLoad".equals(method.getName());
689          // use generic implementation
690          break;
691        case 5:
692          assert "CleanupBulkLoad".equals(method.getName());
693          // use generic implementation
694          break;
695        case 6:
696          assert "ExecService".equals(method.getName());
697          // use generic implementation
698          break;
699        case 7:
700          assert "ExecRegionServerService".equals(method.getName());
701          // use generic implementation
702          break;
703        case 8:
704          assert "Multi".equals(method.getName());
705          numActionsPerServerHist.update(stats.getNumActionsPerServer());
706          multiTracker.updateRpc(stats);
707          return;
708        default:
709          throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
710      }
711    }
712    // Fallback to dynamic registry lookup for DDL methods.
713    updateRpcGeneric(methodName, stats);
714  }
715
716  public void incrCacheDroppingExceptions(Object exception) {
717    getMetric(
718      CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
719      cacheDroppingExceptions, counterFactory).inc();
720  }
721
722  public void incrNsLookups() {
723    this.nsLookups.inc();
724  }
725
726  public void incrNsLookupsFailed() {
727    this.nsLookupsFailed.inc();
728  }
729}