/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static com.codahale.metrics.MetricRegistry.name;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * This class is for maintaining the various connection statistics and publishing them through the
 * metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
 * as to not conflict with other uses of Yammer Metrics within the client application. Calling
 * {@link #getMetricsConnection(Configuration, String, Supplier, Supplier)} implicitly creates and
 * "starts" instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to
 * terminate the thread pools they allocate. The metrics reporter is shut down via
 * {@link #shutdown()} once all connections within this metrics instance are closed.
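 * <p>
 * A minimal lifecycle sketch (hypothetical caller code; the pool suppliers and scope are assumed
 * to come from the surrounding connection implementation):
 *
 * <pre>{@code
 * MetricsConnection metrics =
 *   MetricsConnection.getMetricsConnection(conf, scope, () -> batchPool, () -> metaPool);
 * // ... record RPC activity through the connection ...
 * MetricsConnection.deleteMetricsConnection(scope); // decrements; shuts down reporter at zero
 * }</pre>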
 */
@InterfaceAudience.Private
public final class MetricsConnection implements StatisticTrackable {

  private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES =
    new ConcurrentHashMap<>();

  static MetricsConnection getMetricsConnection(final Configuration conf, final String scope,
    Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
    return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> {
      if (metricsConnection == null) {
        MetricsConnection newMetricsConn = new MetricsConnection(conf, scope, batchPool, metaPool);
        newMetricsConn.incrConnectionCount();
        return newMetricsConn;
      } else {
        metricsConnection.addThreadPools(batchPool, metaPool);
        metricsConnection.incrConnectionCount();
        return metricsConnection;
      }
    });
  }

  static void deleteMetricsConnection(final String scope) {
    METRICS_INSTANCES.computeIfPresent(scope, (s, metricsConnection) -> {
      metricsConnection.decrConnectionCount();
      if (metricsConnection.getConnectionCount() == 0) {
        metricsConnection.shutdown();
        return null;
      }
      return metricsConnection;
    });
  }

  /** Set this key to {@code true} to enable metrics collection of client requests. */
  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

  /** Set this key to {@code true} to enable table metrics collection of client requests. */
  public static final String CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY =
    "hbase.client.table.metrics.enable";
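
  // Example of opting in (illustrative; these keys are read by the connection implementation,
  // typically set via hbase-site.xml):
  //   conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
  //   conf.setBoolean(MetricsConnection.CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, true);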

  /**
   * Set to specify a custom scope for the metrics published through {@link MetricsConnection}. The
   * scope is added to the JMX MBean objectName, and defaults to a combination of the Connection's
   * clusterId and hashCode. For example, a default value for a connection to cluster "foo" might be
   * "foo-7d9d0818", where "7d9d0818" is the hashCode of the underlying AsyncConnectionImpl. Users
   * may set this key to give a more contextual name for this scope. For example, one might want to
   * differentiate a read connection from a write connection by setting the scopes to "foo-read" and
   * "foo-write" respectively. Scope is the only thing that lends any uniqueness to the metrics.
   * Care should be taken to avoid using the same scope for multiple Connections, otherwise the
   * metrics may aggregate in unforeseen ways.
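   * <p>
   * For example (illustrative):
   *
   * <pre>{@code
   * conf.set(MetricsConnection.METRICS_SCOPE_KEY, "foo-read");
   * }</pre>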
   */
  public static final String METRICS_SCOPE_KEY = "hbase.client.metrics.scope";

  /**
   * Returns the scope for a MetricsConnection based on the configured {@link #METRICS_SCOPE_KEY} or
   * by generating a default from the passed clusterId and connectionObj's hashCode.
   * @param conf          configuration for the connection
   * @param clusterId     clusterId for the connection
   * @param connectionObj either a Connection or AsyncConnectionImpl, the instance creating this
   *                      MetricsConnection.
   */
  static String getScope(Configuration conf, String clusterId, Object connectionObj) {
    return conf.get(METRICS_SCOPE_KEY,
      clusterId + "@" + Integer.toHexString(connectionObj.hashCode()));
  }

  private static final String CNT_BASE = "rpcCount_";
  private static final String FAILURE_CNT_BASE = "rpcFailureCount_";
  private static final String TOTAL_EXCEPTION_CNT = "rpcTotalExceptions";
  private static final String LOCAL_EXCEPTION_CNT_BASE = "rpcLocalExceptions_";
  private static final String REMOTE_EXCEPTION_CNT_BASE = "rpcRemoteExceptions_";
  private static final String DRTN_BASE = "rpcCallDurationMs_";
  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
  private static final String MEMLOAD_BASE = "memstoreLoad_";
  private static final String HEAP_BASE = "heapOccupancy_";
  private static final String CACHE_BASE = "cacheDroppingExceptions_";
  private static final String UNKNOWN_EXCEPTION = "UnknownException";
  private static final String NS_LOOKUPS = "nsLookups";
  private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();

  /**
   * A container class for collecting details about the RPC call as it percolates.
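   * <p>
   * A minimal sketch of typical use (illustrative; in practice the RPC layer fills these in):
   *
   * <pre>{@code
   * CallStats stats = MetricsConnection.newCallStats();
   * stats.setStartTime(System.currentTimeMillis());
   * // ... issue the RPC ...
   * stats.setCallTimeMs(System.currentTimeMillis() - stats.getStartTime());
   * }</pre>
   */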
  public static class CallStats {
    private long requestSizeBytes = 0;
    private long responseSizeBytes = 0;
    private long startTime = 0;
    private long callTimeMs = 0;
    private int concurrentCallsPerServer = 0;
    private int numActionsPerServer = 0;

    public long getRequestSizeBytes() {
      return requestSizeBytes;
    }

    public void setRequestSizeBytes(long requestSizeBytes) {
      this.requestSizeBytes = requestSizeBytes;
    }

    public long getResponseSizeBytes() {
      return responseSizeBytes;
    }

    public void setResponseSizeBytes(long responseSizeBytes) {
      this.responseSizeBytes = responseSizeBytes;
    }

    public long getStartTime() {
      return startTime;
    }

    public void setStartTime(long startTime) {
      this.startTime = startTime;
    }

    public long getCallTimeMs() {
      return callTimeMs;
    }

    public void setCallTimeMs(long callTimeMs) {
      this.callTimeMs = callTimeMs;
    }

    public int getConcurrentCallsPerServer() {
      return concurrentCallsPerServer;
    }

    public void setConcurrentCallsPerServer(int callsPerServer) {
      this.concurrentCallsPerServer = callsPerServer;
    }

    public int getNumActionsPerServer() {
      return numActionsPerServer;
    }

    public void setNumActionsPerServer(int numActionsPerServer) {
      this.numActionsPerServer = numActionsPerServer;
    }
  }

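  /**
   * Tracks the duration and request/response sizes of one RPC call type. The registered metric
   * names combine the base prefixes above with the service and method name, e.g. a Put mutation is
   * tracked under "rpcCallDurationMs_ClientService_Mutate(Put)" (illustrative; the full JMX name
   * also carries this class and the connection scope).
   */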
  protected static final class CallTracker {
    private final String name;
    final Timer callTimer;
    final Histogram reqHist;
    final Histogram respHist;

    private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
      if (subName != null) {
        sb.append("(").append(subName).append(")");
      }
      this.name = sb.toString();
      this.callTimer = registry.timer(name(MetricsConnection.class, DRTN_BASE + this.name, scope));
      this.reqHist = registry.histogram(name(MetricsConnection.class, REQ_BASE + this.name, scope));
      this.respHist =
        registry.histogram(name(MetricsConnection.class, RESP_BASE + this.name, scope));
    }

    private CallTracker(MetricRegistry registry, String name, String scope) {
      this(registry, name, null, scope);
    }

    public void updateRpc(CallStats stats) {
      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
      this.reqHist.update(stats.getRequestSizeBytes());
      this.respHist.update(stats.getResponseSizeBytes());
    }

    @Override
    public String toString() {
      return "CallTracker:" + name;
    }
  }

  protected static class RegionStats {
    final String name;
    final Histogram memstoreLoadHist;
    final Histogram heapOccupancyHist;

    public RegionStats(MetricRegistry registry, String name) {
      this.name = name;
      this.memstoreLoadHist =
        registry.histogram(name(MetricsConnection.class, MEMLOAD_BASE + this.name));
      this.heapOccupancyHist =
        registry.histogram(name(MetricsConnection.class, HEAP_BASE + this.name));
    }

    public void update(RegionLoadStats regionStatistics) {
      this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
    }
  }

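  /**
   * Tracks how request runners are scheduled: runners submitted immediately ("normal") versus
   * runners submitted with a delay, plus a histogram of the applied delay intervals.
   */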
  protected static class RunnerStats {
    final Counter normalRunners;
    final Counter delayRunners;
    final Histogram delayIntervalHist;

    public RunnerStats(MetricRegistry registry) {
      this.normalRunners = registry.counter(name(MetricsConnection.class, "normalRunnersCount"));
      this.delayRunners = registry.counter(name(MetricsConnection.class, "delayRunnersCount"));
      this.delayIntervalHist =
        registry.histogram(name(MetricsConnection.class, "delayIntervalHist"));
    }

    public void incrNormalRunners() {
      this.normalRunners.inc();
    }

    public void incrDelayRunners() {
      this.delayRunners.inc();
    }

    public void updateDelayInterval(long interval) {
      this.delayIntervalHist.update(interval);
    }
  }

  private ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
    new ConcurrentHashMap<>();

  public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
    if (!(r instanceof Result)) {
      return;
    }
    Result result = (Result) r;
    RegionLoadStats stats = result.getStats();
    if (stats == null) {
      return;
    }
    updateRegionStats(serverName, regionName, stats);
  }

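  /**
   * Registers (on first sight) and updates the per-region memstore load and heap occupancy
   * histograms; the metric name embeds both the server name and the region name.
   */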
  @Override
  public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
    String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
    ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
      () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
    RegionStats regionStats =
      computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name));
    regionStats.update(stats);
  }

  /** A lambda for dispatching to the appropriate metric factory method */
  private interface NewMetric<T> {
    T newMetric(Class<?> clazz, String name, String scope);
  }

  /** Anticipated number of metric entries */
  private static final int CAPACITY = 50;
  /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
  private static final float LOAD_FACTOR = 0.75f;
  /**
   * Anticipated number of concurrent accessor threads
   */
  private static final int CONCURRENCY_LEVEL = 256;

  private final MetricRegistry registry;
  private final JmxReporter reporter;
  private final String scope;
  private final boolean tableMetricsEnabled;

  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
    @Override
    public Timer newMetric(Class<?> clazz, String name, String scope) {
      return registry.timer(name(clazz, name, scope));
    }
  };

  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
    @Override
    public Histogram newMetric(Class<?> clazz, String name, String scope) {
      return registry.histogram(name(clazz, name, scope));
    }
  };

  private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
    @Override
    public Counter newMetric(Class<?> clazz, String name, String scope) {
      return registry.counter(name(clazz, name, scope));
    }
  };
  // Thread pools of the connections sharing this metrics instance, one entry per connection.
  private final List<Supplier<ThreadPoolExecutor>> batchPools = new ArrayList<>();
  private final List<Supplier<ThreadPoolExecutor>> metaPools = new ArrayList<>();

  // static metrics

  private final Counter connectionCount;
  private final Counter metaCacheHits;
  private final Counter metaCacheMisses;
  private final CallTracker getTracker;
  private final CallTracker scanTracker;
  private final CallTracker appendTracker;
  private final CallTracker deleteTracker;
  private final CallTracker incrementTracker;
  private final CallTracker putTracker;
  private final CallTracker multiTracker;
  private final RunnerStats runnerStats;
  private final Counter metaCacheNumClearServer;
  private final Counter metaCacheNumClearRegion;
  private final Counter hedgedReadOps;
  private final Counter hedgedReadWin;
  private final Histogram concurrentCallsPerServerHist;
  private final Histogram numActionsPerServerHist;
  private final Counter nsLookups;
  private final Counter nsLookupsFailed;
  private final Timer overloadedBackoffTimer;

  // dynamic metrics

  // These maps are used to cache references to the metric instances that are managed by the
  // registry. I don't think their use perfectly removes redundant allocations, but it's
  // a big improvement over calling registry.newMetric each time.
  private final ConcurrentMap<String, Timer> rpcTimers =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
    CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> rpcCounters =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

  private MetricsConnection(Configuration conf, String scope,
    Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
    this.scope = scope;
    this.tableMetricsEnabled = conf.getBoolean(CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, false);
    addThreadPools(batchPool, metaPool);
    this.registry = new MetricRegistry();
    this.registry.register(getExecutorPoolName(), new RatioGauge() {
      @Override
      protected Ratio getRatio() {
        int numerator = 0;
        int denominator = 0;
        for (Supplier<ThreadPoolExecutor> poolSupplier : batchPools) {
          ThreadPoolExecutor pool = poolSupplier.get();
          if (pool != null) {
            int activeCount = pool.getActiveCount();
            int maxPoolSize = pool.getMaximumPoolSize();
            /* The max thread usage ratio among batch pools of all connections */
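            // Keep the larger of the running max and activeCount/maxPoolSize, comparing the two
            // ratios by cross-multiplication so no floating-point division happens in the gauge.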
            if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
              numerator = activeCount;
              denominator = maxPoolSize;
            }
          }
        }
        return Ratio.of(numerator, denominator);
      }
    });
    this.registry.register(getMetaPoolName(), new RatioGauge() {
      @Override
      protected Ratio getRatio() {
        int numerator = 0;
        int denominator = 0;
        for (Supplier<ThreadPoolExecutor> poolSupplier : metaPools) {
          ThreadPoolExecutor pool = poolSupplier.get();
          if (pool != null) {
            int activeCount = pool.getActiveCount();
            int maxPoolSize = pool.getMaximumPoolSize();
            /* The max thread usage ratio among meta lookup pools of all connections */
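            // Same cross-multiplication trick as the batch-pool gauge above.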
            if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
              numerator = activeCount;
              denominator = maxPoolSize;
            }
          }
        }
        return Ratio.of(numerator, denominator);
      }
    });
    this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope));
    this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
    this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
    this.metaCacheNumClearServer =
      registry.counter(name(this.getClass(), "metaCacheNumClearServer", scope));
    this.metaCacheNumClearRegion =
      registry.counter(name(this.getClass(), "metaCacheNumClearRegion", scope));
    this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
    this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
    this.getTracker = new CallTracker(this.registry, "Get", scope);
    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
    this.runnerStats = new RunnerStats(this.registry);
    this.concurrentCallsPerServerHist =
      registry.histogram(name(MetricsConnection.class, "concurrentCallsPerServer", scope));
    this.numActionsPerServerHist =
      registry.histogram(name(MetricsConnection.class, "numActionsPerServer", scope));
    this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
    this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));
    this.overloadedBackoffTimer =
      registry.timer(name(this.getClass(), "overloadedBackoffDurationMs", scope));

    this.reporter = JmxReporter.forRegistry(this.registry).build();
    this.reporter.start();
  }

  final String getExecutorPoolName() {
    return name(getClass(), "executorPoolActiveThreads", scope);
  }

  final String getMetaPoolName() {
    return name(getClass(), "metaPoolActiveThreads", scope);
  }

  MetricRegistry getMetricRegistry() {
    return registry;
  }

  /** scope of the metrics object */
  public String getMetricScope() {
    return scope;
  }

  /** serverStats metric */
  public ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> getServerStats() {
    return serverStats;
  }

  /** runnerStats metric */
  public RunnerStats getRunnerStats() {
    return runnerStats;
  }

  /** metaCacheNumClearServer metric */
  public Counter getMetaCacheNumClearServer() {
    return metaCacheNumClearServer;
  }

  /** metaCacheNumClearRegion metric */
  public Counter getMetaCacheNumClearRegion() {
    return metaCacheNumClearRegion;
  }

  /** hedgedReadOps metric */
  public Counter getHedgedReadOps() {
    return hedgedReadOps;
  }

  /** hedgedReadWin metric */
  public Counter getHedgedReadWin() {
    return hedgedReadWin;
  }

  /** numActionsPerServerHist metric */
  public Histogram getNumActionsPerServerHist() {
    return numActionsPerServerHist;
  }

  /** rpcCounters metric */
  public ConcurrentMap<String, Counter> getRpcCounters() {
    return rpcCounters;
  }

  /** rpcTimers metric */
  public ConcurrentMap<String, Timer> getRpcTimers() {
    return rpcTimers;
  }

  /** rpcHistograms metric */
  public ConcurrentMap<String, Histogram> getRpcHistograms() {
    return rpcHistograms;
  }

  /** getTracker metric */
  public CallTracker getGetTracker() {
    return getTracker;
  }

  /** scanTracker metric */
  public CallTracker getScanTracker() {
    return scanTracker;
  }

  /** multiTracker metric */
  public CallTracker getMultiTracker() {
    return multiTracker;
  }

  /** appendTracker metric */
  public CallTracker getAppendTracker() {
    return appendTracker;
  }

  /** deleteTracker metric */
  public CallTracker getDeleteTracker() {
    return deleteTracker;
  }

  /** incrementTracker metric */
  public CallTracker getIncrementTracker() {
    return incrementTracker;
  }

  /** putTracker metric */
  public CallTracker getPutTracker() {
    return putTracker;
  }

  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
  public static CallStats newCallStats() {
    // TODO: instance pool to reduce GC?
    return new CallStats();
  }

  /** Increment the number of meta cache hits. */
  public void incrMetaCacheHit() {
    metaCacheHits.inc();
  }

  /** Increment the number of meta cache misses. */
  public void incrMetaCacheMiss() {
    metaCacheMisses.inc();
  }

  public long getMetaCacheMisses() {
    return metaCacheMisses.getCount();
  }

  /** Increment the number of meta cache drops requested for an entire RegionServer. */
  public void incrMetaCacheNumClearServer() {
    metaCacheNumClearServer.inc();
  }

  /** Increment the number of meta cache drops requested for an individual region. */
  public void incrMetaCacheNumClearRegion() {
    metaCacheNumClearRegion.inc();
  }

  /** Increment the number of meta cache drops requested for individual regions. */
  public void incrMetaCacheNumClearRegion(int count) {
    metaCacheNumClearRegion.inc(count);
  }

  /** Increment the number of hedged reads that have occurred. */
  public void incrHedgedReadOps() {
    hedgedReadOps.inc();
  }

  /** Increment the number of hedged reads that returned faster than the original read. */
  public void incrHedgedReadWin() {
    hedgedReadWin.inc();
  }

  /** Increment the number of normal runners. */
  public void incrNormalRunners() {
    this.runnerStats.incrNormalRunners();
  }

  /** Increment the number of delay runners and update the delay runner's delay interval. */
  public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
    this.runnerStats.incrDelayRunners();
    this.runnerStats.updateDelayInterval(interval);
  }

  /** Update the overloaded backoff timer. */
  public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
    overloadedBackoffTimer.update(time, timeUnit);
  }

  /** Return the connection count of the metrics within a scope */
  public long getConnectionCount() {
    return connectionCount.getCount();
  }

  /** Increment the connection count of the metrics within a scope */
  private void incrConnectionCount() {
    connectionCount.inc();
  }

  /** Decrement the connection count of the metrics within a scope */
  private void decrConnectionCount() {
    connectionCount.dec();
  }

  /** Add thread pools of additional connections to the metrics */
  private void addThreadPools(Supplier<ThreadPoolExecutor> batchPool,
    Supplier<ThreadPoolExecutor> metaPool) {
    batchPools.add(batchPool);
    metaPools.add(metaPool);
  }

  /**
   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
   */
  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
    return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
  }

  /** Update call stats for non-critical-path methods */
  private void updateRpcGeneric(String methodName, CallStats stats) {
    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory).update(stats.getCallTimeMs(),
      TimeUnit.MILLISECONDS);
    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getRequestSizeBytes());
    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getResponseSizeBytes());
  }

  private void shutdown() {
    this.reporter.stop();
  }

  /** Report RPC context to metrics system. */
  public void updateRpc(MethodDescriptor method, TableName tableName, Message param,
    CallStats stats, Throwable e) {
    int callsPerServer = stats.getConcurrentCallsPerServer();
    if (callsPerServer > 0) {
      concurrentCallsPerServerHist.update(callsPerServer);
    }
    // Update the counter that tracks RPCs by type.
    StringBuilder methodName = new StringBuilder();
    methodName.append(method.getService().getName()).append("_").append(method.getName());
    // Distinguish mutate types.
    if ("Mutate".equals(method.getName())) {
      final MutationType type = ((MutateRequest) param).getMutation().getMutateType();
      switch (type) {
        case APPEND:
          methodName.append("(Append)");
          break;
        case DELETE:
          methodName.append("(Delete)");
          break;
        case INCREMENT:
          methodName.append("(Increment)");
          break;
        case PUT:
          methodName.append("(Put)");
          break;
        default:
          methodName.append("(Unknown)");
      }
    }
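    // methodName now reads e.g. "ClientService_Get" or "ClientService_Mutate(Put)".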
    getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
    if (e != null) {
      getMetric(FAILURE_CNT_BASE + methodName, rpcCounters, counterFactory).inc();
      getMetric(TOTAL_EXCEPTION_CNT, rpcCounters, counterFactory).inc();
      if (e instanceof RemoteException) {
        String fullClassName = ((RemoteException) e).getClassName();
        String simpleClassName = (fullClassName != null)
          ? fullClassName.substring(fullClassName.lastIndexOf(".") + 1)
          : "unknown";
        getMetric(REMOTE_EXCEPTION_CNT_BASE + simpleClassName, rpcCounters, counterFactory).inc();
      } else {
        getMetric(LOCAL_EXCEPTION_CNT_BASE + e.getClass().getSimpleName(), rpcCounters,
          counterFactory).inc();
      }
    }
    // This implementation is tied directly to protobuf implementation details. It would be better
    // if we could dispatch based on something static, i.e., the request Message type.
    if (method.getService() == ClientService.getDescriptor()) {
      switch (method.getIndex()) {
        case 0:
          assert "Get".equals(method.getName());
          getTracker.updateRpc(stats);
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        case 1:
          assert "Mutate".equals(method.getName());
          final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
          switch (mutationType) {
            case APPEND:
              appendTracker.updateRpc(stats);
              break;
            case DELETE:
              deleteTracker.updateRpc(stats);
              break;
            case INCREMENT:
              incrementTracker.updateRpc(stats);
              break;
            case PUT:
              putTracker.updateRpc(stats);
              break;
            default:
              throw new RuntimeException("Unrecognized mutation type " + mutationType);
          }
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        case 2:
          assert "Scan".equals(method.getName());
          scanTracker.updateRpc(stats);
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        case 3:
          assert "BulkLoadHFile".equals(method.getName());
          // use generic implementation
          break;
        case 4:
          assert "PrepareBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 5:
          assert "CleanupBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 6:
          assert "ExecService".equals(method.getName());
          // use generic implementation
          break;
        case 7:
          assert "ExecRegionServerService".equals(method.getName());
          // use generic implementation
          break;
        case 8:
          assert "Multi".equals(method.getName());
          numActionsPerServerHist.update(stats.getNumActionsPerServer());
          multiTracker.updateRpc(stats);
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        default:
          throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
      }
    }
    // Fallback to dynamic registry lookup for DDL methods.
    updateRpcGeneric(methodName.toString(), stats);
  }

  /** Report table-level RPC context to the metrics system. */
  private void updateTableMetric(String methodName, TableName tableName, CallStats stats,
    Throwable e) {
    if (tableMetricsEnabled) {
      if (methodName != null) {
        String table = tableName != null && StringUtils.isNotEmpty(tableName.getNameAsString())
          ? tableName.getNameAsString()
          : "unknown";
        String metricKey = methodName + "_" + table;
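        // e.g. "ClientService_Get_usertable" (table name is illustrative).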
        // Update table RPC metrics: call duration and request/response sizes (bytes).
        updateRpcGeneric(metricKey, stats);
        if (e != null) {
          // Increment the failed-call counter for this table.
          getMetric(FAILURE_CNT_BASE + metricKey, rpcCounters, counterFactory).inc();
        }
      }
    }
  }

  public void incrCacheDroppingExceptions(Object exception) {
    getMetric(
      CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
      cacheDroppingExceptions, counterFactory).inc();
  }

  public void incrNsLookups() {
    this.nsLookups.inc();
  }

  public void incrNsLookupsFailed() {
    this.nsLookupsFailed.inc();
  }
}