/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static com.codahale.metrics.MetricRegistry.name;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * This class is for maintaining the various connection statistics and publishing them through
 * the metrics interfaces.
 *
 * This class manages its own {@link MetricRegistry} and {@link JmxReporter} so as to not
 * conflict with other uses of Yammer Metrics within the client application. Instantiating
 * this class implicitly creates and "starts" instances of these classes; be sure to call
 * {@link #shutdown()} to terminate the thread pools they allocate.
 */
@InterfaceAudience.Private
public class MetricsConnection implements StatisticTrackable {

  /** Set this key to {@code true} to enable metrics collection of client requests. */
  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

  // Prefixes prepended to the per-method / per-region / per-exception part of a metric name.
  private static final String DRTN_BASE = "rpcCallDurationMs_";
  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
  private static final String MEMLOAD_BASE = "memstoreLoad_";
  private static final String HEAP_BASE = "heapOccupancy_";
  private static final String CACHE_BASE = "cacheDroppingExceptions_";
  /** Metric-name suffix used by {@link #incrCacheDroppingExceptions(Object)} for {@code null}. */
  private static final String UNKNOWN_EXCEPTION = "UnknownException";
  /** Name of the protobuf {@link ClientService}; leading component of every tracker name. */
  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();

  /** A container class for collecting details about the RPC call as it percolates. */
  public static class CallStats {
    private long requestSizeBytes = 0;
    private long responseSizeBytes = 0;
    private long startTime = 0;
    private long callTimeMs = 0;
    private int concurrentCallsPerServer = 0;

    public long getRequestSizeBytes() {
      return requestSizeBytes;
    }

    public void setRequestSizeBytes(long requestSizeBytes) {
      this.requestSizeBytes = requestSizeBytes;
    }

    public long getResponseSizeBytes() {
      return responseSizeBytes;
    }

    public void setResponseSizeBytes(long responseSizeBytes) {
      this.responseSizeBytes = responseSizeBytes;
    }

    public long getStartTime() {
      return startTime;
    }

    public void setStartTime(long startTime) {
      this.startTime = startTime;
    }

    public long getCallTimeMs() {
      return callTimeMs;
    }

    public void setCallTimeMs(long callTimeMs) {
      this.callTimeMs = callTimeMs;
    }

    public int getConcurrentCallsPerServer() {
      return concurrentCallsPerServer;
    }

    public void setConcurrentCallsPerServer(int callsPerServer) {
      this.concurrentCallsPerServer = callsPerServer;
    }
  }

  /**
   * Bundles the three metrics kept for one kind of RPC: a call-duration timer and histograms of
   * the request and response sizes.
   */
  @VisibleForTesting
  protected static final class CallTracker {
    private final String name;
    @VisibleForTesting final Timer callTimer;
    @VisibleForTesting final Histogram reqHist;
    @VisibleForTesting final Histogram respHist;

    /**
     * @param registry registry the three metrics are obtained from
     * @param name RPC method name, appended to the client service name
     * @param subName optional qualifier rendered as {@code "(subName)"}; may be {@code null}
     * @param scope trailing component of each metric name
     */
    private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
      if (subName != null) {
        sb.append("(").append(subName).append(")");
      }
      this.name = sb.toString();
      this.callTimer = registry.timer(name(MetricsConnection.class,
        DRTN_BASE + this.name, scope));
      this.reqHist = registry.histogram(name(MetricsConnection.class,
        REQ_BASE + this.name, scope));
      this.respHist = registry.histogram(name(MetricsConnection.class,
        RESP_BASE + this.name, scope));
    }

    /** Same as the four-argument constructor with no {@code subName}. */
    private CallTracker(MetricRegistry registry, String name, String scope) {
      this(registry, name, null, scope);
    }

    /** Records the call time, request size and response size carried by {@code stats}. */
    public void updateRpc(CallStats stats) {
      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
      this.reqHist.update(stats.getRequestSizeBytes());
      this.respHist.update(stats.getResponseSizeBytes());
    }

    @Override
    public String toString() {
      return "CallTracker:" + name;
    }
  }

  /** Histograms of the memstore load and heap occupancy reported for a single region. */
  protected static class RegionStats {
    final String name;
    final Histogram memstoreLoadHist;
    final Histogram heapOccupancyHist;

    /**
     * @param registry registry the two histograms are obtained from
     * @param name suffix appended to the memstore-load and heap-occupancy metric prefixes
     */
    public RegionStats(MetricRegistry registry, String name) {
      this.name = name;
      this.memstoreLoadHist = registry.histogram(name(MetricsConnection.class,
        MEMLOAD_BASE + this.name));
      this.heapOccupancyHist = registry.histogram(name(MetricsConnection.class,
        HEAP_BASE + this.name));
    }

    /** Feeds the memstore load and heap occupancy of {@code regionStatistics} to the histograms. */
    public void update(RegionLoadStats regionStatistics) {
      this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
    }
  }

  /** Counters for normal and delayed runners plus a histogram of the delay intervals. */
  @VisibleForTesting
  protected static class RunnerStats {
    final Counter normalRunners;
    final Counter delayRunners;
    // The field name keeps its historical spelling because it is visible to tests.
    final Histogram delayIntevalHist;

    public RunnerStats(MetricRegistry registry) {
      this.normalRunners = registry.counter(
        name(MetricsConnection.class, "normalRunnersCount"));
      this.delayRunners = registry.counter(
        name(MetricsConnection.class, "delayRunnersCount"));
      this.delayIntevalHist = registry.histogram(
        name(MetricsConnection.class, "delayIntervalHist"));
    }

    public void incrNormalRunners() {
      this.normalRunners.inc();
    }

    public void incrDelayRunners() {
      this.delayRunners.inc();
    }

    public void updateDelayInterval(long interval) {
      this.delayIntevalHist.update(interval);
    }
  }

  /** Per-server maps of region name to the {@link RegionStats} tracked for that region. */
  @VisibleForTesting
  protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats
          = new ConcurrentHashMap<>();

  /**
   * Records the region load statistics attached to {@code r}, if any.
   *
   * @param serverName server hosting the region
   * @param regionName name of the region
   * @param r response object; ignored unless it is a {@link Result} whose
   *          {@link Result#getStats()} is non-null
   */
  public void updateServerStats(ServerName serverName, byte[] regionName,
                                Object r) {
    if (!(r instanceof Result)) {
      return;
    }
    Result result = (Result) r;
    RegionLoadStats stats = result.getStats();
    if (stats == null) {
      return;
    }
    updateRegionStats(serverName, regionName, stats);
  }

  /**
   * Updates the histograms kept for {@code regionName} on {@code serverName}, creating the
   * per-server map and the per-region {@link RegionStats} on first use.
   */
  @Override
  public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
    ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
      () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
    // The metric name is only needed when the RegionStats is first created, so it is built
    // inside the supplier rather than on every call.
    RegionStats regionStats = computeIfAbsent(rsStats, regionName,
      () -> new RegionStats(this.registry,
        serverName.getServerName() + "," + Bytes.toStringBinary(regionName)));
    regionStats.update(stats);
  }

  /** A lambda for dispatching to the appropriate metric factory method */
  @FunctionalInterface
  private interface NewMetric<T> {
    T newMetric(Class<?> clazz, String name, String scope);
  }

  /** Anticipated number of metric entries */
  private static final int CAPACITY = 50;
  /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
  private static final float LOAD_FACTOR = 0.75f;
  /**
   * Anticipated number of concurrent accessor threads, from
   * {@link ConnectionImplementation#getBatchPool()}
   */
  private static final int CONCURRENCY_LEVEL = 256;

  private final MetricRegistry registry;
  private final JmxReporter reporter;
  private final String scope;

  // The factories below stay anonymous classes: they read the blank-final 'registry' field,
  // which is only assigned later, in the constructor.
  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
    @Override public Timer newMetric(Class<?> clazz, String name, String scope) {
      return registry.timer(name(clazz, name, scope));
    }
  };

  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
    @Override public Histogram newMetric(Class<?> clazz, String name, String scope) {
      return registry.histogram(name(clazz, name, scope));
    }
  };

  private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
    @Override public Counter newMetric(Class<?> clazz, String name, String scope) {
      return registry.counter(name(clazz, name, scope));
    }
  };

  // static metrics

  @VisibleForTesting protected final Counter metaCacheHits;
  @VisibleForTesting protected final Counter metaCacheMisses;
  @VisibleForTesting protected final CallTracker getTracker;
  @VisibleForTesting protected final CallTracker scanTracker;
  @VisibleForTesting protected final CallTracker appendTracker;
  @VisibleForTesting protected final CallTracker deleteTracker;
  @VisibleForTesting protected final CallTracker incrementTracker;
  @VisibleForTesting protected final CallTracker putTracker;
  @VisibleForTesting protected final CallTracker multiTracker;
  @VisibleForTesting protected final RunnerStats runnerStats;
  @VisibleForTesting protected final Counter metaCacheNumClearServer;
  @VisibleForTesting protected final Counter metaCacheNumClearRegion;
  @VisibleForTesting protected final Counter hedgedReadOps;
  @VisibleForTesting protected final Counter hedgedReadWin;
  @VisibleForTesting protected final Histogram concurrentCallsPerServerHist;

  // dynamic metrics

  // These maps are used to cache references to the metric instances that are managed by the
  // registry. I don't think their use perfectly removes redundant allocations, but it's
  // a big improvement over calling registry.newMetric each time.
  @VisibleForTesting protected final ConcurrentMap<String, Timer> rpcTimers =
      new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  @VisibleForTesting protected final ConcurrentMap<String, Histogram> rpcHistograms =
      new ConcurrentHashMap<>(CAPACITY * 2 /* tracking both request and response sizes */,
          LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

  /**
   * Creates the registry, registers the pool gauges and the static metrics, then builds and
   * starts the JMX reporter.
   *
   * @param conn connection whose {@code toString()} becomes the metric scope and whose batch
   *             and meta-lookup pools are sampled by the two ratio gauges
   */
  MetricsConnection(final ConnectionImplementation conn) {
    this.scope = conn.toString();
    this.registry = new MetricRegistry();

    this.registry.register(getExecutorPoolName(),
        new RatioGauge() {
          @Override
          protected Ratio getRatio() {
            return activeThreadRatio(conn.getCurrentBatchPool());
          }
        });
    this.registry.register(getMetaPoolName(),
        new RatioGauge() {
          @Override
          protected Ratio getRatio() {
            return activeThreadRatio(conn.getCurrentMetaLookupPool());
          }
        });
    this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
    this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
    this.metaCacheNumClearServer = registry.counter(name(this.getClass(),
      "metaCacheNumClearServer", scope));
    this.metaCacheNumClearRegion = registry.counter(name(this.getClass(),
      "metaCacheNumClearRegion", scope));
    this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
    this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
    this.getTracker = new CallTracker(this.registry, "Get", scope);
    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
    this.runnerStats = new RunnerStats(this.registry);
    this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class,
      "concurrentCallsPerServer", scope));

    this.reporter = JmxReporter.forRegistry(this.registry).build();
    this.reporter.start();
  }

  /**
   * Computes the active-thread / maximum-pool-size ratio of {@code pool}.
   *
   * @param pool the executor to sample; may be {@code null} or of any type
   * @return the ratio for a {@link ThreadPoolExecutor}; {@code Ratio.of(0, 0)} when
   *         {@code pool} is {@code null} or is not a {@link ThreadPoolExecutor}, so that
   *         reading the gauge never fails with a {@link ClassCastException}
   */
  private static RatioGauge.Ratio activeThreadRatio(Object pool) {
    // instanceof is false for null, so this single guard covers both cases.
    if (!(pool instanceof ThreadPoolExecutor)) {
      return RatioGauge.Ratio.of(0, 0);
    }
    ThreadPoolExecutor executor = (ThreadPoolExecutor) pool;
    return RatioGauge.Ratio.of(executor.getActiveCount(), executor.getMaximumPoolSize());
  }

  /** @return the name under which the batch-pool ratio gauge is registered */
  @VisibleForTesting
  final String getExecutorPoolName() {
    return name(getClass(), "executorPoolActiveThreads", scope);
  }

  /** @return the name under which the meta-lookup-pool ratio gauge is registered */
  @VisibleForTesting
  final String getMetaPoolName() {
    return name(getClass(), "metaPoolActiveThreads", scope);
  }

  @VisibleForTesting
  MetricRegistry getMetricRegistry() {
    return registry;
  }

  /** Stops the JMX reporter started by the constructor. */
  public void shutdown() {
    this.reporter.stop();
  }

  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
  public static CallStats newCallStats() {
    // TODO: instance pool to reduce GC?
    return new CallStats();
  }

  /** Increment the number of meta cache hits. */
  public void incrMetaCacheHit() {
    metaCacheHits.inc();
  }

  /** Increment the number of meta cache misses. */
  public void incrMetaCacheMiss() {
    metaCacheMisses.inc();
  }

  /** Increment the number of meta cache drops requested for entire RegionServer. */
  public void incrMetaCacheNumClearServer() {
    metaCacheNumClearServer.inc();
  }

  /** Increment the number of meta cache drops requested for individual region. */
  public void incrMetaCacheNumClearRegion() {
    metaCacheNumClearRegion.inc();
  }

  /** Increment the number of hedged read that have occurred. */
  public void incrHedgedReadOps() {
    hedgedReadOps.inc();
  }

  /** Increment the number of hedged read returned faster than the original read. */
  public void incrHedgedReadWin() {
    hedgedReadWin.inc();
  }

  /** Increment the number of normal runner counts. */
  public void incrNormalRunners() {
    this.runnerStats.incrNormalRunners();
  }

  /** Increment the number of delay runner counts. */
  public void incrDelayRunners() {
    this.runnerStats.incrDelayRunners();
  }

  /** Update delay interval of delay runner. */
  public void updateDelayInterval(long interval) {
    this.runnerStats.updateDelayInterval(interval);
  }

  /**
   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
   */
  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
    return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
  }

  /** Update call stats for non-critical-path methods */
  private void updateRpcGeneric(MethodDescriptor method, CallStats stats) {
    final String methodName = method.getService().getName() + "_" + method.getName();
    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
        .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
        .update(stats.getRequestSizeBytes());
    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
        .update(stats.getResponseSizeBytes());
  }

  /**
   * Report RPC context to metrics system.
   *
   * @param method descriptor of the invoked RPC method
   * @param param request message; cast to {@link MutateRequest} when {@code method} is the
   *              {@code ClientService} method at index 1 ("Mutate")
   * @param stats statistics collected for the call
   * @throws RuntimeException if a {@code ClientService} method index or a mutation type is not
   *                          one of those handled below
   */
  public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
    int callsPerServer = stats.getConcurrentCallsPerServer();
    if (callsPerServer > 0) {
      concurrentCallsPerServerHist.update(callsPerServer);
    }
    // this implementation is tied directly to protobuf implementation details. would be better
    // if we could dispatch based on something static, ie, request Message type.
    if (method.getService() == ClientService.getDescriptor()) {
      switch(method.getIndex()) {
        case 0:
          assert "Get".equals(method.getName());
          getTracker.updateRpc(stats);
          return;
        case 1:
          assert "Mutate".equals(method.getName());
          final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
          switch(mutationType) {
            case APPEND:
              appendTracker.updateRpc(stats);
              return;
            case DELETE:
              deleteTracker.updateRpc(stats);
              return;
            case INCREMENT:
              incrementTracker.updateRpc(stats);
              return;
            case PUT:
              putTracker.updateRpc(stats);
              return;
            default:
              throw new RuntimeException("Unrecognized mutation type " + mutationType);
          }
        case 2:
          assert "Scan".equals(method.getName());
          scanTracker.updateRpc(stats);
          return;
        case 3:
          assert "BulkLoadHFile".equals(method.getName());
          // use generic implementation
          break;
        case 4:
          assert "PrepareBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 5:
          assert "CleanupBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 6:
          assert "ExecService".equals(method.getName());
          // use generic implementation
          break;
        case 7:
          assert "ExecRegionServerService".equals(method.getName());
          // use generic implementation
          break;
        case 8:
          assert "Multi".equals(method.getName());
          multiTracker.updateRpc(stats);
          return;
        default:
          throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
      }
    }
    // Fallback to dynamic registry lookup for DDL methods.
    updateRpcGeneric(method, stats);
  }

  /**
   * Increments the counter kept for the class of {@code exception}.
   *
   * @param exception the object whose simple class name selects the counter; {@code null} is
   *                  counted under {@code "UnknownException"}
   */
  public void incrCacheDroppingExceptions(Object exception) {
    getMetric(CACHE_BASE +
      (exception == null? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
      cacheDroppingExceptions, counterFactory).inc();
  }
}