001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static com.codahale.metrics.MetricRegistry.name;
021import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
022
023import com.codahale.metrics.Counter;
024import com.codahale.metrics.Histogram;
025import com.codahale.metrics.JmxReporter;
026import com.codahale.metrics.MetricRegistry;
027import com.codahale.metrics.RatioGauge;
028import com.codahale.metrics.Timer;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentMap;
031import java.util.concurrent.ConcurrentSkipListMap;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034import java.util.function.Supplier;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.yetus.audience.InterfaceAudience;
039
040import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
041import org.apache.hbase.thirdparty.com.google.protobuf.Message;
042
043import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
046
047/**
048 * This class is for maintaining the various connection statistics and publishing them through the
049 * metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
050 * as to not conflict with other uses of Yammer Metrics within the client application. Instantiating
051 * this class implicitly creates and "starts" instances of these classes; be sure to call
052 * {@link #shutdown()} to terminate the thread pools they allocate.
053 */
054@InterfaceAudience.Private
055public class MetricsConnection implements StatisticTrackable {
056
057  /** Set this key to {@code true} to enable metrics collection of client requests. */
058  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
059
060  /**
061   * Set to specify a custom scope for the metrics published through {@link MetricsConnection}. The
062   * scope is added to JMX MBean objectName, and defaults to a combination of the Connection's
063   * clusterId and hashCode. For example, a default value for a connection to cluster "foo" might be
064   * "foo-7d9d0818", where "7d9d0818" is the hashCode of the underlying AsyncConnectionImpl. Users
065   * may set this key to give a more contextual name for this scope. For example, one might want to
066   * differentiate a read connection from a write connection by setting the scopes to "foo-read" and
067   * "foo-write" respectively. Scope is the only thing that lends any uniqueness to the metrics.
068   * Care should be taken to avoid using the same scope for multiple Connections, otherwise the
069   * metrics may aggregate in unforeseen ways.
070   */
071  public static final String METRICS_SCOPE_KEY = "hbase.client.metrics.scope";
072
073  /**
074   * Returns the scope for a MetricsConnection based on the configured {@link #METRICS_SCOPE_KEY} or
075   * by generating a default from the passed clusterId and connectionObj's hashCode.
076   * @param conf          configuration for the connection
077   * @param clusterId     clusterId for the connection
078   * @param connectionObj either a Connection or AsyncConnectionImpl, the instance creating this
079   *                      MetricsConnection.
080   */
081  static String getScope(Configuration conf, String clusterId, Object connectionObj) {
082    return conf.get(METRICS_SCOPE_KEY,
083      clusterId + "@" + Integer.toHexString(connectionObj.hashCode()));
084  }
085
086  private static final String CNT_BASE = "rpcCount_";
087  private static final String DRTN_BASE = "rpcCallDurationMs_";
088  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
089  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
090  private static final String MEMLOAD_BASE = "memstoreLoad_";
091  private static final String HEAP_BASE = "heapOccupancy_";
092  private static final String CACHE_BASE = "cacheDroppingExceptions_";
093  private static final String UNKNOWN_EXCEPTION = "UnknownException";
094  private static final String NS_LOOKUPS = "nsLookups";
095  private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
096  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
097
098  /** A container class for collecting details about the RPC call as it percolates. */
099  public static class CallStats {
100    private long requestSizeBytes = 0;
101    private long responseSizeBytes = 0;
102    private long startTime = 0;
103    private long callTimeMs = 0;
104    private int concurrentCallsPerServer = 0;
105    private int numActionsPerServer = 0;
106
107    public long getRequestSizeBytes() {
108      return requestSizeBytes;
109    }
110
111    public void setRequestSizeBytes(long requestSizeBytes) {
112      this.requestSizeBytes = requestSizeBytes;
113    }
114
115    public long getResponseSizeBytes() {
116      return responseSizeBytes;
117    }
118
119    public void setResponseSizeBytes(long responseSizeBytes) {
120      this.responseSizeBytes = responseSizeBytes;
121    }
122
123    public long getStartTime() {
124      return startTime;
125    }
126
127    public void setStartTime(long startTime) {
128      this.startTime = startTime;
129    }
130
131    public long getCallTimeMs() {
132      return callTimeMs;
133    }
134
135    public void setCallTimeMs(long callTimeMs) {
136      this.callTimeMs = callTimeMs;
137    }
138
139    public int getConcurrentCallsPerServer() {
140      return concurrentCallsPerServer;
141    }
142
143    public void setConcurrentCallsPerServer(int callsPerServer) {
144      this.concurrentCallsPerServer = callsPerServer;
145    }
146
147    public int getNumActionsPerServer() {
148      return numActionsPerServer;
149    }
150
151    public void setNumActionsPerServer(int numActionsPerServer) {
152      this.numActionsPerServer = numActionsPerServer;
153    }
154  }
155
156  protected static final class CallTracker {
157    private final String name;
158    final Timer callTimer;
159    final Histogram reqHist;
160    final Histogram respHist;
161
162    private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
163      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
164      if (subName != null) {
165        sb.append("(").append(subName).append(")");
166      }
167      this.name = sb.toString();
168      this.callTimer = registry.timer(name(MetricsConnection.class, DRTN_BASE + this.name, scope));
169      this.reqHist = registry.histogram(name(MetricsConnection.class, REQ_BASE + this.name, scope));
170      this.respHist =
171        registry.histogram(name(MetricsConnection.class, RESP_BASE + this.name, scope));
172    }
173
174    private CallTracker(MetricRegistry registry, String name, String scope) {
175      this(registry, name, null, scope);
176    }
177
178    public void updateRpc(CallStats stats) {
179      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
180      this.reqHist.update(stats.getRequestSizeBytes());
181      this.respHist.update(stats.getResponseSizeBytes());
182    }
183
184    @Override
185    public String toString() {
186      return "CallTracker:" + name;
187    }
188  }
189
190  protected static class RegionStats {
191    final String name;
192    final Histogram memstoreLoadHist;
193    final Histogram heapOccupancyHist;
194
195    public RegionStats(MetricRegistry registry, String name) {
196      this.name = name;
197      this.memstoreLoadHist =
198        registry.histogram(name(MetricsConnection.class, MEMLOAD_BASE + this.name));
199      this.heapOccupancyHist =
200        registry.histogram(name(MetricsConnection.class, HEAP_BASE + this.name));
201    }
202
203    public void update(RegionLoadStats regionStatistics) {
204      this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
205      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
206    }
207  }
208
209  protected static class RunnerStats {
210    final Counter normalRunners;
211    final Counter delayRunners;
212    final Histogram delayIntevalHist;
213
214    public RunnerStats(MetricRegistry registry) {
215      this.normalRunners = registry.counter(name(MetricsConnection.class, "normalRunnersCount"));
216      this.delayRunners = registry.counter(name(MetricsConnection.class, "delayRunnersCount"));
217      this.delayIntevalHist =
218        registry.histogram(name(MetricsConnection.class, "delayIntervalHist"));
219    }
220
221    public void incrNormalRunners() {
222      this.normalRunners.inc();
223    }
224
225    public void incrDelayRunners() {
226      this.delayRunners.inc();
227    }
228
229    public void updateDelayInterval(long interval) {
230      this.delayIntevalHist.update(interval);
231    }
232  }
233
234  protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
235    new ConcurrentHashMap<>();
236
237  public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
238    if (!(r instanceof Result)) {
239      return;
240    }
241    Result result = (Result) r;
242    RegionLoadStats stats = result.getStats();
243    if (stats == null) {
244      return;
245    }
246    updateRegionStats(serverName, regionName, stats);
247  }
248
249  @Override
250  public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
251    String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
252    ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
253      () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
254    RegionStats regionStats =
255      computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name));
256    regionStats.update(stats);
257  }
258
259  /** A lambda for dispatching to the appropriate metric factory method */
260  private static interface NewMetric<T> {
261    T newMetric(Class<?> clazz, String name, String scope);
262  }
263
264  /** Anticipated number of metric entries */
265  private static final int CAPACITY = 50;
266  /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
267  private static final float LOAD_FACTOR = 0.75f;
268  /**
269   * Anticipated number of concurrent accessor threads
270   */
271  private static final int CONCURRENCY_LEVEL = 256;
272
273  private final MetricRegistry registry;
274  private final JmxReporter reporter;
275  protected final String scope;
276
277  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
278    @Override
279    public Timer newMetric(Class<?> clazz, String name, String scope) {
280      return registry.timer(name(clazz, name, scope));
281    }
282  };
283
284  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
285    @Override
286    public Histogram newMetric(Class<?> clazz, String name, String scope) {
287      return registry.histogram(name(clazz, name, scope));
288    }
289  };
290
291  private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
292    @Override
293    public Counter newMetric(Class<?> clazz, String name, String scope) {
294      return registry.counter(name(clazz, name, scope));
295    }
296  };
297
298  // static metrics
299
300  protected final Counter metaCacheHits;
301  protected final Counter metaCacheMisses;
302  protected final CallTracker getTracker;
303  protected final CallTracker scanTracker;
304  protected final CallTracker appendTracker;
305  protected final CallTracker deleteTracker;
306  protected final CallTracker incrementTracker;
307  protected final CallTracker putTracker;
308  protected final CallTracker multiTracker;
309  protected final RunnerStats runnerStats;
310  protected final Counter metaCacheNumClearServer;
311  protected final Counter metaCacheNumClearRegion;
312  protected final Counter hedgedReadOps;
313  protected final Counter hedgedReadWin;
314  protected final Histogram concurrentCallsPerServerHist;
315  protected final Histogram numActionsPerServerHist;
316  protected final Counter nsLookups;
317  protected final Counter nsLookupsFailed;
318
319  // dynamic metrics
320
321  // These maps are used to cache references to the metric instances that are managed by the
322  // registry. I don't think their use perfectly removes redundant allocations, but it's
323  // a big improvement over calling registry.newMetric each time.
324  protected final ConcurrentMap<String, Timer> rpcTimers =
325    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
326  protected final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
327    CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL);
328  private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
329    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
330  protected final ConcurrentMap<String, Counter> rpcCounters =
331    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
332
333  MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
334    Supplier<ThreadPoolExecutor> metaPool) {
335    this.scope = scope;
336    this.registry = new MetricRegistry();
337    this.registry.register(getExecutorPoolName(), new RatioGauge() {
338      @Override
339      protected Ratio getRatio() {
340        ThreadPoolExecutor pool = batchPool.get();
341        if (pool == null) {
342          return Ratio.of(0, 0);
343        }
344        return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
345      }
346    });
347    this.registry.register(getMetaPoolName(), new RatioGauge() {
348      @Override
349      protected Ratio getRatio() {
350        ThreadPoolExecutor pool = metaPool.get();
351        if (pool == null) {
352          return Ratio.of(0, 0);
353        }
354        return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
355      }
356    });
357    this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
358    this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
359    this.metaCacheNumClearServer =
360      registry.counter(name(this.getClass(), "metaCacheNumClearServer", scope));
361    this.metaCacheNumClearRegion =
362      registry.counter(name(this.getClass(), "metaCacheNumClearRegion", scope));
363    this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
364    this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
365    this.getTracker = new CallTracker(this.registry, "Get", scope);
366    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
367    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
368    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
369    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
370    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
371    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
372    this.runnerStats = new RunnerStats(this.registry);
373    this.concurrentCallsPerServerHist =
374      registry.histogram(name(MetricsConnection.class, "concurrentCallsPerServer", scope));
375    this.numActionsPerServerHist =
376      registry.histogram(name(MetricsConnection.class, "numActionsPerServer", scope));
377    this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
378    this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));
379
380    this.reporter = JmxReporter.forRegistry(this.registry).build();
381    this.reporter.start();
382  }
383
384  final String getExecutorPoolName() {
385    return name(getClass(), "executorPoolActiveThreads", scope);
386  }
387
388  final String getMetaPoolName() {
389    return name(getClass(), "metaPoolActiveThreads", scope);
390  }
391
392  MetricRegistry getMetricRegistry() {
393    return registry;
394  }
395
396  public void shutdown() {
397    this.reporter.stop();
398  }
399
400  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
401  public static CallStats newCallStats() {
402    // TODO: instance pool to reduce GC?
403    return new CallStats();
404  }
405
406  /** Increment the number of meta cache hits. */
407  public void incrMetaCacheHit() {
408    metaCacheHits.inc();
409  }
410
411  /** Increment the number of meta cache misses. */
412  public void incrMetaCacheMiss() {
413    metaCacheMisses.inc();
414  }
415
416  /** Increment the number of meta cache drops requested for entire RegionServer. */
417  public void incrMetaCacheNumClearServer() {
418    metaCacheNumClearServer.inc();
419  }
420
421  /** Increment the number of meta cache drops requested for individual region. */
422  public void incrMetaCacheNumClearRegion() {
423    metaCacheNumClearRegion.inc();
424  }
425
426  /** Increment the number of meta cache drops requested for individual region. */
427  public void incrMetaCacheNumClearRegion(int count) {
428    metaCacheNumClearRegion.inc(count);
429  }
430
431  /** Increment the number of hedged read that have occurred. */
432  public void incrHedgedReadOps() {
433    hedgedReadOps.inc();
434  }
435
436  /** Increment the number of hedged read returned faster than the original read. */
437  public void incrHedgedReadWin() {
438    hedgedReadWin.inc();
439  }
440
441  /** Increment the number of normal runner counts. */
442  public void incrNormalRunners() {
443    this.runnerStats.incrNormalRunners();
444  }
445
446  /** Increment the number of delay runner counts and update delay interval of delay runner. */
447  public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
448    this.runnerStats.incrDelayRunners();
449    this.runnerStats.updateDelayInterval(interval);
450  }
451
452  /**
453   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
454   */
455  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
456    return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
457  }
458
459  /** Update call stats for non-critical-path methods */
460  private void updateRpcGeneric(String methodName, CallStats stats) {
461    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory).update(stats.getCallTimeMs(),
462      TimeUnit.MILLISECONDS);
463    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
464      .update(stats.getRequestSizeBytes());
465    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
466      .update(stats.getResponseSizeBytes());
467  }
468
469  /** Report RPC context to metrics system. */
470  public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
471    int callsPerServer = stats.getConcurrentCallsPerServer();
472    if (callsPerServer > 0) {
473      concurrentCallsPerServerHist.update(callsPerServer);
474    }
475    // Update the counter that tracks RPCs by type.
476    final String methodName = method.getService().getName() + "_" + method.getName();
477    getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
478    // this implementation is tied directly to protobuf implementation details. would be better
479    // if we could dispatch based on something static, ie, request Message type.
480    if (method.getService() == ClientService.getDescriptor()) {
481      switch (method.getIndex()) {
482        case 0:
483          assert "Get".equals(method.getName());
484          getTracker.updateRpc(stats);
485          return;
486        case 1:
487          assert "Mutate".equals(method.getName());
488          final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
489          switch (mutationType) {
490            case APPEND:
491              appendTracker.updateRpc(stats);
492              return;
493            case DELETE:
494              deleteTracker.updateRpc(stats);
495              return;
496            case INCREMENT:
497              incrementTracker.updateRpc(stats);
498              return;
499            case PUT:
500              putTracker.updateRpc(stats);
501              return;
502            default:
503              throw new RuntimeException("Unrecognized mutation type " + mutationType);
504          }
505        case 2:
506          assert "Scan".equals(method.getName());
507          scanTracker.updateRpc(stats);
508          return;
509        case 3:
510          assert "BulkLoadHFile".equals(method.getName());
511          // use generic implementation
512          break;
513        case 4:
514          assert "PrepareBulkLoad".equals(method.getName());
515          // use generic implementation
516          break;
517        case 5:
518          assert "CleanupBulkLoad".equals(method.getName());
519          // use generic implementation
520          break;
521        case 6:
522          assert "ExecService".equals(method.getName());
523          // use generic implementation
524          break;
525        case 7:
526          assert "ExecRegionServerService".equals(method.getName());
527          // use generic implementation
528          break;
529        case 8:
530          assert "Multi".equals(method.getName());
531          numActionsPerServerHist.update(stats.getNumActionsPerServer());
532          multiTracker.updateRpc(stats);
533          return;
534        default:
535          throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
536      }
537    }
538    // Fallback to dynamic registry lookup for DDL methods.
539    updateRpcGeneric(methodName, stats);
540  }
541
542  public void incrCacheDroppingExceptions(Object exception) {
543    getMetric(
544      CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
545      cacheDroppingExceptions, counterFactory).inc();
546  }
547
548  public void incrNsLookups() {
549    this.nsLookups.inc();
550  }
551
552  public void incrNsLookupsFailed() {
553    this.nsLookupsFailed.inc();
554  }
555}