001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static com.codahale.metrics.MetricRegistry.name;
021import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
022
023import com.codahale.metrics.Counter;
024import com.codahale.metrics.Histogram;
025import com.codahale.metrics.JmxReporter;
026import com.codahale.metrics.MetricRegistry;
027import com.codahale.metrics.RatioGauge;
028import com.codahale.metrics.Timer;
029import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
030
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentMap;
033import java.util.concurrent.ConcurrentSkipListMap;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
040import org.apache.hbase.thirdparty.com.google.protobuf.Message;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
044import org.apache.hadoop.hbase.util.Bytes;
045
046/**
047 * This class is for maintaining the various connection statistics and publishing them through
048 * the metrics interfaces.
049 *
050 * This class manages its own {@link MetricRegistry} and {@link JmxReporter} so as to not
051 * conflict with other uses of Yammer Metrics within the client application. Instantiating
052 * this class implicitly creates and "starts" instances of these classes; be sure to call
053 * {@link #shutdown()} to terminate the thread pools they allocate.
054 */
055@InterfaceAudience.Private
056public class MetricsConnection implements StatisticTrackable {
057
058  /** Set this key to {@code true} to enable metrics collection of client requests. */
059  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
060
061  private static final String DRTN_BASE = "rpcCallDurationMs_";
062  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
063  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
064  private static final String MEMLOAD_BASE = "memstoreLoad_";
065  private static final String HEAP_BASE = "heapOccupancy_";
066  private static final String CACHE_BASE = "cacheDroppingExceptions_";
067  private static final String UNKNOWN_EXCEPTION = "UnknownException";
068  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
069
070  /** A container class for collecting details about the RPC call as it percolates. */
071  public static class CallStats {
072    private long requestSizeBytes = 0;
073    private long responseSizeBytes = 0;
074    private long startTime = 0;
075    private long callTimeMs = 0;
076    private int concurrentCallsPerServer = 0;
077
078    public long getRequestSizeBytes() {
079      return requestSizeBytes;
080    }
081
082    public void setRequestSizeBytes(long requestSizeBytes) {
083      this.requestSizeBytes = requestSizeBytes;
084    }
085
086    public long getResponseSizeBytes() {
087      return responseSizeBytes;
088    }
089
090    public void setResponseSizeBytes(long responseSizeBytes) {
091      this.responseSizeBytes = responseSizeBytes;
092    }
093
094    public long getStartTime() {
095      return startTime;
096    }
097
098    public void setStartTime(long startTime) {
099      this.startTime = startTime;
100    }
101
102    public long getCallTimeMs() {
103      return callTimeMs;
104    }
105
106    public void setCallTimeMs(long callTimeMs) {
107      this.callTimeMs = callTimeMs;
108    }
109
110    public int getConcurrentCallsPerServer() {
111      return concurrentCallsPerServer;
112    }
113
114    public void setConcurrentCallsPerServer(int callsPerServer) {
115      this.concurrentCallsPerServer = callsPerServer;
116    }
117  }
118
119  @VisibleForTesting
120  protected static final class CallTracker {
121    private final String name;
122    @VisibleForTesting final Timer callTimer;
123    @VisibleForTesting final Histogram reqHist;
124    @VisibleForTesting final Histogram respHist;
125
126    private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
127      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
128      if (subName != null) {
129        sb.append("(").append(subName).append(")");
130      }
131      this.name = sb.toString();
132      this.callTimer = registry.timer(name(MetricsConnection.class,
133        DRTN_BASE + this.name, scope));
134      this.reqHist = registry.histogram(name(MetricsConnection.class,
135        REQ_BASE + this.name, scope));
136      this.respHist = registry.histogram(name(MetricsConnection.class,
137        RESP_BASE + this.name, scope));
138    }
139
140    private CallTracker(MetricRegistry registry, String name, String scope) {
141      this(registry, name, null, scope);
142    }
143
144    public void updateRpc(CallStats stats) {
145      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
146      this.reqHist.update(stats.getRequestSizeBytes());
147      this.respHist.update(stats.getResponseSizeBytes());
148    }
149
150    @Override
151    public String toString() {
152      return "CallTracker:" + name;
153    }
154  }
155
156  protected static class RegionStats {
157    final String name;
158    final Histogram memstoreLoadHist;
159    final Histogram heapOccupancyHist;
160
161    public RegionStats(MetricRegistry registry, String name) {
162      this.name = name;
163      this.memstoreLoadHist = registry.histogram(name(MetricsConnection.class,
164          MEMLOAD_BASE + this.name));
165      this.heapOccupancyHist = registry.histogram(name(MetricsConnection.class,
166          HEAP_BASE + this.name));
167    }
168
169    public void update(RegionLoadStats regionStatistics) {
170      this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
171      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
172    }
173  }
174
175  @VisibleForTesting
176  protected static class RunnerStats {
177    final Counter normalRunners;
178    final Counter delayRunners;
179    final Histogram delayIntevalHist;
180
181    public RunnerStats(MetricRegistry registry) {
182      this.normalRunners = registry.counter(
183        name(MetricsConnection.class, "normalRunnersCount"));
184      this.delayRunners = registry.counter(
185        name(MetricsConnection.class, "delayRunnersCount"));
186      this.delayIntevalHist = registry.histogram(
187        name(MetricsConnection.class, "delayIntervalHist"));
188    }
189
190    public void incrNormalRunners() {
191      this.normalRunners.inc();
192    }
193
194    public void incrDelayRunners() {
195      this.delayRunners.inc();
196    }
197
198    public void updateDelayInterval(long interval) {
199      this.delayIntevalHist.update(interval);
200    }
201  }
202
203  @VisibleForTesting
204  protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats
205          = new ConcurrentHashMap<>();
206
207  public void updateServerStats(ServerName serverName, byte[] regionName,
208                                Object r) {
209    if (!(r instanceof Result)) {
210      return;
211    }
212    Result result = (Result) r;
213    RegionLoadStats stats = result.getStats();
214    if (stats == null) {
215      return;
216    }
217    updateRegionStats(serverName, regionName, stats);
218  }
219
220  @Override
221  public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
222    String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
223    ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
224      () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
225    RegionStats regionStats =
226        computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name));
227    regionStats.update(stats);
228  }
229
230  /** A lambda for dispatching to the appropriate metric factory method */
231  private static interface NewMetric<T> {
232    T newMetric(Class<?> clazz, String name, String scope);
233  }
234
235  /** Anticipated number of metric entries */
236  private static final int CAPACITY = 50;
237  /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
238  private static final float LOAD_FACTOR = 0.75f;
239  /**
240   * Anticipated number of concurrent accessor threads, from
241   * {@link ConnectionImplementation#getBatchPool()}
242   */
243  private static final int CONCURRENCY_LEVEL = 256;
244
245  private final MetricRegistry registry;
246  private final JmxReporter reporter;
247  private final String scope;
248
249  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
250    @Override public Timer newMetric(Class<?> clazz, String name, String scope) {
251      return registry.timer(name(clazz, name, scope));
252    }
253  };
254
255  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
256    @Override public Histogram newMetric(Class<?> clazz, String name, String scope) {
257      return registry.histogram(name(clazz, name, scope));
258    }
259  };
260
261  private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
262    @Override public Counter newMetric(Class<?> clazz, String name, String scope) {
263      return registry.counter(name(clazz, name, scope));
264    }
265  };
266
267  // static metrics
268
269  @VisibleForTesting protected final Counter metaCacheHits;
270  @VisibleForTesting protected final Counter metaCacheMisses;
271  @VisibleForTesting protected final CallTracker getTracker;
272  @VisibleForTesting protected final CallTracker scanTracker;
273  @VisibleForTesting protected final CallTracker appendTracker;
274  @VisibleForTesting protected final CallTracker deleteTracker;
275  @VisibleForTesting protected final CallTracker incrementTracker;
276  @VisibleForTesting protected final CallTracker putTracker;
277  @VisibleForTesting protected final CallTracker multiTracker;
278  @VisibleForTesting protected final RunnerStats runnerStats;
279  @VisibleForTesting protected final Counter metaCacheNumClearServer;
280  @VisibleForTesting protected final Counter metaCacheNumClearRegion;
281  @VisibleForTesting protected final Counter hedgedReadOps;
282  @VisibleForTesting protected final Counter hedgedReadWin;
283  @VisibleForTesting protected final Histogram concurrentCallsPerServerHist;
284
285  // dynamic metrics
286
287  // These maps are used to cache references to the metric instances that are managed by the
288  // registry. I don't think their use perfectly removes redundant allocations, but it's
289  // a big improvement over calling registry.newMetric each time.
290  @VisibleForTesting protected final ConcurrentMap<String, Timer> rpcTimers =
291      new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
292  @VisibleForTesting protected final ConcurrentMap<String, Histogram> rpcHistograms =
293      new ConcurrentHashMap<>(CAPACITY * 2 /* tracking both request and response sizes */,
294          LOAD_FACTOR, CONCURRENCY_LEVEL);
295  private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
296    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
297
298  MetricsConnection(final ConnectionImplementation conn) {
299    this.scope = conn.toString();
300    this.registry = new MetricRegistry();
301
302    this.registry.register(getExecutorPoolName(),
303        new RatioGauge() {
304          @Override
305          protected Ratio getRatio() {
306            ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool();
307            if (batchPool == null) {
308              return Ratio.of(0, 0);
309            }
310            return Ratio.of(batchPool.getActiveCount(), batchPool.getMaximumPoolSize());
311          }
312        });
313    this.registry.register(getMetaPoolName(),
314        new RatioGauge() {
315          @Override
316          protected Ratio getRatio() {
317            ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool();
318            if (metaPool == null) {
319              return Ratio.of(0, 0);
320            }
321            return Ratio.of(metaPool.getActiveCount(), metaPool.getMaximumPoolSize());
322          }
323        });
324    this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
325    this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
326    this.metaCacheNumClearServer = registry.counter(name(this.getClass(),
327      "metaCacheNumClearServer", scope));
328    this.metaCacheNumClearRegion = registry.counter(name(this.getClass(),
329      "metaCacheNumClearRegion", scope));
330    this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
331    this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
332    this.getTracker = new CallTracker(this.registry, "Get", scope);
333    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
334    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
335    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
336    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
337    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
338    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
339    this.runnerStats = new RunnerStats(this.registry);
340    this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class, 
341      "concurrentCallsPerServer", scope));
342
343    this.reporter = JmxReporter.forRegistry(this.registry).build();
344    this.reporter.start();
345  }
346
347  @VisibleForTesting
348  final String getExecutorPoolName() {
349    return name(getClass(), "executorPoolActiveThreads", scope);
350  }
351
352  @VisibleForTesting
353  final String getMetaPoolName() {
354    return name(getClass(), "metaPoolActiveThreads", scope);
355  }
356
357  @VisibleForTesting
358  MetricRegistry getMetricRegistry() {
359    return registry;
360  }
361
362  public void shutdown() {
363    this.reporter.stop();
364  }
365
366  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
367  public static CallStats newCallStats() {
368    // TODO: instance pool to reduce GC?
369    return new CallStats();
370  }
371
372  /** Increment the number of meta cache hits. */
373  public void incrMetaCacheHit() {
374    metaCacheHits.inc();
375  }
376
377  /** Increment the number of meta cache misses. */
378  public void incrMetaCacheMiss() {
379    metaCacheMisses.inc();
380  }
381
382  /** Increment the number of meta cache drops requested for entire RegionServer. */
383  public void incrMetaCacheNumClearServer() {
384    metaCacheNumClearServer.inc();
385  }
386
387  /** Increment the number of meta cache drops requested for individual region. */
388  public void incrMetaCacheNumClearRegion() {
389    metaCacheNumClearRegion.inc();
390  }
391
392  /** Increment the number of hedged read that have occurred. */
393  public void incrHedgedReadOps() {
394    hedgedReadOps.inc();
395  }
396
397  /** Increment the number of hedged read returned faster than the original read. */
398  public void incrHedgedReadWin() {
399    hedgedReadWin.inc();
400  }
401
402  /** Increment the number of normal runner counts. */
403  public void incrNormalRunners() {
404    this.runnerStats.incrNormalRunners();
405  }
406
407  /** Increment the number of delay runner counts. */
408  public void incrDelayRunners() {
409    this.runnerStats.incrDelayRunners();
410  }
411
412  /** Update delay interval of delay runner. */
413  public void updateDelayInterval(long interval) {
414    this.runnerStats.updateDelayInterval(interval);
415  }
416
417  /**
418   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
419   */
420  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
421    return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
422  }
423
424  /** Update call stats for non-critical-path methods */
425  private void updateRpcGeneric(MethodDescriptor method, CallStats stats) {
426    final String methodName = method.getService().getName() + "_" + method.getName();
427    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
428        .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
429    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
430        .update(stats.getRequestSizeBytes());
431    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
432        .update(stats.getResponseSizeBytes());
433  }
434
435  /** Report RPC context to metrics system. */
436  public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
437    int callsPerServer = stats.getConcurrentCallsPerServer();
438    if (callsPerServer > 0) {
439      concurrentCallsPerServerHist.update(callsPerServer);
440    }
441    // this implementation is tied directly to protobuf implementation details. would be better
442    // if we could dispatch based on something static, ie, request Message type.
443    if (method.getService() == ClientService.getDescriptor()) {
444      switch(method.getIndex()) {
445      case 0:
446        assert "Get".equals(method.getName());
447        getTracker.updateRpc(stats);
448        return;
449      case 1:
450        assert "Mutate".equals(method.getName());
451        final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
452        switch(mutationType) {
453        case APPEND:
454          appendTracker.updateRpc(stats);
455          return;
456        case DELETE:
457          deleteTracker.updateRpc(stats);
458          return;
459        case INCREMENT:
460          incrementTracker.updateRpc(stats);
461          return;
462        case PUT:
463          putTracker.updateRpc(stats);
464          return;
465        default:
466          throw new RuntimeException("Unrecognized mutation type " + mutationType);
467        }
468      case 2:
469        assert "Scan".equals(method.getName());
470        scanTracker.updateRpc(stats);
471        return;
472      case 3:
473        assert "BulkLoadHFile".equals(method.getName());
474        // use generic implementation
475        break;
476      case 4:
477        assert "PrepareBulkLoad".equals(method.getName());
478        // use generic implementation
479        break;
480      case 5:
481        assert "CleanupBulkLoad".equals(method.getName());
482        // use generic implementation
483        break;
484      case 6:
485        assert "ExecService".equals(method.getName());
486        // use generic implementation
487        break;
488      case 7:
489        assert "ExecRegionServerService".equals(method.getName());
490        // use generic implementation
491        break;
492      case 8:
493        assert "Multi".equals(method.getName());
494        multiTracker.updateRpc(stats);
495        return;
496      default:
497        throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
498      }
499    }
500    // Fallback to dynamic registry lookup for DDL methods.
501    updateRpcGeneric(method, stats);
502  }
503
504  public void incrCacheDroppingExceptions(Object exception) {
505    getMetric(CACHE_BASE +
506      (exception == null? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
507      cacheDroppingExceptions, counterFactory).inc();
508  }
509}