001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static com.codahale.metrics.MetricRegistry.name;
021import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
022
023import com.codahale.metrics.Counter;
024import com.codahale.metrics.Histogram;
025import com.codahale.metrics.JmxReporter;
026import com.codahale.metrics.MetricRegistry;
027import com.codahale.metrics.RatioGauge;
028import com.codahale.metrics.Timer;
029import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
030
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentMap;
033import java.util.concurrent.ConcurrentSkipListMap;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036import java.util.function.Supplier;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
040import org.apache.hbase.thirdparty.com.google.protobuf.Message;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
044import org.apache.hadoop.hbase.util.Bytes;
045
/**
 * This class is for maintaining the various connection statistics and publishing them through
 * the metrics interfaces.
 * <p>
 * This class manages its own {@link MetricRegistry} and {@link JmxReporter} so as to not
 * conflict with other uses of the Dropwizard (formerly Yammer) Metrics library within the client
 * application. Instantiating this class implicitly creates and "starts" instances of these
 * classes; be sure to call {@link #shutdown()} to terminate the thread pools they allocate.
 */
055@InterfaceAudience.Private
056public class MetricsConnection implements StatisticTrackable {
057
  /** Set this key to {@code true} to enable metrics collection of client requests. */
  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

  // Metric-name prefixes. Each one is concatenated with a call, region or exception specific
  // suffix before the metric is looked up in the registry.

  /** Prefix of the call-duration timers. */
  private static final String DRTN_BASE = "rpcCallDurationMs_";
  /** Prefix of the request-size histograms. */
  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
  /** Prefix of the response-size histograms. */
  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
  /** Prefix of the per-region memstore-load histograms. */
  private static final String MEMLOAD_BASE = "memstoreLoad_";
  /** Prefix of the per-region heap-occupancy histograms. */
  private static final String HEAP_BASE = "heapOccupancy_";
  /** Prefix of the per-exception-type counters kept by {@link #incrCacheDroppingExceptions}. */
  private static final String CACHE_BASE = "cacheDroppingExceptions_";
  /** Suffix used by {@link #incrCacheDroppingExceptions} when it is handed {@code null}. */
  private static final String UNKNOWN_EXCEPTION = "UnknownException";
  /** Name of the {@link ClientService} descriptor; leading part of every CallTracker name. */
  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
069
070  /** A container class for collecting details about the RPC call as it percolates. */
071  public static class CallStats {
072    private long requestSizeBytes = 0;
073    private long responseSizeBytes = 0;
074    private long startTime = 0;
075    private long callTimeMs = 0;
076    private int concurrentCallsPerServer = 0;
077    private int numActionsPerServer = 0;
078
079    public long getRequestSizeBytes() {
080      return requestSizeBytes;
081    }
082
083    public void setRequestSizeBytes(long requestSizeBytes) {
084      this.requestSizeBytes = requestSizeBytes;
085    }
086
087    public long getResponseSizeBytes() {
088      return responseSizeBytes;
089    }
090
091    public void setResponseSizeBytes(long responseSizeBytes) {
092      this.responseSizeBytes = responseSizeBytes;
093    }
094
095    public long getStartTime() {
096      return startTime;
097    }
098
099    public void setStartTime(long startTime) {
100      this.startTime = startTime;
101    }
102
103    public long getCallTimeMs() {
104      return callTimeMs;
105    }
106
107    public void setCallTimeMs(long callTimeMs) {
108      this.callTimeMs = callTimeMs;
109    }
110
111    public int getConcurrentCallsPerServer() {
112      return concurrentCallsPerServer;
113    }
114
115    public void setConcurrentCallsPerServer(int callsPerServer) {
116      this.concurrentCallsPerServer = callsPerServer;
117    }
118
119    public int getNumActionsPerServer() {
120      return numActionsPerServer;
121    }
122
123    public void setNumActionsPerServer(int numActionsPerServer) {
124      this.numActionsPerServer = numActionsPerServer;
125    }
126  }
127
128  @VisibleForTesting
129  protected static final class CallTracker {
130    private final String name;
131    @VisibleForTesting final Timer callTimer;
132    @VisibleForTesting final Histogram reqHist;
133    @VisibleForTesting final Histogram respHist;
134
135    private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
136      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
137      if (subName != null) {
138        sb.append("(").append(subName).append(")");
139      }
140      this.name = sb.toString();
141      this.callTimer = registry.timer(name(MetricsConnection.class,
142        DRTN_BASE + this.name, scope));
143      this.reqHist = registry.histogram(name(MetricsConnection.class,
144        REQ_BASE + this.name, scope));
145      this.respHist = registry.histogram(name(MetricsConnection.class,
146        RESP_BASE + this.name, scope));
147    }
148
149    private CallTracker(MetricRegistry registry, String name, String scope) {
150      this(registry, name, null, scope);
151    }
152
153    public void updateRpc(CallStats stats) {
154      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
155      this.reqHist.update(stats.getRequestSizeBytes());
156      this.respHist.update(stats.getResponseSizeBytes());
157    }
158
159    @Override
160    public String toString() {
161      return "CallTracker:" + name;
162    }
163  }
164
165  protected static class RegionStats {
166    final String name;
167    final Histogram memstoreLoadHist;
168    final Histogram heapOccupancyHist;
169
170    public RegionStats(MetricRegistry registry, String name) {
171      this.name = name;
172      this.memstoreLoadHist = registry.histogram(name(MetricsConnection.class,
173          MEMLOAD_BASE + this.name));
174      this.heapOccupancyHist = registry.histogram(name(MetricsConnection.class,
175          HEAP_BASE + this.name));
176    }
177
178    public void update(RegionLoadStats regionStatistics) {
179      this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
180      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
181    }
182  }
183
184  @VisibleForTesting
185  protected static class RunnerStats {
186    final Counter normalRunners;
187    final Counter delayRunners;
188    final Histogram delayIntevalHist;
189
190    public RunnerStats(MetricRegistry registry) {
191      this.normalRunners = registry.counter(
192        name(MetricsConnection.class, "normalRunnersCount"));
193      this.delayRunners = registry.counter(
194        name(MetricsConnection.class, "delayRunnersCount"));
195      this.delayIntevalHist = registry.histogram(
196        name(MetricsConnection.class, "delayIntervalHist"));
197    }
198
199    public void incrNormalRunners() {
200      this.normalRunners.inc();
201    }
202
203    public void incrDelayRunners() {
204      this.delayRunners.inc();
205    }
206
207    public void updateDelayInterval(long interval) {
208      this.delayIntevalHist.update(interval);
209    }
210  }
211
  /**
   * Region statistics holders, keyed first by server and then by region name. Entries are
   * created on demand by {@link #updateRegionStats}.
   */
  @VisibleForTesting
  protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats
          = new ConcurrentHashMap<>();
215
216  public void updateServerStats(ServerName serverName, byte[] regionName,
217                                Object r) {
218    if (!(r instanceof Result)) {
219      return;
220    }
221    Result result = (Result) r;
222    RegionLoadStats stats = result.getStats();
223    if (stats == null) {
224      return;
225    }
226    updateRegionStats(serverName, regionName, stats);
227  }
228
229  @Override
230  public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
231    String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
232    ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
233      () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
234    RegionStats regionStats =
235        computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name));
236    regionStats.update(stats);
237  }
238
239  /** A lambda for dispatching to the appropriate metric factory method */
240  private static interface NewMetric<T> {
241    T newMetric(Class<?> clazz, String name, String scope);
242  }
243
  /** Anticipated number of metric entries; initial capacity of the metric caches below. */
  private static final int CAPACITY = 50;
  /** Load factor of the metric caches; the same value as the default of java.util.HashMap. */
  private static final float LOAD_FACTOR = 0.75f;
  /**
   * Anticipated number of concurrent accessor threads; passed as the concurrency level when the
   * metric caches are constructed.
   */
  private static final int CONCURRENCY_LEVEL = 256;

  /** Registry owned by this instance; every metric of this class is obtained from it. */
  private final MetricRegistry registry;
  /** JMX reporter over {@link #registry}; started by the constructor, stopped by shutdown(). */
  private final JmxReporter reporter;
  /** Trailing name component of the scoped metrics created by this instance. */
  private final String scope;
256
  /** Obtains a {@link Timer} from the registry under the name built from the arguments. */
  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
    @Override public Timer newMetric(Class<?> clazz, String name, String scope) {
      return registry.timer(name(clazz, name, scope));
    }
  };

  /** Obtains a {@link Histogram} from the registry under the name built from the arguments. */
  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
    @Override public Histogram newMetric(Class<?> clazz, String name, String scope) {
      return registry.histogram(name(clazz, name, scope));
    }
  };

  /** Obtains a {@link Counter} from the registry under the name built from the arguments. */
  private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
    @Override public Counter newMetric(Class<?> clazz, String name, String scope) {
      return registry.counter(name(clazz, name, scope));
    }
  };
274
  // static metrics: created once in the constructor and held in final fields

  @VisibleForTesting protected final Counter metaCacheHits;
  @VisibleForTesting protected final Counter metaCacheMisses;
  @VisibleForTesting protected final CallTracker getTracker;
  @VisibleForTesting protected final CallTracker scanTracker;
  @VisibleForTesting protected final CallTracker appendTracker;
  @VisibleForTesting protected final CallTracker deleteTracker;
  @VisibleForTesting protected final CallTracker incrementTracker;
  @VisibleForTesting protected final CallTracker putTracker;
  @VisibleForTesting protected final CallTracker multiTracker;
  @VisibleForTesting protected final RunnerStats runnerStats;
  @VisibleForTesting protected final Counter metaCacheNumClearServer;
  @VisibleForTesting protected final Counter metaCacheNumClearRegion;
  @VisibleForTesting protected final Counter hedgedReadOps;
  @VisibleForTesting protected final Counter hedgedReadWin;
  @VisibleForTesting protected final Histogram concurrentCallsPerServerHist;
  @VisibleForTesting protected final Histogram numActionsPerServerHist;

  // dynamic metrics: created on first use and looked up by name

  // These maps are used to cache references to the metric instances that are managed by the
  // registry. I don't think their use perfectly removes redundant allocations, but it's
  // a big improvement over calling registry.newMetric each time.
  @VisibleForTesting protected final ConcurrentMap<String, Timer> rpcTimers =
      new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  @VisibleForTesting protected final ConcurrentMap<String, Histogram> rpcHistograms =
      new ConcurrentHashMap<>(CAPACITY * 2 /* tracking both request and response sizes */,
          LOAD_FACTOR, CONCURRENCY_LEVEL);
  // One counter per exception simple name; filled by incrCacheDroppingExceptions.
  private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
306
307  MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
308      Supplier<ThreadPoolExecutor> metaPool) {
309    this.scope = scope;
310    this.registry = new MetricRegistry();
311    this.registry.register(getExecutorPoolName(),
312        new RatioGauge() {
313          @Override
314          protected Ratio getRatio() {
315            ThreadPoolExecutor pool = batchPool.get();
316            if (pool == null) {
317              return Ratio.of(0, 0);
318            }
319            return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
320          }
321        });
322    this.registry.register(getMetaPoolName(),
323        new RatioGauge() {
324          @Override
325          protected Ratio getRatio() {
326            ThreadPoolExecutor pool = metaPool.get();
327            if (pool == null) {
328              return Ratio.of(0, 0);
329            }
330            return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
331          }
332        });
333    this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
334    this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
335    this.metaCacheNumClearServer = registry.counter(name(this.getClass(),
336      "metaCacheNumClearServer", scope));
337    this.metaCacheNumClearRegion = registry.counter(name(this.getClass(),
338      "metaCacheNumClearRegion", scope));
339    this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
340    this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
341    this.getTracker = new CallTracker(this.registry, "Get", scope);
342    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
343    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
344    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
345    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
346    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
347    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
348    this.runnerStats = new RunnerStats(this.registry);
349    this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class,
350      "concurrentCallsPerServer", scope));
351    this.numActionsPerServerHist = registry.histogram(name(MetricsConnection.class,
352      "numActionsPerServer", scope));
353
354    this.reporter = JmxReporter.forRegistry(this.registry).build();
355    this.reporter.start();
356  }
357
358  @VisibleForTesting
359  final String getExecutorPoolName() {
360    return name(getClass(), "executorPoolActiveThreads", scope);
361  }
362
363  @VisibleForTesting
364  final String getMetaPoolName() {
365    return name(getClass(), "metaPoolActiveThreads", scope);
366  }
367
368  @VisibleForTesting
369  MetricRegistry getMetricRegistry() {
370    return registry;
371  }
372
373  public void shutdown() {
374    this.reporter.stop();
375  }
376
377  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
378  public static CallStats newCallStats() {
379    // TODO: instance pool to reduce GC?
380    return new CallStats();
381  }
382
383  /** Increment the number of meta cache hits. */
384  public void incrMetaCacheHit() {
385    metaCacheHits.inc();
386  }
387
388  /** Increment the number of meta cache misses. */
389  public void incrMetaCacheMiss() {
390    metaCacheMisses.inc();
391  }
392
393  /** Increment the number of meta cache drops requested for entire RegionServer. */
394  public void incrMetaCacheNumClearServer() {
395    metaCacheNumClearServer.inc();
396  }
397
398  /** Increment the number of meta cache drops requested for individual region. */
399  public void incrMetaCacheNumClearRegion() {
400    metaCacheNumClearRegion.inc();
401  }
402
403  /** Increment the number of meta cache drops requested for individual region. */
404  public void incrMetaCacheNumClearRegion(int count) {
405    metaCacheNumClearRegion.inc(count);
406  }
407
408  /** Increment the number of hedged read that have occurred. */
409  public void incrHedgedReadOps() {
410    hedgedReadOps.inc();
411  }
412
413  /** Increment the number of hedged read returned faster than the original read. */
414  public void incrHedgedReadWin() {
415    hedgedReadWin.inc();
416  }
417
418  /** Increment the number of normal runner counts. */
419  public void incrNormalRunners() {
420    this.runnerStats.incrNormalRunners();
421  }
422
423  /** Increment the number of delay runner counts and update delay interval of delay runner. */
424  public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
425    this.runnerStats.incrDelayRunners();
426    this.runnerStats.updateDelayInterval(interval);
427  }
428
429  /**
430   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
431   */
432  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
433    return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
434  }
435
436  /** Update call stats for non-critical-path methods */
437  private void updateRpcGeneric(MethodDescriptor method, CallStats stats) {
438    final String methodName = method.getService().getName() + "_" + method.getName();
439    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
440        .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
441    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
442        .update(stats.getRequestSizeBytes());
443    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
444        .update(stats.getResponseSizeBytes());
445  }
446
447  /** Report RPC context to metrics system. */
448  public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
449    int callsPerServer = stats.getConcurrentCallsPerServer();
450    if (callsPerServer > 0) {
451      concurrentCallsPerServerHist.update(callsPerServer);
452    }
453    // this implementation is tied directly to protobuf implementation details. would be better
454    // if we could dispatch based on something static, ie, request Message type.
455    if (method.getService() == ClientService.getDescriptor()) {
456      switch(method.getIndex()) {
457        case 0:
458          assert "Get".equals(method.getName());
459          getTracker.updateRpc(stats);
460          return;
461        case 1:
462          assert "Mutate".equals(method.getName());
463          final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
464          switch(mutationType) {
465            case APPEND:
466              appendTracker.updateRpc(stats);
467              return;
468            case DELETE:
469              deleteTracker.updateRpc(stats);
470              return;
471            case INCREMENT:
472              incrementTracker.updateRpc(stats);
473              return;
474            case PUT:
475              putTracker.updateRpc(stats);
476              return;
477            default:
478              throw new RuntimeException("Unrecognized mutation type " + mutationType);
479          }
480        case 2:
481          assert "Scan".equals(method.getName());
482          scanTracker.updateRpc(stats);
483          return;
484        case 3:
485          assert "BulkLoadHFile".equals(method.getName());
486          // use generic implementation
487          break;
488        case 4:
489          assert "PrepareBulkLoad".equals(method.getName());
490          // use generic implementation
491          break;
492        case 5:
493          assert "CleanupBulkLoad".equals(method.getName());
494          // use generic implementation
495          break;
496        case 6:
497          assert "ExecService".equals(method.getName());
498          // use generic implementation
499          break;
500        case 7:
501          assert "ExecRegionServerService".equals(method.getName());
502          // use generic implementation
503          break;
504        case 8:
505          assert "Multi".equals(method.getName());
506          numActionsPerServerHist.update(stats.getNumActionsPerServer());
507          multiTracker.updateRpc(stats);
508          return;
509        default:
510          throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
511      }
512    }
513    // Fallback to dynamic registry lookup for DDL methods.
514    updateRpcGeneric(method, stats);
515  }
516
517  public void incrCacheDroppingExceptions(Object exception) {
518    getMetric(CACHE_BASE +
519      (exception == null? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
520      cacheDroppingExceptions, counterFactory).inc();
521  }
522}