View Javadoc

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  package org.apache.hadoop.hbase.client;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import com.google.protobuf.Descriptors.MethodDescriptor;
22  import com.google.protobuf.Message;
23  import com.yammer.metrics.core.Counter;
24  import com.yammer.metrics.core.Histogram;
25  import com.yammer.metrics.core.MetricsRegistry;
26  import com.yammer.metrics.core.Timer;
27  import com.yammer.metrics.reporting.JmxReporter;
28  import com.yammer.metrics.util.RatioGauge;
29  import org.apache.hadoop.hbase.ServerName;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
32  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
33  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
34  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
35  import org.apache.hadoop.hbase.util.Bytes;
36  
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.ConcurrentSkipListMap;
39  import java.util.concurrent.ConcurrentMap;
40  import java.util.concurrent.ThreadPoolExecutor;
41  import java.util.concurrent.TimeUnit;
42  
43  /**
44   * This class is for maintaining the various connection statistics and publishing them through
45   * the metrics interfaces.
46   *
47   * This class manages its own {@link MetricsRegistry} and {@link JmxReporter} so as to not
48   * conflict with other uses of Yammer Metrics within the client application. Instantiating
49   * this class implicitly creates and "starts" instances of these classes; be sure to call
50   * {@link #shutdown()} to terminate the thread pools they allocate.
51   */
52  @InterfaceAudience.Private
53  public class MetricsConnection {
54  
55    /** Set this key to {@code true} to enable metrics collection of client requests. */
56    public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
57  
58    private static final String DRTN_BASE = "rpcCallDurationMs_";
59    private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
60    private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
61    private static final String MEMLOAD_BASE = "memstoreLoad_";
62    private static final String HEAP_BASE = "heapOccupancy_";
63    private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
64  
65    /** A container class for collecting details about the RPC call as it percolates. */
66    public static class CallStats {
67      private long requestSizeBytes = 0;
68      private long responseSizeBytes = 0;
69      private long startTime = 0;
70      private long callTimeMs = 0;
71  
72      public long getRequestSizeBytes() {
73        return requestSizeBytes;
74      }
75  
76      public void setRequestSizeBytes(long requestSizeBytes) {
77        this.requestSizeBytes = requestSizeBytes;
78      }
79  
80      public long getResponseSizeBytes() {
81        return responseSizeBytes;
82      }
83  
84      public void setResponseSizeBytes(long responseSizeBytes) {
85        this.responseSizeBytes = responseSizeBytes;
86      }
87  
88      public long getStartTime() {
89        return startTime;
90      }
91  
92      public void setStartTime(long startTime) {
93        this.startTime = startTime;
94      }
95  
96      public long getCallTimeMs() {
97        return callTimeMs;
98      }
99  
100     public void setCallTimeMs(long callTimeMs) {
101       this.callTimeMs = callTimeMs;
102     }
103   }
104 
105   @VisibleForTesting
106   protected static final class CallTracker {
107     private final String name;
108     @VisibleForTesting final Timer callTimer;
109     @VisibleForTesting final Histogram reqHist;
110     @VisibleForTesting final Histogram respHist;
111 
112     private CallTracker(MetricsRegistry registry, String name, String subName, String scope) {
113       StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
114       if (subName != null) {
115         sb.append("(").append(subName).append(")");
116       }
117       this.name = sb.toString();
118       this.callTimer = registry.newTimer(MetricsConnection.class, DRTN_BASE + this.name, scope);
119       this.reqHist = registry.newHistogram(MetricsConnection.class, REQ_BASE + this.name, scope);
120       this.respHist = registry.newHistogram(MetricsConnection.class, RESP_BASE + this.name, scope);
121     }
122 
123     private CallTracker(MetricsRegistry registry, String name, String scope) {
124       this(registry, name, null, scope);
125     }
126 
127     public void updateRpc(CallStats stats) {
128       this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
129       this.reqHist.update(stats.getRequestSizeBytes());
130       this.respHist.update(stats.getResponseSizeBytes());
131     }
132 
133     @Override
134     public String toString() {
135       return "CallTracker:" + name;
136     }
137   }
138 
139   protected static class RegionStats {
140     final String name;
141     final Histogram memstoreLoadHist;
142     final Histogram heapOccupancyHist;
143 
144     public RegionStats(MetricsRegistry registry, String name) {
145       this.name = name;
146       this.memstoreLoadHist = registry.newHistogram(MetricsConnection.class,
147           MEMLOAD_BASE + this.name);
148       this.heapOccupancyHist = registry.newHistogram(MetricsConnection.class,
149           HEAP_BASE + this.name);
150     }
151 
152     public void update(ClientProtos.RegionLoadStats regionStatistics) {
153       this.memstoreLoadHist.update(regionStatistics.getMemstoreLoad());
154       this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
155     }
156   }
157 
158   @VisibleForTesting
159   protected static class RunnerStats {
160     final Counter normalRunners;
161     final Counter delayRunners;
162     final Histogram delayIntevalHist;
163 
164     public RunnerStats(MetricsRegistry registry) {
165       this.normalRunners = registry.newCounter(MetricsConnection.class, "normalRunnersCount");
166       this.delayRunners = registry.newCounter(MetricsConnection.class, "delayRunnersCount");
167       this.delayIntevalHist = registry.newHistogram(MetricsConnection.class, "delayIntervalHist");
168     }
169 
170     public void incrNormalRunners() {
171       this.normalRunners.inc();
172     }
173 
174     public void incrDelayRunners() {
175       this.delayRunners.inc();
176     }
177 
178     public void updateDelayInterval(long interval) {
179       this.delayIntevalHist.update(interval);
180     }
181   }
182 
183   @VisibleForTesting
184   protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats
185           = new ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>>();
186 
187   public void updateServerStats(ServerName serverName, byte[] regionName,
188                                 Object r) {
189     if (!(r instanceof Result)) {
190       return;
191     }
192     Result result = (Result) r;
193     ClientProtos.RegionLoadStats stats = result.getStats();
194     if(stats == null){
195       return;
196     }
197     String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
198     ConcurrentMap<byte[], RegionStats> rsStats = null;
199     if (serverStats.containsKey(serverName)) {
200       rsStats = serverStats.get(serverName);
201     } else {
202       rsStats = serverStats.putIfAbsent(serverName,
203           new ConcurrentSkipListMap<byte[], RegionStats>(Bytes.BYTES_COMPARATOR));
204       if (rsStats == null) {
205         rsStats = serverStats.get(serverName);
206       }
207     }
208     RegionStats regionStats = null;
209     if (rsStats.containsKey(regionName)) {
210       regionStats = rsStats.get(regionName);
211     } else {
212       regionStats = rsStats.putIfAbsent(regionName, new RegionStats(this.registry, name));
213       if (regionStats == null) {
214         regionStats = rsStats.get(regionName);
215       }
216     }
217     regionStats.update(stats);
218   }
219 
220 
221   /** A lambda for dispatching to the appropriate metric factory method */
222   private static interface NewMetric<T> {
223     T newMetric(Class<?> clazz, String name, String scope);
224   }
225 
226   /** Anticipated number of metric entries */
227   private static final int CAPACITY = 50;
228   /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
229   private static final float LOAD_FACTOR = 0.75f;
230   /**
231    * Anticipated number of concurrent accessor threads, from
232    * {@link ConnectionManager.HConnectionImplementation#getBatchPool()}
233    */
234   private static final int CONCURRENCY_LEVEL = 256;
235 
236   private final MetricsRegistry registry;
237   private final JmxReporter reporter;
238   private final String scope;
239 
240   private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
241     @Override public Timer newMetric(Class<?> clazz, String name, String scope) {
242       return registry.newTimer(clazz, name, scope);
243     }
244   };
245 
246   private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
247     @Override public Histogram newMetric(Class<?> clazz, String name, String scope) {
248       return registry.newHistogram(clazz, name, scope);
249     }
250   };
251 
252   // static metrics
253 
254   @VisibleForTesting protected final Counter metaCacheHits;
255   @VisibleForTesting protected final Counter metaCacheMisses;
256   @VisibleForTesting protected final CallTracker getTracker;
257   @VisibleForTesting protected final CallTracker scanTracker;
258   @VisibleForTesting protected final CallTracker appendTracker;
259   @VisibleForTesting protected final CallTracker deleteTracker;
260   @VisibleForTesting protected final CallTracker incrementTracker;
261   @VisibleForTesting protected final CallTracker putTracker;
262   @VisibleForTesting protected final CallTracker multiTracker;
263   @VisibleForTesting protected final RunnerStats runnerStats;
264 
265   // dynamic metrics
266 
267   // These maps are used to cache references to the metric instances that are managed by the
268   // registry. I don't think their use perfectly removes redundant allocations, but it's
269   // a big improvement over calling registry.newMetric each time.
270   @VisibleForTesting protected final ConcurrentMap<String, Timer> rpcTimers =
271       new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
272   @VisibleForTesting protected final ConcurrentMap<String, Histogram> rpcHistograms =
273       new ConcurrentHashMap<>(CAPACITY * 2 /* tracking both request and response sizes */,
274           LOAD_FACTOR, CONCURRENCY_LEVEL);
275 
276   public MetricsConnection(final ConnectionManager.HConnectionImplementation conn) {
277     this.scope = conn.toString();
278     this.registry = new MetricsRegistry();
279     final ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool();
280     final ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool();
281 
282     this.registry.newGauge(this.getClass(), "executorPoolActiveThreads", scope,
283         new RatioGauge() {
284           @Override protected double getNumerator() {
285             return batchPool.getActiveCount();
286           }
287           @Override protected double getDenominator() {
288             return batchPool.getMaximumPoolSize();
289           }
290         });
291     this.registry.newGauge(this.getClass(), "metaPoolActiveThreads", scope,
292         new RatioGauge() {
293           @Override protected double getNumerator() {
294             return metaPool.getActiveCount();
295           }
296           @Override protected double getDenominator() {
297             return metaPool.getMaximumPoolSize();
298           }
299         });
300     this.metaCacheHits = registry.newCounter(this.getClass(), "metaCacheHits", scope);
301     this.metaCacheMisses = registry.newCounter(this.getClass(), "metaCacheMisses", scope);
302     this.getTracker = new CallTracker(this.registry, "Get", scope);
303     this.scanTracker = new CallTracker(this.registry, "Scan", scope);
304     this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
305     this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
306     this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
307     this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
308     this.multiTracker = new CallTracker(this.registry, "Multi", scope);
309     this.runnerStats = new RunnerStats(this.registry);
310 
311     this.reporter = new JmxReporter(this.registry);
312     this.reporter.start();
313   }
314 
315   public void shutdown() {
316     this.reporter.shutdown();
317     this.registry.shutdown();
318   }
319 
320   /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
321   public static CallStats newCallStats() {
322     // TODO: instance pool to reduce GC?
323     return new CallStats();
324   }
325 
326   /** Increment the number of meta cache hits. */
327   public void incrMetaCacheHit() {
328     metaCacheHits.inc();
329   }
330 
331   /** Increment the number of meta cache misses. */
332   public void incrMetaCacheMiss() {
333     metaCacheMisses.inc();
334   }
335 
336   /** Increment the number of normal runner counts. */
337   public void incrNormalRunners() {
338     this.runnerStats.incrNormalRunners();
339   }
340 
341   /** Increment the number of delay runner counts. */
342   public void incrDelayRunners() {
343     this.runnerStats.incrDelayRunners();
344   }
345 
346   /** Update delay interval of delay runner. */
347   public void updateDelayInterval(long interval) {
348     this.runnerStats.updateDelayInterval(interval);
349   }
350 
351   /**
352    * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
353    */
354   private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
355     T t = map.get(key);
356     if (t == null) {
357       t = factory.newMetric(this.getClass(), key, scope);
358       map.putIfAbsent(key, t);
359     }
360     return t;
361   }
362 
363   /** Update call stats for non-critical-path methods */
364   private void updateRpcGeneric(MethodDescriptor method, CallStats stats) {
365     final String methodName = method.getService().getName() + "_" + method.getName();
366     getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
367         .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
368     getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
369         .update(stats.getRequestSizeBytes());
370     getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
371         .update(stats.getResponseSizeBytes());
372   }
373 
374   /** Report RPC context to metrics system. */
375   public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
376     // this implementation is tied directly to protobuf implementation details. would be better
377     // if we could dispatch based on something static, ie, request Message type.
378     if (method.getService() == ClientService.getDescriptor()) {
379       switch(method.getIndex()) {
380       case 0:
381         assert "Get".equals(method.getName());
382         getTracker.updateRpc(stats);
383         return;
384       case 1:
385         assert "Mutate".equals(method.getName());
386         final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
387         switch(mutationType) {
388         case APPEND:
389           appendTracker.updateRpc(stats);
390           return;
391         case DELETE:
392           deleteTracker.updateRpc(stats);
393           return;
394         case INCREMENT:
395           incrementTracker.updateRpc(stats);
396           return;
397         case PUT:
398           putTracker.updateRpc(stats);
399           return;
400         default:
401           throw new RuntimeException("Unrecognized mutation type " + mutationType);
402         }
403       case 2:
404         assert "Scan".equals(method.getName());
405         scanTracker.updateRpc(stats);
406         return;
407       case 3:
408         assert "BulkLoadHFile".equals(method.getName());
409         // use generic implementation
410         break;
411       case 4:
412         assert "ExecService".equals(method.getName());
413         // use generic implementation
414         break;
415       case 5:
416         assert "ExecRegionServerService".equals(method.getName());
417         // use generic implementation
418         break;
419       case 6:
420         assert "Multi".equals(method.getName());
421         multiTracker.updateRpc(stats);
422         return;
423       default:
424         throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
425       }
426     }
427     // Fallback to dynamic registry lookup for DDL methods.
428     updateRpcGeneric(method, stats);
429   }
430 }