001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static com.codahale.metrics.MetricRegistry.name; 021import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 022 023import com.codahale.metrics.Counter; 024import com.codahale.metrics.Histogram; 025import com.codahale.metrics.JmxReporter; 026import com.codahale.metrics.MetricRegistry; 027import com.codahale.metrics.RatioGauge; 028import com.codahale.metrics.Timer; 029import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 030 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.ConcurrentMap; 033import java.util.concurrent.ConcurrentSkipListMap; 034import java.util.concurrent.ThreadPoolExecutor; 035import java.util.concurrent.TimeUnit; 036import java.util.function.Supplier; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 040import org.apache.hbase.thirdparty.com.google.protobuf.Message; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 043import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 044import org.apache.hadoop.hbase.util.Bytes; 045 046/** 047 * This class is for maintaining the various connection statistics and publishing them through 048 * the metrics interfaces. 049 * 050 * This class manages its own {@link MetricRegistry} and {@link JmxReporter} so as to not 051 * conflict with other uses of Yammer Metrics within the client application. Instantiating 052 * this class implicitly creates and "starts" instances of these classes; be sure to call 053 * {@link #shutdown()} to terminate the thread pools they allocate. 054 */ 055@InterfaceAudience.Private 056public class MetricsConnection implements StatisticTrackable { 057 058 /** Set this key to {@code true} to enable metrics collection of client requests. */ 059 public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable"; 060 061 private static final String DRTN_BASE = "rpcCallDurationMs_"; 062 private static final String REQ_BASE = "rpcCallRequestSizeBytes_"; 063 private static final String RESP_BASE = "rpcCallResponseSizeBytes_"; 064 private static final String MEMLOAD_BASE = "memstoreLoad_"; 065 private static final String HEAP_BASE = "heapOccupancy_"; 066 private static final String CACHE_BASE = "cacheDroppingExceptions_"; 067 private static final String UNKNOWN_EXCEPTION = "UnknownException"; 068 private static final String CLIENT_SVC = ClientService.getDescriptor().getName(); 069 070 /** A container class for collecting details about the RPC call as it percolates. */ 071 public static class CallStats { 072 private long requestSizeBytes = 0; 073 private long responseSizeBytes = 0; 074 private long startTime = 0; 075 private long callTimeMs = 0; 076 private int concurrentCallsPerServer = 0; 077 private int numActionsPerServer = 0; 078 079 public long getRequestSizeBytes() { 080 return requestSizeBytes; 081 } 082 083 public void setRequestSizeBytes(long requestSizeBytes) { 084 this.requestSizeBytes = requestSizeBytes; 085 } 086 087 public long getResponseSizeBytes() { 088 return responseSizeBytes; 089 } 090 091 public void setResponseSizeBytes(long responseSizeBytes) { 092 this.responseSizeBytes = responseSizeBytes; 093 } 094 095 public long getStartTime() { 096 return startTime; 097 } 098 099 public void setStartTime(long startTime) { 100 this.startTime = startTime; 101 } 102 103 public long getCallTimeMs() { 104 return callTimeMs; 105 } 106 107 public void setCallTimeMs(long callTimeMs) { 108 this.callTimeMs = callTimeMs; 109 } 110 111 public int getConcurrentCallsPerServer() { 112 return concurrentCallsPerServer; 113 } 114 115 public void setConcurrentCallsPerServer(int callsPerServer) { 116 this.concurrentCallsPerServer = callsPerServer; 117 } 118 119 public int getNumActionsPerServer() { 120 return numActionsPerServer; 121 } 122 123 public void setNumActionsPerServer(int numActionsPerServer) { 124 this.numActionsPerServer = numActionsPerServer; 125 } 126 } 127 128 @VisibleForTesting 129 protected static final class CallTracker { 130 private final String name; 131 @VisibleForTesting final Timer callTimer; 132 @VisibleForTesting final Histogram reqHist; 133 @VisibleForTesting final Histogram respHist; 134 135 private CallTracker(MetricRegistry registry, String name, String subName, String scope) { 136 StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name); 137 if (subName != null) { 138 sb.append("(").append(subName).append(")"); 139 } 140 this.name = sb.toString(); 141 this.callTimer = registry.timer(name(MetricsConnection.class, 142 DRTN_BASE + this.name, scope)); 143 this.reqHist = registry.histogram(name(MetricsConnection.class, 144 REQ_BASE + this.name, scope)); 145 this.respHist = registry.histogram(name(MetricsConnection.class, 146 RESP_BASE + this.name, scope)); 147 } 148 149 private CallTracker(MetricRegistry registry, String name, String scope) { 150 this(registry, name, null, scope); 151 } 152 153 public void updateRpc(CallStats stats) { 154 this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS); 155 this.reqHist.update(stats.getRequestSizeBytes()); 156 this.respHist.update(stats.getResponseSizeBytes()); 157 } 158 159 @Override 160 public String toString() { 161 return "CallTracker:" + name; 162 } 163 } 164 165 protected static class RegionStats { 166 final String name; 167 final Histogram memstoreLoadHist; 168 final Histogram heapOccupancyHist; 169 170 public RegionStats(MetricRegistry registry, String name) { 171 this.name = name; 172 this.memstoreLoadHist = registry.histogram(name(MetricsConnection.class, 173 MEMLOAD_BASE + this.name)); 174 this.heapOccupancyHist = registry.histogram(name(MetricsConnection.class, 175 HEAP_BASE + this.name)); 176 } 177 178 public void update(RegionLoadStats regionStatistics) { 179 this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad()); 180 this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy()); 181 } 182 } 183 184 @VisibleForTesting 185 protected static class RunnerStats { 186 final Counter normalRunners; 187 final Counter delayRunners; 188 final Histogram delayIntevalHist; 189 190 public RunnerStats(MetricRegistry registry) { 191 this.normalRunners = registry.counter( 192 name(MetricsConnection.class, "normalRunnersCount")); 193 this.delayRunners = registry.counter( 194 name(MetricsConnection.class, "delayRunnersCount")); 195 this.delayIntevalHist = registry.histogram( 196 name(MetricsConnection.class, "delayIntervalHist")); 197 } 198 199 public void incrNormalRunners() { 200 this.normalRunners.inc(); 201 } 202 203 public void incrDelayRunners() { 204 this.delayRunners.inc(); 205 } 206 207 public void updateDelayInterval(long interval) { 208 this.delayIntevalHist.update(interval); 209 } 210 } 211 212 @VisibleForTesting 213 protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats 214 = new ConcurrentHashMap<>(); 215 216 public void updateServerStats(ServerName serverName, byte[] regionName, 217 Object r) { 218 if (!(r instanceof Result)) { 219 return; 220 } 221 Result result = (Result) r; 222 RegionLoadStats stats = result.getStats(); 223 if (stats == null) { 224 return; 225 } 226 updateRegionStats(serverName, regionName, stats); 227 } 228 229 @Override 230 public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) { 231 String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName); 232 ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName, 233 () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR)); 234 RegionStats regionStats = 235 computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name)); 236 regionStats.update(stats); 237 } 238 239 /** A lambda for dispatching to the appropriate metric factory method */ 240 private static interface NewMetric<T> { 241 T newMetric(Class<?> clazz, String name, String scope); 242 } 243 244 /** Anticipated number of metric entries */ 245 private static final int CAPACITY = 50; 246 /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */ 247 private static final float LOAD_FACTOR = 0.75f; 248 /** 249 * Anticipated number of concurrent accessor threads, from 250 * {@link ConnectionImplementation#getBatchPool()} 251 */ 252 private static final int CONCURRENCY_LEVEL = 256; 253 254 private final MetricRegistry registry; 255 private final JmxReporter reporter; 256 private final String scope; 257 258 private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() { 259 @Override public Timer newMetric(Class<?> clazz, String name, String scope) { 260 return registry.timer(name(clazz, name, scope)); 261 } 262 }; 263 264 private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() { 265 @Override public Histogram newMetric(Class<?> clazz, String name, String scope) { 266 return registry.histogram(name(clazz, name, scope)); 267 } 268 }; 269 270 private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() { 271 @Override public Counter newMetric(Class<?> clazz, String name, String scope) { 272 return registry.counter(name(clazz, name, scope)); 273 } 274 }; 275 276 // static metrics 277 278 @VisibleForTesting protected final Counter metaCacheHits; 279 @VisibleForTesting protected final Counter metaCacheMisses; 280 @VisibleForTesting protected final CallTracker getTracker; 281 @VisibleForTesting protected final CallTracker scanTracker; 282 @VisibleForTesting protected final CallTracker appendTracker; 283 @VisibleForTesting protected final CallTracker deleteTracker; 284 @VisibleForTesting protected final CallTracker incrementTracker; 285 @VisibleForTesting protected final CallTracker putTracker; 286 @VisibleForTesting protected final CallTracker multiTracker; 287 @VisibleForTesting protected final RunnerStats runnerStats; 288 @VisibleForTesting protected final Counter metaCacheNumClearServer; 289 @VisibleForTesting protected final Counter metaCacheNumClearRegion; 290 @VisibleForTesting protected final Counter hedgedReadOps; 291 @VisibleForTesting protected final Counter hedgedReadWin; 292 @VisibleForTesting protected final Histogram concurrentCallsPerServerHist; 293 @VisibleForTesting protected final Histogram numActionsPerServerHist; 294 295 // dynamic metrics 296 297 // These maps are used to cache references to the metric instances that are managed by the 298 // registry. I don't think their use perfectly removes redundant allocations, but it's 299 // a big improvement over calling registry.newMetric each time. 300 @VisibleForTesting protected final ConcurrentMap<String, Timer> rpcTimers = 301 new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); 302 @VisibleForTesting protected final ConcurrentMap<String, Histogram> rpcHistograms = 303 new ConcurrentHashMap<>(CAPACITY * 2 /* tracking both request and response sizes */, 304 LOAD_FACTOR, CONCURRENCY_LEVEL); 305 private final ConcurrentMap<String, Counter> cacheDroppingExceptions = 306 new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); 307 308 MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool, 309 Supplier<ThreadPoolExecutor> metaPool) { 310 this.scope = scope; 311 this.registry = new MetricRegistry(); 312 this.registry.register(getExecutorPoolName(), 313 new RatioGauge() { 314 @Override 315 protected Ratio getRatio() { 316 ThreadPoolExecutor pool = batchPool.get(); 317 if (pool == null) { 318 return Ratio.of(0, 0); 319 } 320 return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize()); 321 } 322 }); 323 this.registry.register(getMetaPoolName(), 324 new RatioGauge() { 325 @Override 326 protected Ratio getRatio() { 327 ThreadPoolExecutor pool = metaPool.get(); 328 if (pool == null) { 329 return Ratio.of(0, 0); 330 } 331 return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize()); 332 } 333 }); 334 this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope)); 335 this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope)); 336 this.metaCacheNumClearServer = registry.counter(name(this.getClass(), 337 "metaCacheNumClearServer", scope)); 338 this.metaCacheNumClearRegion = registry.counter(name(this.getClass(), 339 "metaCacheNumClearRegion", scope)); 340 this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope)); 341 this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope)); 342 this.getTracker = new CallTracker(this.registry, "Get", scope); 343 this.scanTracker = new CallTracker(this.registry, "Scan", scope); 344 this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope); 345 this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope); 346 this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope); 347 this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope); 348 this.multiTracker = new CallTracker(this.registry, "Multi", scope); 349 this.runnerStats = new RunnerStats(this.registry); 350 this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class, 351 "concurrentCallsPerServer", scope)); 352 this.numActionsPerServerHist = registry.histogram(name(MetricsConnection.class, 353 "numActionsPerServer", scope)); 354 355 this.reporter = JmxReporter.forRegistry(this.registry).build(); 356 this.reporter.start(); 357 } 358 359 @VisibleForTesting 360 final String getExecutorPoolName() { 361 return name(getClass(), "executorPoolActiveThreads", scope); 362 } 363 364 @VisibleForTesting 365 final String getMetaPoolName() { 366 return name(getClass(), "metaPoolActiveThreads", scope); 367 } 368 369 @VisibleForTesting 370 MetricRegistry getMetricRegistry() { 371 return registry; 372 } 373 374 public void shutdown() { 375 this.reporter.stop(); 376 } 377 378 /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */ 379 public static CallStats newCallStats() { 380 // TODO: instance pool to reduce GC? 381 return new CallStats(); 382 } 383 384 /** Increment the number of meta cache hits. */ 385 public void incrMetaCacheHit() { 386 metaCacheHits.inc(); 387 } 388 389 /** Increment the number of meta cache misses. */ 390 public void incrMetaCacheMiss() { 391 metaCacheMisses.inc(); 392 } 393 394 /** Increment the number of meta cache drops requested for entire RegionServer. */ 395 public void incrMetaCacheNumClearServer() { 396 metaCacheNumClearServer.inc(); 397 } 398 399 /** Increment the number of meta cache drops requested for individual region. */ 400 public void incrMetaCacheNumClearRegion() { 401 metaCacheNumClearRegion.inc(); 402 } 403 404 /** Increment the number of meta cache drops requested for individual region. */ 405 public void incrMetaCacheNumClearRegion(int count) { 406 metaCacheNumClearRegion.inc(count); 407 } 408 409 /** Increment the number of hedged read that have occurred. */ 410 public void incrHedgedReadOps() { 411 hedgedReadOps.inc(); 412 } 413 414 /** Increment the number of hedged read returned faster than the original read. */ 415 public void incrHedgedReadWin() { 416 hedgedReadWin.inc(); 417 } 418 419 /** Increment the number of normal runner counts. */ 420 public void incrNormalRunners() { 421 this.runnerStats.incrNormalRunners(); 422 } 423 424 /** Increment the number of delay runner counts and update delay interval of delay runner. */ 425 public void incrDelayRunnersAndUpdateDelayInterval(long interval) { 426 this.runnerStats.incrDelayRunners(); 427 this.runnerStats.updateDelayInterval(interval); 428 } 429 430 /** 431 * Get a metric for {@code key} from {@code map}, or create it with {@code factory}. 432 */ 433 private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) { 434 return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope)); 435 } 436 437 /** Update call stats for non-critical-path methods */ 438 private void updateRpcGeneric(MethodDescriptor method, CallStats stats) { 439 final String methodName = method.getService().getName() + "_" + method.getName(); 440 getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory) 441 .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS); 442 getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory) 443 .update(stats.getRequestSizeBytes()); 444 getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory) 445 .update(stats.getResponseSizeBytes()); 446 } 447 448 /** Report RPC context to metrics system. */ 449 public void updateRpc(MethodDescriptor method, Message param, CallStats stats) { 450 int callsPerServer = stats.getConcurrentCallsPerServer(); 451 if (callsPerServer > 0) { 452 concurrentCallsPerServerHist.update(callsPerServer); 453 } 454 // this implementation is tied directly to protobuf implementation details. would be better 455 // if we could dispatch based on something static, ie, request Message type. 456 if (method.getService() == ClientService.getDescriptor()) { 457 switch(method.getIndex()) { 458 case 0: 459 assert "Get".equals(method.getName()); 460 getTracker.updateRpc(stats); 461 return; 462 case 1: 463 assert "Mutate".equals(method.getName()); 464 final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType(); 465 switch(mutationType) { 466 case APPEND: 467 appendTracker.updateRpc(stats); 468 return; 469 case DELETE: 470 deleteTracker.updateRpc(stats); 471 return; 472 case INCREMENT: 473 incrementTracker.updateRpc(stats); 474 return; 475 case PUT: 476 putTracker.updateRpc(stats); 477 return; 478 default: 479 throw new RuntimeException("Unrecognized mutation type " + mutationType); 480 } 481 case 2: 482 assert "Scan".equals(method.getName()); 483 scanTracker.updateRpc(stats); 484 return; 485 case 3: 486 assert "BulkLoadHFile".equals(method.getName()); 487 // use generic implementation 488 break; 489 case 4: 490 assert "PrepareBulkLoad".equals(method.getName()); 491 // use generic implementation 492 break; 493 case 5: 494 assert "CleanupBulkLoad".equals(method.getName()); 495 // use generic implementation 496 break; 497 case 6: 498 assert "ExecService".equals(method.getName()); 499 // use generic implementation 500 break; 501 case 7: 502 assert "ExecRegionServerService".equals(method.getName()); 503 // use generic implementation 504 break; 505 case 8: 506 assert "Multi".equals(method.getName()); 507 numActionsPerServerHist.update(stats.getNumActionsPerServer()); 508 multiTracker.updateRpc(stats); 509 return; 510 default: 511 throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName()); 512 } 513 } 514 // Fallback to dynamic registry lookup for DDL methods. 515 updateRpcGeneric(method, stats); 516 } 517 518 public void incrCacheDroppingExceptions(Object exception) { 519 getMetric(CACHE_BASE + 520 (exception == null? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()), 521 cacheDroppingExceptions, counterFactory).inc(); 522 } 523}