001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static com.codahale.metrics.MetricRegistry.name; 021import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 022 023import com.codahale.metrics.Counter; 024import com.codahale.metrics.Histogram; 025import com.codahale.metrics.JmxReporter; 026import com.codahale.metrics.MetricRegistry; 027import com.codahale.metrics.RatioGauge; 028import com.codahale.metrics.Timer; 029import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 030 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.ConcurrentMap; 033import java.util.concurrent.ConcurrentSkipListMap; 034import java.util.concurrent.ThreadPoolExecutor; 035import java.util.concurrent.TimeUnit; 036import java.util.function.Supplier; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 040import org.apache.hbase.thirdparty.com.google.protobuf.Message; 041import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * This class is for maintaining the various connection statistics and publishing them through
 * the metrics interfaces.
 * <p>
 * This class manages its own {@link MetricRegistry} and {@link JmxReporter} so as to not
 * conflict with other uses of Dropwizard ({@code com.codahale.metrics}) Metrics within the client
 * application. Instantiating this class implicitly creates and starts the reporter; be sure to
 * call {@link #shutdown()} to stop it.
 */
@InterfaceAudience.Private
public class MetricsConnection implements StatisticTrackable {

  /** Set this key to {@code true} to enable metrics collection of client requests. */
  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

  private static final String DRTN_BASE = "rpcCallDurationMs_";
  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
  private static final String MEMLOAD_BASE = "memstoreLoad_";
  private static final String HEAP_BASE = "heapOccupancy_";
  private static final String CACHE_BASE = "cacheDroppingExceptions_";
  private static final String UNKNOWN_EXCEPTION = "UnknownException";
  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();

  /** A container class for collecting details about the RPC call as it percolates. */
  public static class CallStats {
    private long requestSizeBytes = 0;
    private long responseSizeBytes = 0;
    private long startTime = 0;
    private long callTimeMs = 0;
    private int concurrentCallsPerServer = 0;
    private int numActionsPerServer = 0;

    public long getRequestSizeBytes() {
      return requestSizeBytes;
    }

    public void setRequestSizeBytes(long requestSizeBytes) {
      this.requestSizeBytes = requestSizeBytes;
    }

    public long getResponseSizeBytes() {
      return responseSizeBytes;
    }

    public void setResponseSizeBytes(long responseSizeBytes) {
      this.responseSizeBytes = responseSizeBytes;
    }

    public long getStartTime() {
      return startTime;
    }

    public void setStartTime(long startTime) {
      this.startTime = startTime;
    }

    public long getCallTimeMs() {
      return callTimeMs;
    }

    public void setCallTimeMs(long callTimeMs) {
      this.callTimeMs = callTimeMs;
    }

    public int getConcurrentCallsPerServer() {
      return concurrentCallsPerServer;
    }

    public void setConcurrentCallsPerServer(int callsPerServer) {
      this.concurrentCallsPerServer = callsPerServer;
    }

    public int getNumActionsPerServer() {
      return numActionsPerServer;
    }

    public void setNumActionsPerServer(int numActionsPerServer) {
      this.numActionsPerServer = numActionsPerServer;
    }
  }

  /**
   * Metrics for one kind of RPC: a call-duration {@link Timer} plus request-size and
   * response-size {@link Histogram}s. They are registered under
   * {@code <base><service>_<name>[(<subName>)]} qualified by {@code scope}.
   */
  @VisibleForTesting
  protected static final class CallTracker {
    private final String name;
    @VisibleForTesting final Timer callTimer;
    @VisibleForTesting final Histogram reqHist;
    @VisibleForTesting final Histogram respHist;

    private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
      if (subName != null) {
        sb.append("(").append(subName).append(")");
      }
      this.name = sb.toString();
      this.callTimer = registry.timer(name(MetricsConnection.class,
        DRTN_BASE + this.name, scope));
      this.reqHist = registry.histogram(name(MetricsConnection.class,
        REQ_BASE + this.name, scope));
      this.respHist = registry.histogram(name(MetricsConnection.class,
        RESP_BASE + this.name, scope));
    }

    private CallTracker(MetricRegistry registry, String name, String scope) {
      this(registry, name, null, scope);
    }

    /** Records the call duration and the request/response sizes carried by {@code stats}. */
    public void updateRpc(CallStats stats) {
      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
      this.reqHist.update(stats.getRequestSizeBytes());
      this.respHist.update(stats.getResponseSizeBytes());
    }

    @Override
    public String toString() {
      return "CallTracker:" + name;
    }
  }

  /**
   * Memstore-load and heap-occupancy histograms for a single region, keyed by {@code name}.
   * <p>
   * NOTE(review): these metric names are not qualified by the connection scope, unlike the
   * CallTracker metrics. Confirm that this is intended when several connections share one JVM.
   */
  protected static class RegionStats {
    final String name;
    final Histogram memstoreLoadHist;
    final Histogram heapOccupancyHist;

    public RegionStats(MetricRegistry registry, String name) {
      this.name = name;
      this.memstoreLoadHist = registry.histogram(name(MetricsConnection.class,
        MEMLOAD_BASE + this.name));
      this.heapOccupancyHist = registry.histogram(name(MetricsConnection.class,
        HEAP_BASE + this.name));
    }

    /** Adds the memstore load and heap occupancy from {@code regionStatistics} to the histograms. */
    public void update(RegionLoadStats regionStatistics) {
      this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
    }
  }

  /**
   * Counters for normal and delayed runners, plus a histogram of the delay intervals.
   * <p>
   * NOTE(review): like RegionStats, these metric names are not qualified by scope.
   */
  @VisibleForTesting
  protected static class RunnerStats {
    final Counter normalRunners;
    final Counter delayRunners;
    // The field name is misspelled ("Inteval"). It is kept as-is because code outside this file
    // (e.g. tests) may reference it. The registered metric name is "delayIntervalHist".
    final Histogram delayIntevalHist;

    public RunnerStats(MetricRegistry registry) {
      this.normalRunners = registry.counter(
        name(MetricsConnection.class, "normalRunnersCount"));
      this.delayRunners = registry.counter(
        name(MetricsConnection.class, "delayRunnersCount"));
      this.delayIntevalHist = registry.histogram(
        name(MetricsConnection.class, "delayIntervalHist"));
    }

    public void incrNormalRunners() {
      this.normalRunners.inc();
    }

    public void incrDelayRunners() {
      this.delayRunners.inc();
    }

    public void updateDelayInterval(long interval) {
      this.delayIntevalHist.update(interval);
    }
  }

  /** Per-server, per-region load statistics; region keys are compared by byte content. */
  @VisibleForTesting
  protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats
    = new ConcurrentHashMap<>();

  /**
   * Records region load statistics carried by an RPC result.
   *
   * @param serverName server that produced the result
   * @param regionName region that produced the result
   * @param r the result object; ignored unless it is a {@link Result} with non-null stats
   */
  public void updateServerStats(ServerName serverName, byte[] regionName,
      Object r) {
    if (!(r instanceof Result)) {
      return;
    }
    Result result = (Result) r;
    RegionLoadStats stats = result.getStats();
    if (stats == null) {
      return;
    }
    updateRegionStats(serverName, regionName, stats);
  }

  @Override
  public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
    ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
      () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
    // The metric name is needed only when a new RegionStats is created. Build it inside the
    // supplier instead of allocating it on every update of an already-tracked region.
    RegionStats regionStats = computeIfAbsent(rsStats, regionName,
      () -> new RegionStats(this.registry,
        serverName.getServerName() + "," + Bytes.toStringBinary(regionName)));
    regionStats.update(stats);
  }

  /** A lambda for dispatching to the appropriate metric factory method. */
  @FunctionalInterface
  private interface NewMetric<T> {
    T newMetric(Class<?> clazz, String name, String scope);
  }

  /** Anticipated number of metric entries */
  private static final int CAPACITY = 50;
  /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
  private static final float LOAD_FACTOR = 0.75f;
  /**
   * Anticipated number of concurrent accessor threads
   */
  private static final int CONCURRENCY_LEVEL = 256;

  private final MetricRegistry registry;
  private final JmxReporter reporter;
  private final String scope;

  // Anonymous classes rather than lambdas: a lambda in a field initializer cannot read the blank
  // final 'registry' field, which is only assigned in the constructor.
  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
    @Override public Timer newMetric(Class<?> clazz, String name, String scope) {
      return registry.timer(name(clazz, name, scope));
    }
  };

  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
    @Override public Histogram newMetric(Class<?> clazz, String name, String scope) {
      return registry.histogram(name(clazz, name, scope));
    }
  };

  private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
    @Override public Counter newMetric(Class<?> clazz, String name, String scope) {
      return registry.counter(name(clazz, name, scope));
    }
  };

  // static metrics

  @VisibleForTesting protected final Counter metaCacheHits;
  @VisibleForTesting protected final Counter metaCacheMisses;
  @VisibleForTesting protected final CallTracker getTracker;
  @VisibleForTesting protected final CallTracker scanTracker;
  @VisibleForTesting protected final CallTracker appendTracker;
  @VisibleForTesting protected final CallTracker deleteTracker;
  @VisibleForTesting protected final CallTracker incrementTracker;
  @VisibleForTesting protected final CallTracker putTracker;
  @VisibleForTesting protected final CallTracker multiTracker;
  @VisibleForTesting protected final RunnerStats runnerStats;
  @VisibleForTesting protected final Counter metaCacheNumClearServer;
  @VisibleForTesting protected final Counter metaCacheNumClearRegion;
  @VisibleForTesting protected final Counter hedgedReadOps;
  @VisibleForTesting protected final Counter hedgedReadWin;
  @VisibleForTesting protected final Histogram concurrentCallsPerServerHist;
  @VisibleForTesting protected final Histogram numActionsPerServerHist;

  // dynamic metrics

  // These maps are used to cache references to the metric instances that are managed by the
  // registry. I don't think their use perfectly removes redundant allocations, but it's
  // a big improvement over calling registry.newMetric each time.
  @VisibleForTesting protected final ConcurrentMap<String, Timer> rpcTimers =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  @VisibleForTesting protected final ConcurrentMap<String, Histogram> rpcHistograms =
    new ConcurrentHashMap<>(CAPACITY * 2 /* tracking both request and response sizes */,
      LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

  /**
   * Creates the registry, registers all static metrics and starts the JMX reporter.
   *
   * @param scope qualifier appended to metric names
   * @param batchPool supplies the batch pool whose active/max thread ratio is reported; it may
   *          yield {@code null}
   * @param metaPool supplies the meta lookup pool whose active/max thread ratio is reported; it
   *          may yield {@code null}
   */
  MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
      Supplier<ThreadPoolExecutor> metaPool) {
    this.scope = scope;
    this.registry = new MetricRegistry();
    this.registry.register(getExecutorPoolName(), poolActiveThreadsRatio(batchPool));
    this.registry.register(getMetaPoolName(), poolActiveThreadsRatio(metaPool));
    this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
    this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
    this.metaCacheNumClearServer = registry.counter(name(this.getClass(),
      "metaCacheNumClearServer", scope));
    this.metaCacheNumClearRegion = registry.counter(name(this.getClass(),
      "metaCacheNumClearRegion", scope));
    this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
    this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
    this.getTracker = new CallTracker(this.registry, "Get", scope);
    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
    this.runnerStats = new RunnerStats(this.registry);
    this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class,
      "concurrentCallsPerServer", scope));
    this.numActionsPerServerHist = registry.histogram(name(MetricsConnection.class,
      "numActionsPerServer", scope));

    this.reporter = JmxReporter.forRegistry(this.registry).build();
    this.reporter.start();
  }

  /**
   * Builds a gauge reporting active threads / maximum pool size for the pool that
   * {@code poolSupplier} yields. When the supplier yields {@code null}, the gauge reports
   * {@code Ratio.of(0, 0)}.
   */
  private static RatioGauge poolActiveThreadsRatio(Supplier<ThreadPoolExecutor> poolSupplier) {
    return new RatioGauge() {
      @Override
      protected Ratio getRatio() {
        ThreadPoolExecutor pool = poolSupplier.get();
        if (pool == null) {
          return Ratio.of(0, 0);
        }
        return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
      }
    };
  }

  @VisibleForTesting
  final String getExecutorPoolName() {
    return name(getClass(), "executorPoolActiveThreads", scope);
  }

  @VisibleForTesting
  final String getMetaPoolName() {
    return name(getClass(), "metaPoolActiveThreads", scope);
  }

  @VisibleForTesting
  MetricRegistry getMetricRegistry() {
    return registry;
  }

  /** Stops the JMX reporter started by the constructor. */
  public void shutdown() {
    this.reporter.stop();
  }

  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
  public static CallStats newCallStats() {
    // TODO: instance pool to reduce GC?
    return new CallStats();
  }

  /** Increment the number of meta cache hits. */
  public void incrMetaCacheHit() {
    metaCacheHits.inc();
  }

  /** Increment the number of meta cache misses. */
  public void incrMetaCacheMiss() {
    metaCacheMisses.inc();
  }

  /** Increment the number of meta cache drops requested for entire RegionServer. */
  public void incrMetaCacheNumClearServer() {
    metaCacheNumClearServer.inc();
  }

  /** Increment the number of meta cache drops requested for individual region. */
  public void incrMetaCacheNumClearRegion() {
    metaCacheNumClearRegion.inc();
  }

  /**
   * Increment the number of meta cache drops requested for individual regions by {@code count}.
   *
   * @param count number of region-level drops to add
   */
  public void incrMetaCacheNumClearRegion(int count) {
    metaCacheNumClearRegion.inc(count);
  }

  /** Increment the number of hedged read that have occurred. */
  public void incrHedgedReadOps() {
    hedgedReadOps.inc();
  }

  /** Increment the number of hedged read returned faster than the original read. */
  public void incrHedgedReadWin() {
    hedgedReadWin.inc();
  }

  /** Increment the number of normal runner counts. */
  public void incrNormalRunners() {
    this.runnerStats.incrNormalRunners();
  }

  /** Increment the number of delay runner counts and update delay interval of delay runner. */
  public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
    this.runnerStats.incrDelayRunners();
    this.runnerStats.updateDelayInterval(interval);
  }

  /**
   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
   */
  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
    return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
  }

  /** Update call stats for non-critical-path methods */
  private void updateRpcGeneric(MethodDescriptor method, CallStats stats) {
    final String methodName = method.getService().getName() + "_" + method.getName();
    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
      .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getRequestSizeBytes());
    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getResponseSizeBytes());
  }

  /**
   * Report RPC context to metrics system.
   * <p>
   * ClientService Get, Mutate (split by mutation type), Scan and Multi calls update
   * pre-registered trackers. Every other method, in ClientService or any other service, goes
   * through dynamically created metrics.
   *
   * @param method the invoked RPC method
   * @param param the request message; for Mutate it is cast to {@link MutateRequest}
   * @param stats sizes and timings collected for the call
   * @throws RuntimeException if a ClientService method index or mutation type is unrecognized
   */
  public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
    int callsPerServer = stats.getConcurrentCallsPerServer();
    if (callsPerServer > 0) {
      concurrentCallsPerServerHist.update(callsPerServer);
    }
    // this implementation is tied directly to protobuf implementation details. would be better
    // if we could dispatch based on something static, ie, request Message type.
    if (method.getService() == ClientService.getDescriptor()) {
      switch (method.getIndex()) {
        case 0:
          assert "Get".equals(method.getName());
          getTracker.updateRpc(stats);
          return;
        case 1:
          assert "Mutate".equals(method.getName());
          final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
          switch (mutationType) {
            case APPEND:
              appendTracker.updateRpc(stats);
              return;
            case DELETE:
              deleteTracker.updateRpc(stats);
              return;
            case INCREMENT:
              incrementTracker.updateRpc(stats);
              return;
            case PUT:
              putTracker.updateRpc(stats);
              return;
            default:
              throw new RuntimeException("Unrecognized mutation type " + mutationType);
          }
        case 2:
          assert "Scan".equals(method.getName());
          scanTracker.updateRpc(stats);
          return;
        case 3:
          assert "BulkLoadHFile".equals(method.getName());
          // use generic implementation
          break;
        case 4:
          assert "PrepareBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 5:
          assert "CleanupBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 6:
          assert "ExecService".equals(method.getName());
          // use generic implementation
          break;
        case 7:
          assert "ExecRegionServerService".equals(method.getName());
          // use generic implementation
          break;
        case 8:
          assert "Multi".equals(method.getName());
          numActionsPerServerHist.update(stats.getNumActionsPerServer());
          multiTracker.updateRpc(stats);
          return;
        default:
          throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
      }
    }
    // Fallback to dynamic registry lookup for DDL methods.
    updateRpcGeneric(method, stats);
  }

  /**
   * Counts an exception that caused a cache drop. The count is keyed by the exception's simple
   * class name, or {@code UnknownException} when {@code exception} is {@code null}.
   */
  public void incrCacheDroppingExceptions(Object exception) {
    getMetric(CACHE_BASE +
      (exception == null? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
      cacheDroppingExceptions, counterFactory).inc();
  }
}