1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import com.google.common.annotations.VisibleForTesting;
21 import com.google.protobuf.Descriptors.MethodDescriptor;
22 import com.google.protobuf.Message;
23 import com.yammer.metrics.core.Counter;
24 import com.yammer.metrics.core.Histogram;
25 import com.yammer.metrics.core.MetricsRegistry;
26 import com.yammer.metrics.core.Timer;
27 import com.yammer.metrics.reporting.JmxReporter;
28 import com.yammer.metrics.util.RatioGauge;
29 import org.apache.hadoop.hbase.ServerName;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
32 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
33 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
34 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
35 import org.apache.hadoop.hbase.util.Bytes;
36
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ConcurrentSkipListMap;
39 import java.util.concurrent.ConcurrentMap;
40 import java.util.concurrent.ThreadPoolExecutor;
41 import java.util.concurrent.TimeUnit;
42
43
44
45
46
47
48
49
50
51
52 @InterfaceAudience.Private
53 public class MetricsConnection {
54
55
/** Configuration key; by its name it toggles client-side metrics (it is not read in this class — confirm at the call site). */
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

// Metric-name prefixes. Each one is concatenated with a per-call or per-region suffix
// before the metric is registered.
private static final String DRTN_BASE = "rpcCallDurationMs_";
private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
private static final String MEMLOAD_BASE = "memstoreLoad_";
private static final String HEAP_BASE = "heapOccupancy_";
// Name of the ClientService protobuf service; leading component of every CallTracker name.
private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
64
65
66 public static class CallStats {
67 private long requestSizeBytes = 0;
68 private long responseSizeBytes = 0;
69 private long startTime = 0;
70 private long callTimeMs = 0;
71
72 public long getRequestSizeBytes() {
73 return requestSizeBytes;
74 }
75
76 public void setRequestSizeBytes(long requestSizeBytes) {
77 this.requestSizeBytes = requestSizeBytes;
78 }
79
80 public long getResponseSizeBytes() {
81 return responseSizeBytes;
82 }
83
84 public void setResponseSizeBytes(long responseSizeBytes) {
85 this.responseSizeBytes = responseSizeBytes;
86 }
87
88 public long getStartTime() {
89 return startTime;
90 }
91
92 public void setStartTime(long startTime) {
93 this.startTime = startTime;
94 }
95
96 public long getCallTimeMs() {
97 return callTimeMs;
98 }
99
100 public void setCallTimeMs(long callTimeMs) {
101 this.callTimeMs = callTimeMs;
102 }
103 }
104
105 @VisibleForTesting
106 protected static final class CallTracker {
107 private final String name;
108 @VisibleForTesting final Timer callTimer;
109 @VisibleForTesting final Histogram reqHist;
110 @VisibleForTesting final Histogram respHist;
111
112 private CallTracker(MetricsRegistry registry, String name, String subName, String scope) {
113 StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
114 if (subName != null) {
115 sb.append("(").append(subName).append(")");
116 }
117 this.name = sb.toString();
118 this.callTimer = registry.newTimer(MetricsConnection.class, DRTN_BASE + this.name, scope);
119 this.reqHist = registry.newHistogram(MetricsConnection.class, REQ_BASE + this.name, scope);
120 this.respHist = registry.newHistogram(MetricsConnection.class, RESP_BASE + this.name, scope);
121 }
122
123 private CallTracker(MetricsRegistry registry, String name, String scope) {
124 this(registry, name, null, scope);
125 }
126
127 public void updateRpc(CallStats stats) {
128 this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
129 this.reqHist.update(stats.getRequestSizeBytes());
130 this.respHist.update(stats.getResponseSizeBytes());
131 }
132
133 @Override
134 public String toString() {
135 return "CallTracker:" + name;
136 }
137 }
138
139 protected static class RegionStats {
140 final String name;
141 final Histogram memstoreLoadHist;
142 final Histogram heapOccupancyHist;
143
144 public RegionStats(MetricsRegistry registry, String name) {
145 this.name = name;
146 this.memstoreLoadHist = registry.newHistogram(MetricsConnection.class,
147 MEMLOAD_BASE + this.name);
148 this.heapOccupancyHist = registry.newHistogram(MetricsConnection.class,
149 HEAP_BASE + this.name);
150 }
151
152 public void update(ClientProtos.RegionLoadStats regionStatistics) {
153 this.memstoreLoadHist.update(regionStatistics.getMemstoreLoad());
154 this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
155 }
156 }
157
158 @VisibleForTesting
159 protected static class RunnerStats {
160 final Counter normalRunners;
161 final Counter delayRunners;
162 final Histogram delayIntevalHist;
163
164 public RunnerStats(MetricsRegistry registry) {
165 this.normalRunners = registry.newCounter(MetricsConnection.class, "normalRunnersCount");
166 this.delayRunners = registry.newCounter(MetricsConnection.class, "delayRunnersCount");
167 this.delayIntevalHist = registry.newHistogram(MetricsConnection.class, "delayIntervalHist");
168 }
169
170 public void incrNormalRunners() {
171 this.normalRunners.inc();
172 }
173
174 public void incrDelayRunners() {
175 this.delayRunners.inc();
176 }
177
178 public void updateDelayInterval(long interval) {
179 this.delayIntevalHist.update(interval);
180 }
181 }
182
// Per-server map of region name -> RegionStats. Entries are added lazily by
// updateServerStats; nothing in this class removes them.
@VisibleForTesting
protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats
= new ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>>();
186
187 public void updateServerStats(ServerName serverName, byte[] regionName,
188 Object r) {
189 if (!(r instanceof Result)) {
190 return;
191 }
192 Result result = (Result) r;
193 ClientProtos.RegionLoadStats stats = result.getStats();
194 if(stats == null){
195 return;
196 }
197 String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
198 ConcurrentMap<byte[], RegionStats> rsStats = null;
199 if (serverStats.containsKey(serverName)) {
200 rsStats = serverStats.get(serverName);
201 } else {
202 rsStats = serverStats.putIfAbsent(serverName,
203 new ConcurrentSkipListMap<byte[], RegionStats>(Bytes.BYTES_COMPARATOR));
204 if (rsStats == null) {
205 rsStats = serverStats.get(serverName);
206 }
207 }
208 RegionStats regionStats = null;
209 if (rsStats.containsKey(regionName)) {
210 regionStats = rsStats.get(regionName);
211 } else {
212 regionStats = rsStats.putIfAbsent(regionName, new RegionStats(this.registry, name));
213 if (regionStats == null) {
214 regionStats = rsStats.get(regionName);
215 }
216 }
217 regionStats.update(stats);
218 }
219
220
221
/**
 * Factory callback used by getMetric to create a metric of type T when none is cached
 * under the requested key.
 */
private static interface NewMetric<T> {
// Creates the metric for the given owning class, metric name and scope.
T newMetric(Class<?> clazz, String name, String scope);
}
225
226
// Initial capacity passed to the rpcTimers map (rpcHistograms uses twice this value).
private static final int CAPACITY = 50;

// Load factor passed to both ConcurrentHashMap constructors.
private static final float LOAD_FACTOR = 0.75f;




// Concurrency-level hint passed to both ConcurrentHashMap constructors.
private static final int CONCURRENCY_LEVEL = 256;

// Registry in which this instance creates its metrics.
private final MetricsRegistry registry;
// JMX reporter for the registry; started in the constructor and stopped in shutdown().
private final JmxReporter reporter;
// Scope string attached to most metrics; taken from conn.toString() in the constructor.
private final String scope;
239
// Adapter that creates Timer metrics through this instance's registry; used by updateRpcGeneric.
private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
@Override public Timer newMetric(Class<?> clazz, String name, String scope) {
return registry.newTimer(clazz, name, scope);
}
};

// Adapter that creates Histogram metrics through this instance's registry; used by updateRpcGeneric.
private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
@Override public Histogram newMetric(Class<?> clazz, String name, String scope) {
return registry.newHistogram(clazz, name, scope);
}
};
251
252
253
// Counters bumped by incrMetaCacheHit() and incrMetaCacheMiss().
@VisibleForTesting protected final Counter metaCacheHits;
@VisibleForTesting protected final Counter metaCacheMisses;
// Dedicated trackers for ClientService calls; Mutate has one tracker per mutation type.
@VisibleForTesting protected final CallTracker getTracker;
@VisibleForTesting protected final CallTracker scanTracker;
@VisibleForTesting protected final CallTracker appendTracker;
@VisibleForTesting protected final CallTracker deleteTracker;
@VisibleForTesting protected final CallTracker incrementTracker;
@VisibleForTesting protected final CallTracker putTracker;
@VisibleForTesting protected final CallTracker multiTracker;
// Runner counters and delay-interval histogram, exposed through the incr*/update* methods.
@VisibleForTesting protected final RunnerStats runnerStats;
264
265
266
267
268
269
270 @VisibleForTesting protected final ConcurrentMap<String, Timer> rpcTimers =
271 new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
272 @VisibleForTesting protected final ConcurrentMap<String, Histogram> rpcHistograms =
273 new ConcurrentHashMap<>(CAPACITY * 2
274 LOAD_FACTOR, CONCURRENCY_LEVEL);
275
/**
 * Creates the metrics for one connection: thread-pool utilisation gauges, meta cache counters,
 * per-call trackers and runner statistics, then starts a JMX reporter over them.
 *
 * @param conn connection whose string form becomes the metric scope and whose batch and meta
 *     lookup pools are monitored; both pools are cast to {@link ThreadPoolExecutor}
 */
public MetricsConnection(final ConnectionManager.HConnectionImplementation conn) {
  this.scope = conn.toString();
  this.registry = new MetricsRegistry();
  final ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool();
  final ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool();

  registerActiveThreadRatio("executorPoolActiveThreads", batchPool);
  registerActiveThreadRatio("metaPoolActiveThreads", metaPool);

  final MetricsRegistry reg = this.registry;
  this.metaCacheHits = reg.newCounter(this.getClass(), "metaCacheHits", scope);
  this.metaCacheMisses = reg.newCounter(this.getClass(), "metaCacheMisses", scope);
  this.getTracker = new CallTracker(reg, "Get", scope);
  this.scanTracker = new CallTracker(reg, "Scan", scope);
  this.appendTracker = new CallTracker(reg, "Mutate", "Append", scope);
  this.deleteTracker = new CallTracker(reg, "Mutate", "Delete", scope);
  this.incrementTracker = new CallTracker(reg, "Mutate", "Increment", scope);
  this.putTracker = new CallTracker(reg, "Mutate", "Put", scope);
  this.multiTracker = new CallTracker(reg, "Multi", scope);
  this.runnerStats = new RunnerStats(reg);

  this.reporter = new JmxReporter(reg);
  this.reporter.start();
}

/** Registers a gauge reporting the pool's active thread count over its maximum pool size. */
private void registerActiveThreadRatio(String gaugeName, final ThreadPoolExecutor pool) {
  this.registry.newGauge(this.getClass(), gaugeName, scope,
      new RatioGauge() {
        @Override protected double getNumerator() {
          return pool.getActiveCount();
        }
        @Override protected double getDenominator() {
          return pool.getMaximumPoolSize();
        }
      });
}
314
315 public void shutdown() {
316 this.reporter.shutdown();
317 this.registry.shutdown();
318 }
319
320
321 public static CallStats newCallStats() {
322
323 return new CallStats();
324 }
325
326
327 public void incrMetaCacheHit() {
328 metaCacheHits.inc();
329 }
330
331
332 public void incrMetaCacheMiss() {
333 metaCacheMisses.inc();
334 }
335
336
337 public void incrNormalRunners() {
338 this.runnerStats.incrNormalRunners();
339 }
340
341
342 public void incrDelayRunners() {
343 this.runnerStats.incrDelayRunners();
344 }
345
346
347 public void updateDelayInterval(long interval) {
348 this.runnerStats.updateDelayInterval(interval);
349 }
350
351
352
353
354 private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
355 T t = map.get(key);
356 if (t == null) {
357 t = factory.newMetric(this.getClass(), key, scope);
358 map.putIfAbsent(key, t);
359 }
360 return t;
361 }
362
363
364 private void updateRpcGeneric(MethodDescriptor method, CallStats stats) {
365 final String methodName = method.getService().getName() + "_" + method.getName();
366 getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
367 .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
368 getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
369 .update(stats.getRequestSizeBytes());
370 getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
371 .update(stats.getResponseSizeBytes());
372 }
373
374
375 public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
376
377
378 if (method.getService() == ClientService.getDescriptor()) {
379 switch(method.getIndex()) {
380 case 0:
381 assert "Get".equals(method.getName());
382 getTracker.updateRpc(stats);
383 return;
384 case 1:
385 assert "Mutate".equals(method.getName());
386 final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
387 switch(mutationType) {
388 case APPEND:
389 appendTracker.updateRpc(stats);
390 return;
391 case DELETE:
392 deleteTracker.updateRpc(stats);
393 return;
394 case INCREMENT:
395 incrementTracker.updateRpc(stats);
396 return;
397 case PUT:
398 putTracker.updateRpc(stats);
399 return;
400 default:
401 throw new RuntimeException("Unrecognized mutation type " + mutationType);
402 }
403 case 2:
404 assert "Scan".equals(method.getName());
405 scanTracker.updateRpc(stats);
406 return;
407 case 3:
408 assert "BulkLoadHFile".equals(method.getName());
409
410 break;
411 case 4:
412 assert "ExecService".equals(method.getName());
413
414 break;
415 case 5:
416 assert "ExecRegionServerService".equals(method.getName());
417
418 break;
419 case 6:
420 assert "Multi".equals(method.getName());
421 multiTracker.updateRpc(stats);
422 return;
423 default:
424 throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
425 }
426 }
427
428 updateRpcGeneric(method, stats);
429 }
430 }