/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static com.codahale.metrics.MetricRegistry.name;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * This class is for maintaining the various connection statistics and publishing them through the
 * metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
 * as not to conflict with other uses of Yammer Metrics within the client application. Calling
 * {@link #getMetricsConnection(String, Supplier, Supplier)} implicitly creates and "starts"
 * instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to terminate
 * the thread pools they allocate. The metrics reporter is shut down via {@link #shutdown()} once
 * all connections within this metrics instance are closed.
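 * <p>
 * A minimal lifecycle sketch (hedged; {@code scope}, {@code batchPool}, and {@code metaPool} are
 * illustrative placeholders supplied by the caller):
 *
 * <pre>{@code
 * MetricsConnection metrics =
 *   MetricsConnection.getMetricsConnection(scope, () -> batchPool, () -> metaPool);
 * try {
 *   metrics.incrMetaCacheHit(); // record activity while the connection is in use
 * } finally {
 *   MetricsConnection.deleteMetricsConnection(scope); // last caller shuts down the reporter
 * }
 * }</pre>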
 */
@InterfaceAudience.Private
public final class MetricsConnection implements StatisticTrackable {

  private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES =
    new ConcurrentHashMap<>();

  static MetricsConnection getMetricsConnection(final String scope,
    Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
    return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> {
      if (metricsConnection == null) {
        MetricsConnection newMetricsConn = new MetricsConnection(scope, batchPool, metaPool);
        newMetricsConn.incrConnectionCount();
        return newMetricsConn;
      } else {
        metricsConnection.addThreadPools(batchPool, metaPool);
        metricsConnection.incrConnectionCount();
        return metricsConnection;
      }
    });
  }

  static void deleteMetricsConnection(final String scope) {
    METRICS_INSTANCES.computeIfPresent(scope, (s, metricsConnection) -> {
      metricsConnection.decrConnectionCount();
      if (metricsConnection.getConnectionCount() == 0) {
        metricsConnection.shutdown();
        return null;
      }
      return metricsConnection;
    });
  }

  /** Set this key to {@code true} to enable metrics collection of client requests. */
  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

  /**
   * Set to specify a custom scope for the metrics published through {@link MetricsConnection}. The
   * scope is added to the JMX MBean objectName, and defaults to a combination of the Connection's
   * clusterId and hashCode. For example, a default value for a connection to cluster "foo" might be
   * "foo-7d9d0818", where "7d9d0818" is the hashCode of the underlying AsyncConnectionImpl. Users
   * may set this key to give a more contextual name for this scope. For example, one might want to
   * differentiate a read connection from a write connection by setting the scopes to "foo-read" and
   * "foo-write" respectively. Scope is the only thing that lends any uniqueness to the metrics.
   * Care should be taken to avoid using the same scope for multiple Connections, otherwise the
   * metrics may aggregate in unforeseen ways.
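   * <p>
   * A hedged configuration sketch (the scope value "foo-read" is illustrative):
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
   * conf.set(MetricsConnection.METRICS_SCOPE_KEY, "foo-read");
   * }</pre>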
   */
  public static final String METRICS_SCOPE_KEY = "hbase.client.metrics.scope";

  /**
   * Returns the scope for a MetricsConnection based on the configured {@link #METRICS_SCOPE_KEY} or
   * by generating a default from the passed clusterId and connectionObj's hashCode.
   * @param conf          configuration for the connection
   * @param clusterId     clusterId for the connection
   * @param connectionObj either a Connection or AsyncConnectionImpl, the instance creating this
   *                      MetricsConnection.
   */
  static String getScope(Configuration conf, String clusterId, Object connectionObj) {
    return conf.get(METRICS_SCOPE_KEY,
      clusterId + "@" + Integer.toHexString(connectionObj.hashCode()));
  }

  private static final String CNT_BASE = "rpcCount_";
  private static final String FAILURE_CNT_BASE = "rpcFailureCount_";
  private static final String TOTAL_EXCEPTION_CNT = "rpcTotalExceptions";
  private static final String LOCAL_EXCEPTION_CNT_BASE = "rpcLocalExceptions_";
  private static final String REMOTE_EXCEPTION_CNT_BASE = "rpcRemoteExceptions_";
  private static final String DRTN_BASE = "rpcCallDurationMs_";
  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
  private static final String MEMLOAD_BASE = "memstoreLoad_";
  private static final String HEAP_BASE = "heapOccupancy_";
  private static final String CACHE_BASE = "cacheDroppingExceptions_";
  private static final String UNKNOWN_EXCEPTION = "UnknownException";
  private static final String NS_LOOKUPS = "nsLookups";
  private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();

  /**
   * A container class for collecting details about the RPC call as it percolates.
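   * <p>
   * A minimal usage sketch (hedged; the timing arithmetic and the {@code requestSize} variable
   * are illustrative, not part of this class's API):
   *
   * <pre>{@code
   * MetricsConnection.CallStats stats = MetricsConnection.newCallStats();
   * stats.setStartTime(System.currentTimeMillis());
   * // ... issue the RPC ...
   * stats.setCallTimeMs(System.currentTimeMillis() - stats.getStartTime());
   * stats.setRequestSizeBytes(requestSize);
   * }</pre>
   */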
  public static class CallStats {
    private long requestSizeBytes = 0;
    private long responseSizeBytes = 0;
    private long startTime = 0;
    private long callTimeMs = 0;
    private int concurrentCallsPerServer = 0;
    private int numActionsPerServer = 0;

    public long getRequestSizeBytes() {
      return requestSizeBytes;
    }

    public void setRequestSizeBytes(long requestSizeBytes) {
      this.requestSizeBytes = requestSizeBytes;
    }

    public long getResponseSizeBytes() {
      return responseSizeBytes;
    }

    public void setResponseSizeBytes(long responseSizeBytes) {
      this.responseSizeBytes = responseSizeBytes;
    }

    public long getStartTime() {
      return startTime;
    }

    public void setStartTime(long startTime) {
      this.startTime = startTime;
    }

    public long getCallTimeMs() {
      return callTimeMs;
    }

    public void setCallTimeMs(long callTimeMs) {
      this.callTimeMs = callTimeMs;
    }

    public int getConcurrentCallsPerServer() {
      return concurrentCallsPerServer;
    }

    public void setConcurrentCallsPerServer(int callsPerServer) {
      this.concurrentCallsPerServer = callsPerServer;
    }

    public int getNumActionsPerServer() {
      return numActionsPerServer;
    }

    public void setNumActionsPerServer(int numActionsPerServer) {
      this.numActionsPerServer = numActionsPerServer;
    }
  }

  protected static final class CallTracker {
    private final String name;
    final Timer callTimer;
    final Histogram reqHist;
    final Histogram respHist;

    private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
      if (subName != null) {
        sb.append("(").append(subName).append(")");
      }
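      // e.g. "ClientService_Get" or "ClientService_Mutate(Append)"; the metrics registered below
      // are then named "rpcCallDurationMs_<name>", "rpcCallRequestSizeBytes_<name>", etc.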
      this.name = sb.toString();
      this.callTimer = registry.timer(name(MetricsConnection.class, DRTN_BASE + this.name, scope));
      this.reqHist = registry.histogram(name(MetricsConnection.class, REQ_BASE + this.name, scope));
      this.respHist =
        registry.histogram(name(MetricsConnection.class, RESP_BASE + this.name, scope));
    }

    private CallTracker(MetricRegistry registry, String name, String scope) {
      this(registry, name, null, scope);
    }

    public void updateRpc(CallStats stats) {
      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
      this.reqHist.update(stats.getRequestSizeBytes());
      this.respHist.update(stats.getResponseSizeBytes());
    }

    @Override
    public String toString() {
      return "CallTracker:" + name;
    }
  }

  protected static class RegionStats {
    final String name;
    final Histogram memstoreLoadHist;
    final Histogram heapOccupancyHist;

    public RegionStats(MetricRegistry registry, String name) {
      this.name = name;
      this.memstoreLoadHist =
        registry.histogram(name(MetricsConnection.class, MEMLOAD_BASE + this.name));
      this.heapOccupancyHist =
        registry.histogram(name(MetricsConnection.class, HEAP_BASE + this.name));
    }

    public void update(RegionLoadStats regionStatistics) {
      this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
    }
  }

  protected static class RunnerStats {
    final Counter normalRunners;
    final Counter delayRunners;
    final Histogram delayIntervalHist;

    public RunnerStats(MetricRegistry registry) {
      this.normalRunners = registry.counter(name(MetricsConnection.class, "normalRunnersCount"));
      this.delayRunners = registry.counter(name(MetricsConnection.class, "delayRunnersCount"));
      this.delayIntervalHist =
        registry.histogram(name(MetricsConnection.class, "delayIntervalHist"));
    }

    public void incrNormalRunners() {
      this.normalRunners.inc();
    }

    public void incrDelayRunners() {
      this.delayRunners.inc();
    }

    public void updateDelayInterval(long interval) {
      this.delayIntervalHist.update(interval);
    }
  }

  private final ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
    new ConcurrentHashMap<>();

  public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
    if (!(r instanceof Result)) {
      return;
    }
    Result result = (Result) r;
    RegionLoadStats stats = result.getStats();
    if (stats == null) {
      return;
    }
    updateRegionStats(serverName, regionName, stats);
  }

  @Override
  public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
    String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
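    // byte[] keys lack value-based equals/hashCode, so the per-region map is a
    // ConcurrentSkipListMap ordered by BYTES_COMPARATOR rather than a hash map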
    ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
      () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
    RegionStats regionStats =
      computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name));
    regionStats.update(stats);
  }

  /** A functional interface for dispatching to the appropriate metric factory method. */
  private interface NewMetric<T> {
    T newMetric(Class<?> clazz, String name, String scope);
  }

  /** Anticipated number of metric entries */
  private static final int CAPACITY = 50;
  /** Default load factor, matching {@link java.util.HashMap}'s default of 0.75 */
  private static final float LOAD_FACTOR = 0.75f;
  /** Anticipated number of concurrent accessor threads */
  private static final int CONCURRENCY_LEVEL = 256;

  private final MetricRegistry registry;
  private final JmxReporter reporter;
  private final String scope;

  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
    @Override
    public Timer newMetric(Class<?> clazz, String name, String scope) {
      return registry.timer(name(clazz, name, scope));
    }
  };

  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
    @Override
    public Histogram newMetric(Class<?> clazz, String name, String scope) {
      return registry.histogram(name(clazz, name, scope));
    }
  };

  private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
    @Override
    public Counter newMetric(Class<?> clazz, String name, String scope) {
      return registry.counter(name(clazz, name, scope));
    }
  };

  // Lists of thread pools, one entry per connection sharing this metrics instance.
  private final List<Supplier<ThreadPoolExecutor>> batchPools = new ArrayList<>();
  private final List<Supplier<ThreadPoolExecutor>> metaPools = new ArrayList<>();

  // static metrics

  private final Counter connectionCount;
  private final Counter metaCacheHits;
  private final Counter metaCacheMisses;
  private final CallTracker getTracker;
  private final CallTracker scanTracker;
  private final CallTracker appendTracker;
  private final CallTracker deleteTracker;
  private final CallTracker incrementTracker;
  private final CallTracker putTracker;
  private final CallTracker multiTracker;
  private final RunnerStats runnerStats;
  private final Counter metaCacheNumClearServer;
  private final Counter metaCacheNumClearRegion;
  private final Counter hedgedReadOps;
  private final Counter hedgedReadWin;
  private final Histogram concurrentCallsPerServerHist;
  private final Histogram numActionsPerServerHist;
  private final Counter nsLookups;
  private final Counter nsLookupsFailed;
  private final Timer overloadedBackoffTimer;

  // dynamic metrics

  // These maps are used to cache references to the metric instances that are managed by the
  // registry. I don't think their use perfectly removes redundant allocations, but it's
  // a big improvement over calling registry.newMetric each time.
  private final ConcurrentMap<String, Timer> rpcTimers =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
    CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> rpcCounters =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

  private MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
    Supplier<ThreadPoolExecutor> metaPool) {
    this.scope = scope;
    addThreadPools(batchPool, metaPool);
    this.registry = new MetricRegistry();
    this.registry.register(getExecutorPoolName(), new RatioGauge() {
      @Override
      protected Ratio getRatio() {
        int numerator = 0;
        int denominator = 0;
        for (Supplier<ThreadPoolExecutor> poolSupplier : batchPools) {
          ThreadPoolExecutor pool = poolSupplier.get();
          if (pool != null) {
            int activeCount = pool.getActiveCount();
            int maxPoolSize = pool.getMaximumPoolSize();
            /* The max thread usage ratio among batch pools of all connections */
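            // compare activeCount/maxPoolSize against numerator/denominator by
            // cross-multiplication, avoiding floating-point division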
            if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
              numerator = activeCount;
              denominator = maxPoolSize;
            }
          }
        }
        return Ratio.of(numerator, denominator);
      }
    });
    this.registry.register(getMetaPoolName(), new RatioGauge() {
      @Override
      protected Ratio getRatio() {
        int numerator = 0;
        int denominator = 0;
        for (Supplier<ThreadPoolExecutor> poolSupplier : metaPools) {
          ThreadPoolExecutor pool = poolSupplier.get();
          if (pool != null) {
            int activeCount = pool.getActiveCount();
            int maxPoolSize = pool.getMaximumPoolSize();
            /* The max thread usage ratio among meta lookup pools of all connections */
            if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
              numerator = activeCount;
              denominator = maxPoolSize;
            }
          }
        }
        return Ratio.of(numerator, denominator);
      }
    });
    this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope));
    this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
    this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
    this.metaCacheNumClearServer =
      registry.counter(name(this.getClass(), "metaCacheNumClearServer", scope));
    this.metaCacheNumClearRegion =
      registry.counter(name(this.getClass(), "metaCacheNumClearRegion", scope));
    this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
    this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
    this.getTracker = new CallTracker(this.registry, "Get", scope);
    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
    this.runnerStats = new RunnerStats(this.registry);
    this.concurrentCallsPerServerHist =
      registry.histogram(name(MetricsConnection.class, "concurrentCallsPerServer", scope));
    this.numActionsPerServerHist =
      registry.histogram(name(MetricsConnection.class, "numActionsPerServer", scope));
    this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
    this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));
    this.overloadedBackoffTimer =
      registry.timer(name(this.getClass(), "overloadedBackoffDurationMs", scope));

    this.reporter = JmxReporter.forRegistry(this.registry).build();
    this.reporter.start();
  }

  final String getExecutorPoolName() {
    return name(getClass(), "executorPoolActiveThreads", scope);
  }

  final String getMetaPoolName() {
    return name(getClass(), "metaPoolActiveThreads", scope);
  }

  MetricRegistry getMetricRegistry() {
    return registry;
  }

  /** scope of the metrics object */
  public String getMetricScope() {
    return scope;
  }

  /** serverStats metric */
  public ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> getServerStats() {
    return serverStats;
  }

  /** runnerStats metric */
  public RunnerStats getRunnerStats() {
    return runnerStats;
  }

  /** metaCacheNumClearServer metric */
  public Counter getMetaCacheNumClearServer() {
    return metaCacheNumClearServer;
  }

  /** metaCacheNumClearRegion metric */
  public Counter getMetaCacheNumClearRegion() {
    return metaCacheNumClearRegion;
  }

  /** hedgedReadOps metric */
  public Counter getHedgedReadOps() {
    return hedgedReadOps;
  }

  /** hedgedReadWin metric */
  public Counter getHedgedReadWin() {
    return hedgedReadWin;
  }

  /** numActionsPerServerHist metric */
  public Histogram getNumActionsPerServerHist() {
    return numActionsPerServerHist;
  }

  /** rpcCounters metric */
  public ConcurrentMap<String, Counter> getRpcCounters() {
    return rpcCounters;
  }

  /** getTracker metric */
  public CallTracker getGetTracker() {
    return getTracker;
  }

  /** scanTracker metric */
  public CallTracker getScanTracker() {
    return scanTracker;
  }

  /** multiTracker metric */
  public CallTracker getMultiTracker() {
    return multiTracker;
  }

  /** appendTracker metric */
  public CallTracker getAppendTracker() {
    return appendTracker;
  }

  /** deleteTracker metric */
  public CallTracker getDeleteTracker() {
    return deleteTracker;
  }

  /** incrementTracker metric */
  public CallTracker getIncrementTracker() {
    return incrementTracker;
  }

  /** putTracker metric */
  public CallTracker getPutTracker() {
    return putTracker;
  }

  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
  public static CallStats newCallStats() {
    // TODO: instance pool to reduce GC?
    return new CallStats();
  }

  /** Increment the number of meta cache hits. */
  public void incrMetaCacheHit() {
    metaCacheHits.inc();
  }

  /** Increment the number of meta cache misses. */
  public void incrMetaCacheMiss() {
    metaCacheMisses.inc();
  }

  public long getMetaCacheMisses() {
    return metaCacheMisses.getCount();
  }

  /** Increment the number of meta cache drops requested for an entire RegionServer. */
  public void incrMetaCacheNumClearServer() {
    metaCacheNumClearServer.inc();
  }

  /** Increment the number of meta cache drops requested for an individual region. */
  public void incrMetaCacheNumClearRegion() {
    metaCacheNumClearRegion.inc();
  }

  /** Increment the number of meta cache drops requested for individual regions. */
  public void incrMetaCacheNumClearRegion(int count) {
    metaCacheNumClearRegion.inc(count);
  }

  /** Increment the number of hedged reads that have occurred. */
  public void incrHedgedReadOps() {
    hedgedReadOps.inc();
  }

  /** Increment the number of hedged reads that returned faster than the original read. */
  public void incrHedgedReadWin() {
    hedgedReadWin.inc();
  }

  /** Increment the count of normal runners. */
  public void incrNormalRunners() {
    this.runnerStats.incrNormalRunners();
  }

  /** Increment the count of delay runners and update the delay interval. */
  public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
    this.runnerStats.incrDelayRunners();
    this.runnerStats.updateDelayInterval(interval);
  }

  /** Update the overloaded backoff timer. */
  public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
    overloadedBackoffTimer.update(time, timeUnit);
  }

  /** Return the connection count of the metrics within a scope */
  public long getConnectionCount() {
    return connectionCount.getCount();
  }

  /** Increment the connection count of the metrics within a scope */
  private void incrConnectionCount() {
    connectionCount.inc();
  }

  /** Decrement the connection count of the metrics within a scope */
  private void decrConnectionCount() {
    connectionCount.dec();
  }

  /** Add thread pools of additional connections to the metrics */
  private void addThreadPools(Supplier<ThreadPoolExecutor> batchPool,
    Supplier<ThreadPoolExecutor> metaPool) {
    batchPools.add(batchPool);
    metaPools.add(metaPool);
  }

  /**
   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
   */
  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
    return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
  }

  /** Update call stats for non-critical-path methods */
  private void updateRpcGeneric(String methodName, CallStats stats) {
    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory).update(stats.getCallTimeMs(),
      TimeUnit.MILLISECONDS);
    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getRequestSizeBytes());
    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getResponseSizeBytes());
  }

  private void shutdown() {
    this.reporter.stop();
  }

  /** Report RPC context to metrics system. */
  public void updateRpc(MethodDescriptor method, Message param, CallStats stats, Throwable e) {
    int callsPerServer = stats.getConcurrentCallsPerServer();
    if (callsPerServer > 0) {
      concurrentCallsPerServerHist.update(callsPerServer);
    }
    // Update the counter that tracks RPCs by type.
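    // e.g. the "Get" method on ClientService yields methodName "ClientService_Get" and bumps
    // the counter keyed "rpcCount_ClientService_Get".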
    final String methodName = method.getService().getName() + "_" + method.getName();
    getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
    if (e != null) {
      getMetric(FAILURE_CNT_BASE + methodName, rpcCounters, counterFactory).inc();
      getMetric(TOTAL_EXCEPTION_CNT, rpcCounters, counterFactory).inc();
      if (e instanceof RemoteException) {
        String fullClassName = ((RemoteException) e).getClassName();
        String simpleClassName = (fullClassName != null)
          ? fullClassName.substring(fullClassName.lastIndexOf(".") + 1)
          : "unknown";
        getMetric(REMOTE_EXCEPTION_CNT_BASE + simpleClassName, rpcCounters, counterFactory).inc();
      } else {
        getMetric(LOCAL_EXCEPTION_CNT_BASE + e.getClass().getSimpleName(), rpcCounters,
          counterFactory).inc();
      }
    }
    // This implementation is tied directly to protobuf implementation details. It would be better
    // if we could dispatch based on something static, i.e., the request Message type.
    if (method.getService() == ClientService.getDescriptor()) {
      switch (method.getIndex()) {
        case 0:
          assert "Get".equals(method.getName());
          getTracker.updateRpc(stats);
          return;
        case 1:
          assert "Mutate".equals(method.getName());
          final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
          switch (mutationType) {
            case APPEND:
              appendTracker.updateRpc(stats);
              return;
            case DELETE:
              deleteTracker.updateRpc(stats);
              return;
            case INCREMENT:
              incrementTracker.updateRpc(stats);
              return;
            case PUT:
              putTracker.updateRpc(stats);
              return;
            default:
              throw new RuntimeException("Unrecognized mutation type " + mutationType);
          }
        case 2:
          assert "Scan".equals(method.getName());
          scanTracker.updateRpc(stats);
          return;
        case 3:
          assert "BulkLoadHFile".equals(method.getName());
          // use generic implementation
          break;
        case 4:
          assert "PrepareBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 5:
          assert "CleanupBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 6:
          assert "ExecService".equals(method.getName());
          // use generic implementation
          break;
        case 7:
          assert "ExecRegionServerService".equals(method.getName());
          // use generic implementation
          break;
        case 8:
          assert "Multi".equals(method.getName());
          numActionsPerServerHist.update(stats.getNumActionsPerServer());
          multiTracker.updateRpc(stats);
          return;
        default:
          throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
      }
    }
    // Fallback to dynamic registry lookup for DDL methods.
    updateRpcGeneric(methodName, stats);
  }

  public void incrCacheDroppingExceptions(Object exception) {
    getMetric(
      CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
      cacheDroppingExceptions, counterFactory).inc();
  }

  public void incrNsLookups() {
    this.nsLookups.inc();
  }

  public void incrNsLookupsFailed() {
    this.nsLookupsFailed.inc();
  }
}