001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static com.codahale.metrics.MetricRegistry.name; 021import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 022 023import com.codahale.metrics.Counter; 024import com.codahale.metrics.Histogram; 025import com.codahale.metrics.JmxReporter; 026import com.codahale.metrics.MetricRegistry; 027import com.codahale.metrics.RatioGauge; 028import com.codahale.metrics.Timer; 029 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.ConcurrentMap; 032import java.util.concurrent.ConcurrentSkipListMap; 033import java.util.concurrent.ThreadPoolExecutor; 034import java.util.concurrent.TimeUnit; 035import java.util.function.Supplier; 036 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 043import org.apache.hbase.thirdparty.com.google.protobuf.Message; 044import org.apache.yetus.audience.InterfaceAudience; 045 046/** 047 * This class is for maintaining the various connection statistics and publishing them through 048 * the metrics interfaces. 049 * 050 * This class manages its own {@link MetricRegistry} and {@link JmxReporter} so as to not 051 * conflict with other uses of Yammer Metrics within the client application. Instantiating 052 * this class implicitly creates and "starts" instances of these classes; be sure to call 053 * {@link #shutdown()} to terminate the thread pools they allocate. 054 */ 055@InterfaceAudience.Private 056public class MetricsConnection implements StatisticTrackable { 057 058 /** Set this key to {@code true} to enable metrics collection of client requests. */ 059 public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable"; 060 061 private static final String CNT_BASE = "rpcCount_"; 062 private static final String DRTN_BASE = "rpcCallDurationMs_"; 063 private static final String REQ_BASE = "rpcCallRequestSizeBytes_"; 064 private static final String RESP_BASE = "rpcCallResponseSizeBytes_"; 065 private static final String MEMLOAD_BASE = "memstoreLoad_"; 066 private static final String HEAP_BASE = "heapOccupancy_"; 067 private static final String CACHE_BASE = "cacheDroppingExceptions_"; 068 private static final String UNKNOWN_EXCEPTION = "UnknownException"; 069 private static final String CLIENT_SVC = ClientService.getDescriptor().getName(); 070 071 /** A container class for collecting details about the RPC call as it percolates. */ 072 public static class CallStats { 073 private long requestSizeBytes = 0; 074 private long responseSizeBytes = 0; 075 private long startTime = 0; 076 private long callTimeMs = 0; 077 private int concurrentCallsPerServer = 0; 078 private int numActionsPerServer = 0; 079 080 public long getRequestSizeBytes() { 081 return requestSizeBytes; 082 } 083 084 public void setRequestSizeBytes(long requestSizeBytes) { 085 this.requestSizeBytes = requestSizeBytes; 086 } 087 088 public long getResponseSizeBytes() { 089 return responseSizeBytes; 090 } 091 092 public void setResponseSizeBytes(long responseSizeBytes) { 093 this.responseSizeBytes = responseSizeBytes; 094 } 095 096 public long getStartTime() { 097 return startTime; 098 } 099 100 public void setStartTime(long startTime) { 101 this.startTime = startTime; 102 } 103 104 public long getCallTimeMs() { 105 return callTimeMs; 106 } 107 108 public void setCallTimeMs(long callTimeMs) { 109 this.callTimeMs = callTimeMs; 110 } 111 112 public int getConcurrentCallsPerServer() { 113 return concurrentCallsPerServer; 114 } 115 116 public void setConcurrentCallsPerServer(int callsPerServer) { 117 this.concurrentCallsPerServer = callsPerServer; 118 } 119 120 public int getNumActionsPerServer() { 121 return numActionsPerServer; 122 } 123 124 public void setNumActionsPerServer(int numActionsPerServer) { 125 this.numActionsPerServer = numActionsPerServer; 126 } 127 } 128 129 protected static final class CallTracker { 130 private final String name; 131 final Timer callTimer; 132 final Histogram reqHist; 133 final Histogram respHist; 134 135 private CallTracker(MetricRegistry registry, String name, String subName, String scope) { 136 StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name); 137 if (subName != null) { 138 sb.append("(").append(subName).append(")"); 139 } 140 this.name = sb.toString(); 141 this.callTimer = registry.timer(name(MetricsConnection.class, 142 DRTN_BASE + this.name, scope)); 143 this.reqHist = registry.histogram(name(MetricsConnection.class, 144 REQ_BASE + this.name, scope)); 145 this.respHist = registry.histogram(name(MetricsConnection.class, 146 RESP_BASE + this.name, scope)); 147 } 148 149 private CallTracker(MetricRegistry registry, String name, String scope) { 150 this(registry, name, null, scope); 151 } 152 153 public void updateRpc(CallStats stats) { 154 this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS); 155 this.reqHist.update(stats.getRequestSizeBytes()); 156 this.respHist.update(stats.getResponseSizeBytes()); 157 } 158 159 @Override 160 public String toString() { 161 return "CallTracker:" + name; 162 } 163 } 164 165 protected static class RegionStats { 166 final String name; 167 final Histogram memstoreLoadHist; 168 final Histogram heapOccupancyHist; 169 170 public RegionStats(MetricRegistry registry, String name) { 171 this.name = name; 172 this.memstoreLoadHist = registry.histogram(name(MetricsConnection.class, 173 MEMLOAD_BASE + this.name)); 174 this.heapOccupancyHist = registry.histogram(name(MetricsConnection.class, 175 HEAP_BASE + this.name)); 176 } 177 178 public void update(RegionLoadStats regionStatistics) { 179 this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad()); 180 this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy()); 181 } 182 } 183 184 protected static class RunnerStats { 185 final Counter normalRunners; 186 final Counter delayRunners; 187 final Histogram delayIntevalHist; 188 189 public RunnerStats(MetricRegistry registry) { 190 this.normalRunners = registry.counter( 191 name(MetricsConnection.class, "normalRunnersCount")); 192 this.delayRunners = registry.counter( 193 name(MetricsConnection.class, "delayRunnersCount")); 194 this.delayIntevalHist = registry.histogram( 195 name(MetricsConnection.class, "delayIntervalHist")); 196 } 197 198 public void incrNormalRunners() { 199 this.normalRunners.inc(); 200 } 201 202 public void incrDelayRunners() { 203 this.delayRunners.inc(); 204 } 205 206 public void updateDelayInterval(long interval) { 207 this.delayIntevalHist.update(interval); 208 } 209 } 210 211 protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats 212 = new ConcurrentHashMap<>(); 213 214 public void updateServerStats(ServerName serverName, byte[] regionName, 215 Object r) { 216 if (!(r instanceof Result)) { 217 return; 218 } 219 Result result = (Result) r; 220 RegionLoadStats stats = result.getStats(); 221 if (stats == null) { 222 return; 223 } 224 updateRegionStats(serverName, regionName, stats); 225 } 226 227 @Override 228 public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) { 229 String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName); 230 ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName, 231 () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR)); 232 RegionStats regionStats = 233 computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name)); 234 regionStats.update(stats); 235 } 236 237 /** A lambda for dispatching to the appropriate metric factory method */ 238 private static interface NewMetric<T> { 239 T newMetric(Class<?> clazz, String name, String scope); 240 } 241 242 /** Anticipated number of metric entries */ 243 private static final int CAPACITY = 50; 244 /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */ 245 private static final float LOAD_FACTOR = 0.75f; 246 /** 247 * Anticipated number of concurrent accessor threads 248 */ 249 private static final int CONCURRENCY_LEVEL = 256; 250 251 private final MetricRegistry registry; 252 private final JmxReporter reporter; 253 private final String scope; 254 255 private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() { 256 @Override public Timer newMetric(Class<?> clazz, String name, String scope) { 257 return registry.timer(name(clazz, name, scope)); 258 } 259 }; 260 261 private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() { 262 @Override public Histogram newMetric(Class<?> clazz, String name, String scope) { 263 return registry.histogram(name(clazz, name, scope)); 264 } 265 }; 266 267 private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() { 268 @Override public Counter newMetric(Class<?> clazz, String name, String scope) { 269 return registry.counter(name(clazz, name, scope)); 270 } 271 }; 272 273 // static metrics 274 275 protected final Counter metaCacheHits; 276 protected final Counter metaCacheMisses; 277 protected final CallTracker getTracker; 278 protected final CallTracker scanTracker; 279 protected final CallTracker appendTracker; 280 protected final CallTracker deleteTracker; 281 protected final CallTracker incrementTracker; 282 protected final CallTracker putTracker; 283 protected final CallTracker multiTracker; 284 protected final RunnerStats runnerStats; 285 protected final Counter metaCacheNumClearServer; 286 protected final Counter metaCacheNumClearRegion; 287 protected final Counter hedgedReadOps; 288 protected final Counter hedgedReadWin; 289 protected final Histogram concurrentCallsPerServerHist; 290 protected final Histogram numActionsPerServerHist; 291 292 // dynamic metrics 293 294 // These maps are used to cache references to the metric instances that are managed by the 295 // registry. I don't think their use perfectly removes redundant allocations, but it's 296 // a big improvement over calling registry.newMetric each time. 297 protected final ConcurrentMap<String, Timer> rpcTimers = 298 new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); 299 protected final ConcurrentMap<String, Histogram> rpcHistograms = 300 new ConcurrentHashMap<>(CAPACITY * 2 /* tracking both request and response sizes */, 301 LOAD_FACTOR, CONCURRENCY_LEVEL); 302 private final ConcurrentMap<String, Counter> cacheDroppingExceptions = 303 new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); 304 protected final ConcurrentMap<String, Counter> rpcCounters = 305 new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); 306 307 MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool, 308 Supplier<ThreadPoolExecutor> metaPool) { 309 this.scope = scope; 310 this.registry = new MetricRegistry(); 311 this.registry.register(getExecutorPoolName(), 312 new RatioGauge() { 313 @Override 314 protected Ratio getRatio() { 315 ThreadPoolExecutor pool = batchPool.get(); 316 if (pool == null) { 317 return Ratio.of(0, 0); 318 } 319 return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize()); 320 } 321 }); 322 this.registry.register(getMetaPoolName(), 323 new RatioGauge() { 324 @Override 325 protected Ratio getRatio() { 326 ThreadPoolExecutor pool = metaPool.get(); 327 if (pool == null) { 328 return Ratio.of(0, 0); 329 } 330 return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize()); 331 } 332 }); 333 this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope)); 334 this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope)); 335 this.metaCacheNumClearServer = registry.counter(name(this.getClass(), 336 "metaCacheNumClearServer", scope)); 337 this.metaCacheNumClearRegion = registry.counter(name(this.getClass(), 338 "metaCacheNumClearRegion", scope)); 339 this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope)); 340 this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope)); 341 this.getTracker = new CallTracker(this.registry, "Get", scope); 342 this.scanTracker = new CallTracker(this.registry, "Scan", scope); 343 this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope); 344 this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope); 345 this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope); 346 this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope); 347 this.multiTracker = new CallTracker(this.registry, "Multi", scope); 348 this.runnerStats = new RunnerStats(this.registry); 349 this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class, 350 "concurrentCallsPerServer", scope)); 351 this.numActionsPerServerHist = registry.histogram(name(MetricsConnection.class, 352 "numActionsPerServer", scope)); 353 354 this.reporter = JmxReporter.forRegistry(this.registry).build(); 355 this.reporter.start(); 356 } 357 358 final String getExecutorPoolName() { 359 return name(getClass(), "executorPoolActiveThreads", scope); 360 } 361 362 final String getMetaPoolName() { 363 return name(getClass(), "metaPoolActiveThreads", scope); 364 } 365 366 MetricRegistry getMetricRegistry() { 367 return registry; 368 } 369 370 public void shutdown() { 371 this.reporter.stop(); 372 } 373 374 /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */ 375 public static CallStats newCallStats() { 376 // TODO: instance pool to reduce GC? 377 return new CallStats(); 378 } 379 380 /** Increment the number of meta cache hits. */ 381 public void incrMetaCacheHit() { 382 metaCacheHits.inc(); 383 } 384 385 /** Increment the number of meta cache misses. */ 386 public void incrMetaCacheMiss() { 387 metaCacheMisses.inc(); 388 } 389 390 /** Increment the number of meta cache drops requested for entire RegionServer. */ 391 public void incrMetaCacheNumClearServer() { 392 metaCacheNumClearServer.inc(); 393 } 394 395 /** Increment the number of meta cache drops requested for individual region. */ 396 public void incrMetaCacheNumClearRegion() { 397 metaCacheNumClearRegion.inc(); 398 } 399 400 /** Increment the number of meta cache drops requested for individual region. */ 401 public void incrMetaCacheNumClearRegion(int count) { 402 metaCacheNumClearRegion.inc(count); 403 } 404 405 /** Increment the number of hedged read that have occurred. */ 406 public void incrHedgedReadOps() { 407 hedgedReadOps.inc(); 408 } 409 410 /** Increment the number of hedged read returned faster than the original read. */ 411 public void incrHedgedReadWin() { 412 hedgedReadWin.inc(); 413 } 414 415 /** Increment the number of normal runner counts. */ 416 public void incrNormalRunners() { 417 this.runnerStats.incrNormalRunners(); 418 } 419 420 /** Increment the number of delay runner counts and update delay interval of delay runner. */ 421 public void incrDelayRunnersAndUpdateDelayInterval(long interval) { 422 this.runnerStats.incrDelayRunners(); 423 this.runnerStats.updateDelayInterval(interval); 424 } 425 426 /** 427 * Get a metric for {@code key} from {@code map}, or create it with {@code factory}. 428 */ 429 private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) { 430 return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope)); 431 } 432 433 /** Update call stats for non-critical-path methods */ 434 private void updateRpcGeneric(String methodName, CallStats stats) { 435 getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory) 436 .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS); 437 getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory) 438 .update(stats.getRequestSizeBytes()); 439 getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory) 440 .update(stats.getResponseSizeBytes()); 441 } 442 443 /** Report RPC context to metrics system. */ 444 public void updateRpc(MethodDescriptor method, Message param, CallStats stats) { 445 int callsPerServer = stats.getConcurrentCallsPerServer(); 446 if (callsPerServer > 0) { 447 concurrentCallsPerServerHist.update(callsPerServer); 448 } 449 // Update the counter that tracks RPCs by type. 450 final String methodName = method.getService().getName() + "_" + method.getName(); 451 getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc(); 452 // this implementation is tied directly to protobuf implementation details. would be better 453 // if we could dispatch based on something static, ie, request Message type. 454 if (method.getService() == ClientService.getDescriptor()) { 455 switch(method.getIndex()) { 456 case 0: 457 assert "Get".equals(method.getName()); 458 getTracker.updateRpc(stats); 459 return; 460 case 1: 461 assert "Mutate".equals(method.getName()); 462 final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType(); 463 switch(mutationType) { 464 case APPEND: 465 appendTracker.updateRpc(stats); 466 return; 467 case DELETE: 468 deleteTracker.updateRpc(stats); 469 return; 470 case INCREMENT: 471 incrementTracker.updateRpc(stats); 472 return; 473 case PUT: 474 putTracker.updateRpc(stats); 475 return; 476 default: 477 throw new RuntimeException("Unrecognized mutation type " + mutationType); 478 } 479 case 2: 480 assert "Scan".equals(method.getName()); 481 scanTracker.updateRpc(stats); 482 return; 483 case 3: 484 assert "BulkLoadHFile".equals(method.getName()); 485 // use generic implementation 486 break; 487 case 4: 488 assert "PrepareBulkLoad".equals(method.getName()); 489 // use generic implementation 490 break; 491 case 5: 492 assert "CleanupBulkLoad".equals(method.getName()); 493 // use generic implementation 494 break; 495 case 6: 496 assert "ExecService".equals(method.getName()); 497 // use generic implementation 498 break; 499 case 7: 500 assert "ExecRegionServerService".equals(method.getName()); 501 // use generic implementation 502 break; 503 case 8: 504 assert "Multi".equals(method.getName()); 505 numActionsPerServerHist.update(stats.getNumActionsPerServer()); 506 multiTracker.updateRpc(stats); 507 return; 508 default: 509 throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName()); 510 } 511 } 512 // Fallback to dynamic registry lookup for DDL methods. 513 updateRpcGeneric(methodName, stats); 514 } 515 516 public void incrCacheDroppingExceptions(Object exception) { 517 getMetric(CACHE_BASE + 518 (exception == null? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()), 519 cacheDroppingExceptions, counterFactory).inc(); 520 } 521}