001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static com.codahale.metrics.MetricRegistry.name;
021import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
022
023import com.codahale.metrics.Counter;
024import com.codahale.metrics.Histogram;
025import com.codahale.metrics.JmxReporter;
026import com.codahale.metrics.MetricRegistry;
027import com.codahale.metrics.RatioGauge;
028import com.codahale.metrics.Timer;
029
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.ConcurrentMap;
032import java.util.concurrent.ConcurrentSkipListMap;
033import java.util.concurrent.ThreadPoolExecutor;
034import java.util.concurrent.TimeUnit;
035import java.util.function.Supplier;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
039import org.apache.hbase.thirdparty.com.google.protobuf.Message;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
043import org.apache.hadoop.hbase.util.Bytes;
044
045/**
046 * This class is for maintaining the various connection statistics and publishing them through
047 * the metrics interfaces.
048 *
049 * This class manages its own {@link MetricRegistry} and {@link JmxReporter} so as to not
050 * conflict with other uses of Yammer Metrics within the client application. Instantiating
051 * this class implicitly creates and "starts" instances of these classes; be sure to call
052 * {@link #shutdown()} to terminate the thread pools they allocate.
053 */
054@InterfaceAudience.Private
055public class MetricsConnection implements StatisticTrackable {
056
  /** Set this key to {@code true} to enable metrics collection of client requests. */
  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

  // Metric-name prefixes and names. The *_BASE values are concatenated with a method, region or
  // exception name to build the name under which the metric is registered.

  /** Prefix of the per-method RPC counters. */
  private static final String CNT_BASE = "rpcCount_";
  /** Prefix of the per-method call-duration timers. */
  private static final String DRTN_BASE = "rpcCallDurationMs_";
  /** Prefix of the per-method request-size histograms. */
  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
  /** Prefix of the per-method response-size histograms. */
  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
  /** Prefix of the per-region memstore load histograms. */
  private static final String MEMLOAD_BASE = "memstoreLoad_";
  /** Prefix of the per-region heap occupancy histograms. */
  private static final String HEAP_BASE = "heapOccupancy_";
  /** Prefix of the counters kept per exception class by incrCacheDroppingExceptions. */
  private static final String CACHE_BASE = "cacheDroppingExceptions_";
  /** Name used in place of the exception class name when the reported exception is null. */
  private static final String UNKNOWN_EXCEPTION = "UnknownException";
  /** Name of the counter incremented by incrNsLookups. */
  private static final String NS_LOOKUPS = "nsLookups";
  /** Name of the counter incremented by incrNsLookupsFailed. */
  private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
  /** Name of the ClientService protobuf service; leading part of every CallTracker name. */
  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
071
072  /** A container class for collecting details about the RPC call as it percolates. */
073  public static class CallStats {
074    private long requestSizeBytes = 0;
075    private long responseSizeBytes = 0;
076    private long startTime = 0;
077    private long callTimeMs = 0;
078    private int concurrentCallsPerServer = 0;
079    private int numActionsPerServer = 0;
080
081    public long getRequestSizeBytes() {
082      return requestSizeBytes;
083    }
084
085    public void setRequestSizeBytes(long requestSizeBytes) {
086      this.requestSizeBytes = requestSizeBytes;
087    }
088
089    public long getResponseSizeBytes() {
090      return responseSizeBytes;
091    }
092
093    public void setResponseSizeBytes(long responseSizeBytes) {
094      this.responseSizeBytes = responseSizeBytes;
095    }
096
097    public long getStartTime() {
098      return startTime;
099    }
100
101    public void setStartTime(long startTime) {
102      this.startTime = startTime;
103    }
104
105    public long getCallTimeMs() {
106      return callTimeMs;
107    }
108
109    public void setCallTimeMs(long callTimeMs) {
110      this.callTimeMs = callTimeMs;
111    }
112
113    public int getConcurrentCallsPerServer() {
114      return concurrentCallsPerServer;
115    }
116
117    public void setConcurrentCallsPerServer(int callsPerServer) {
118      this.concurrentCallsPerServer = callsPerServer;
119    }
120
121    public int getNumActionsPerServer() {
122      return numActionsPerServer;
123    }
124
125    public void setNumActionsPerServer(int numActionsPerServer) {
126      this.numActionsPerServer = numActionsPerServer;
127    }
128  }
129
130  protected static final class CallTracker {
131    private final String name;
132    final Timer callTimer;
133    final Histogram reqHist;
134    final Histogram respHist;
135
136    private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
137      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
138      if (subName != null) {
139        sb.append("(").append(subName).append(")");
140      }
141      this.name = sb.toString();
142      this.callTimer = registry.timer(name(MetricsConnection.class,
143        DRTN_BASE + this.name, scope));
144      this.reqHist = registry.histogram(name(MetricsConnection.class,
145        REQ_BASE + this.name, scope));
146      this.respHist = registry.histogram(name(MetricsConnection.class,
147        RESP_BASE + this.name, scope));
148    }
149
150    private CallTracker(MetricRegistry registry, String name, String scope) {
151      this(registry, name, null, scope);
152    }
153
154    public void updateRpc(CallStats stats) {
155      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
156      this.reqHist.update(stats.getRequestSizeBytes());
157      this.respHist.update(stats.getResponseSizeBytes());
158    }
159
160    @Override
161    public String toString() {
162      return "CallTracker:" + name;
163    }
164  }
165
166  protected static class RegionStats {
167    final String name;
168    final Histogram memstoreLoadHist;
169    final Histogram heapOccupancyHist;
170
171    public RegionStats(MetricRegistry registry, String name) {
172      this.name = name;
173      this.memstoreLoadHist = registry.histogram(name(MetricsConnection.class,
174          MEMLOAD_BASE + this.name));
175      this.heapOccupancyHist = registry.histogram(name(MetricsConnection.class,
176          HEAP_BASE + this.name));
177    }
178
179    public void update(RegionLoadStats regionStatistics) {
180      this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
181      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
182    }
183  }
184
185  protected static class RunnerStats {
186    final Counter normalRunners;
187    final Counter delayRunners;
188    final Histogram delayIntevalHist;
189
190    public RunnerStats(MetricRegistry registry) {
191      this.normalRunners = registry.counter(
192        name(MetricsConnection.class, "normalRunnersCount"));
193      this.delayRunners = registry.counter(
194        name(MetricsConnection.class, "delayRunnersCount"));
195      this.delayIntevalHist = registry.histogram(
196        name(MetricsConnection.class, "delayIntervalHist"));
197    }
198
199    public void incrNormalRunners() {
200      this.normalRunners.inc();
201    }
202
203    public void incrDelayRunners() {
204      this.delayRunners.inc();
205    }
206
207    public void updateDelayInterval(long interval) {
208      this.delayIntevalHist.update(interval);
209    }
210  }
211
  /**
   * Region statistics keyed first by server and then by region name. Entries are created on
   * demand by {@link #updateRegionStats(ServerName, byte[], RegionLoadStats)}.
   */
  protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats
          = new ConcurrentHashMap<>();
214
215  public void updateServerStats(ServerName serverName, byte[] regionName,
216                                Object r) {
217    if (!(r instanceof Result)) {
218      return;
219    }
220    Result result = (Result) r;
221    RegionLoadStats stats = result.getStats();
222    if (stats == null) {
223      return;
224    }
225    updateRegionStats(serverName, regionName, stats);
226  }
227
228  @Override
229  public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
230    String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
231    ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
232      () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
233    RegionStats regionStats =
234        computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name));
235    regionStats.update(stats);
236  }
237
238  /** A lambda for dispatching to the appropriate metric factory method */
239  private static interface NewMetric<T> {
240    T newMetric(Class<?> clazz, String name, String scope);
241  }
242
  /** Anticipated number of metric entries; initial capacity of the metric cache maps. */
  private static final int CAPACITY = 50;
  /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
  private static final float LOAD_FACTOR = 0.75f;
  /**
   * Anticipated number of concurrent accessor threads; passed as the concurrency level of the
   * metric cache maps.
   */
  private static final int CONCURRENCY_LEVEL = 256;

  /** Registry owned by this instance; created in the constructor. */
  private final MetricRegistry registry;
  /** JMX reporter for {@link #registry}; started in the constructor, stopped by shutdown(). */
  private final JmxReporter reporter;
  /** Value passed as the trailing component when scoped metric names are built. */
  private final String scope;
255
256  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
257    @Override public Timer newMetric(Class<?> clazz, String name, String scope) {
258      return registry.timer(name(clazz, name, scope));
259    }
260  };
261
262  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
263    @Override public Histogram newMetric(Class<?> clazz, String name, String scope) {
264      return registry.histogram(name(clazz, name, scope));
265    }
266  };
267
268  private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
269    @Override public Counter newMetric(Class<?> clazz, String name, String scope) {
270      return registry.counter(name(clazz, name, scope));
271    }
272  };
273
  // static metrics
  // All of these are created once, in the constructor.

  // Meta cache hit and miss counters.
  protected final Counter metaCacheHits;
  protected final Counter metaCacheMisses;
  // Trackers for the ClientService calls that updateRpc dispatches to directly: Get, Scan, the
  // four Mutate types, and Multi.
  protected final CallTracker getTracker;
  protected final CallTracker scanTracker;
  protected final CallTracker appendTracker;
  protected final CallTracker deleteTracker;
  protected final CallTracker incrementTracker;
  protected final CallTracker putTracker;
  protected final CallTracker multiTracker;
  // Normal/delayed runner counters and the delay interval histogram.
  protected final RunnerStats runnerStats;
  // Meta cache clears, counted per server and per region.
  protected final Counter metaCacheNumClearServer;
  protected final Counter metaCacheNumClearRegion;
  // Hedged read counters.
  protected final Counter hedgedReadOps;
  protected final Counter hedgedReadWin;
  // Updated by updateRpc from the values carried in CallStats.
  protected final Histogram concurrentCallsPerServerHist;
  protected final Histogram numActionsPerServerHist;
  // Incremented by incrNsLookups() and incrNsLookupsFailed().
  protected final Counter nsLookups;
  protected final Counter nsLookupsFailed;
294
  // dynamic metrics

  // These maps are used to cache references to the metric instances that are managed by the
  // registry. I don't think their use perfectly removes redundant allocations, but it's
  // a big improvement over calling registry.newMetric each time.
  // Each map is keyed by the metric name passed to getMetric (prefix + method or exception name).

  // Call-duration timers used by updateRpcGeneric.
  protected final ConcurrentMap<String, Timer> rpcTimers =
      new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  // Request and response size histograms used by updateRpcGeneric.
  protected final ConcurrentMap<String, Histogram> rpcHistograms =
      new ConcurrentHashMap<>(CAPACITY * 2 /* tracking both request and response sizes */,
          LOAD_FACTOR, CONCURRENCY_LEVEL);
  // Counters used by incrCacheDroppingExceptions, one per exception class name.
  private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  // Per-method RPC counters incremented by updateRpc.
  protected final ConcurrentMap<String, Counter>  rpcCounters =
      new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
309
310  MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
311      Supplier<ThreadPoolExecutor> metaPool) {
312    this.scope = scope;
313    this.registry = new MetricRegistry();
314    this.registry.register(getExecutorPoolName(),
315        new RatioGauge() {
316          @Override
317          protected Ratio getRatio() {
318            ThreadPoolExecutor pool = batchPool.get();
319            if (pool == null) {
320              return Ratio.of(0, 0);
321            }
322            return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
323          }
324        });
325    this.registry.register(getMetaPoolName(),
326        new RatioGauge() {
327          @Override
328          protected Ratio getRatio() {
329            ThreadPoolExecutor pool = metaPool.get();
330            if (pool == null) {
331              return Ratio.of(0, 0);
332            }
333            return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
334          }
335        });
336    this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
337    this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
338    this.metaCacheNumClearServer = registry.counter(name(this.getClass(),
339      "metaCacheNumClearServer", scope));
340    this.metaCacheNumClearRegion = registry.counter(name(this.getClass(),
341      "metaCacheNumClearRegion", scope));
342    this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
343    this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
344    this.getTracker = new CallTracker(this.registry, "Get", scope);
345    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
346    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
347    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
348    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
349    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
350    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
351    this.runnerStats = new RunnerStats(this.registry);
352    this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class,
353      "concurrentCallsPerServer", scope));
354    this.numActionsPerServerHist = registry.histogram(name(MetricsConnection.class,
355      "numActionsPerServer", scope));
356    this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
357    this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));
358
359    this.reporter = JmxReporter.forRegistry(this.registry).build();
360    this.reporter.start();
361  }
362
363  final String getExecutorPoolName() {
364    return name(getClass(), "executorPoolActiveThreads", scope);
365  }
366
367  final String getMetaPoolName() {
368    return name(getClass(), "metaPoolActiveThreads", scope);
369  }
370
371  MetricRegistry getMetricRegistry() {
372    return registry;
373  }
374
375  public void shutdown() {
376    this.reporter.stop();
377  }
378
379  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
380  public static CallStats newCallStats() {
381    // TODO: instance pool to reduce GC?
382    return new CallStats();
383  }
384
385  /** Increment the number of meta cache hits. */
386  public void incrMetaCacheHit() {
387    metaCacheHits.inc();
388  }
389
390  /** Increment the number of meta cache misses. */
391  public void incrMetaCacheMiss() {
392    metaCacheMisses.inc();
393  }
394
395  /** Increment the number of meta cache drops requested for entire RegionServer. */
396  public void incrMetaCacheNumClearServer() {
397    metaCacheNumClearServer.inc();
398  }
399
400  /** Increment the number of meta cache drops requested for individual region. */
401  public void incrMetaCacheNumClearRegion() {
402    metaCacheNumClearRegion.inc();
403  }
404
405  /** Increment the number of meta cache drops requested for individual region. */
406  public void incrMetaCacheNumClearRegion(int count) {
407    metaCacheNumClearRegion.inc(count);
408  }
409
410  /** Increment the number of hedged read that have occurred. */
411  public void incrHedgedReadOps() {
412    hedgedReadOps.inc();
413  }
414
415  /** Increment the number of hedged read returned faster than the original read. */
416  public void incrHedgedReadWin() {
417    hedgedReadWin.inc();
418  }
419
420  /** Increment the number of normal runner counts. */
421  public void incrNormalRunners() {
422    this.runnerStats.incrNormalRunners();
423  }
424
425  /** Increment the number of delay runner counts and update delay interval of delay runner. */
426  public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
427    this.runnerStats.incrDelayRunners();
428    this.runnerStats.updateDelayInterval(interval);
429  }
430
431  /**
432   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
433   */
434  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
435    return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
436  }
437
438  /** Update call stats for non-critical-path methods */
439  private void updateRpcGeneric(String methodName, CallStats stats) {
440    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
441        .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
442    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
443        .update(stats.getRequestSizeBytes());
444    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
445        .update(stats.getResponseSizeBytes());
446  }
447
448  /** Report RPC context to metrics system. */
449  public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
450    int callsPerServer = stats.getConcurrentCallsPerServer();
451    if (callsPerServer > 0) {
452      concurrentCallsPerServerHist.update(callsPerServer);
453    }
454    // Update the counter that tracks RPCs by type.
455    final String methodName = method.getService().getName() + "_" + method.getName();
456    getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
457    // this implementation is tied directly to protobuf implementation details. would be better
458    // if we could dispatch based on something static, ie, request Message type.
459    if (method.getService() == ClientService.getDescriptor()) {
460      switch(method.getIndex()) {
461        case 0:
462          assert "Get".equals(method.getName());
463          getTracker.updateRpc(stats);
464          return;
465        case 1:
466          assert "Mutate".equals(method.getName());
467          final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
468          switch(mutationType) {
469            case APPEND:
470              appendTracker.updateRpc(stats);
471              return;
472            case DELETE:
473              deleteTracker.updateRpc(stats);
474              return;
475            case INCREMENT:
476              incrementTracker.updateRpc(stats);
477              return;
478            case PUT:
479              putTracker.updateRpc(stats);
480              return;
481            default:
482              throw new RuntimeException("Unrecognized mutation type " + mutationType);
483          }
484        case 2:
485          assert "Scan".equals(method.getName());
486          scanTracker.updateRpc(stats);
487          return;
488        case 3:
489          assert "BulkLoadHFile".equals(method.getName());
490          // use generic implementation
491          break;
492        case 4:
493          assert "PrepareBulkLoad".equals(method.getName());
494          // use generic implementation
495          break;
496        case 5:
497          assert "CleanupBulkLoad".equals(method.getName());
498          // use generic implementation
499          break;
500        case 6:
501          assert "ExecService".equals(method.getName());
502          // use generic implementation
503          break;
504        case 7:
505          assert "ExecRegionServerService".equals(method.getName());
506          // use generic implementation
507          break;
508        case 8:
509          assert "Multi".equals(method.getName());
510          numActionsPerServerHist.update(stats.getNumActionsPerServer());
511          multiTracker.updateRpc(stats);
512          return;
513        default:
514          throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
515      }
516    }
517    // Fallback to dynamic registry lookup for DDL methods.
518    updateRpcGeneric(methodName, stats);
519  }
520
521  public void incrCacheDroppingExceptions(Object exception) {
522    getMetric(CACHE_BASE +
523      (exception == null? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
524      cacheDroppingExceptions, counterFactory).inc();
525  }
526
527  public void incrNsLookups() {
528    this.nsLookups.inc();
529  }
530
531  public void incrNsLookupsFailed() {
532    this.nsLookupsFailed.inc();
533  }
534}