001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import org.apache.hadoop.metrics2.lib.MutableFastCounter; 021import org.apache.hadoop.metrics2.lib.MutableGaugeInt; 022import org.apache.hadoop.metrics2.lib.MutableGaugeLong; 023import org.apache.hadoop.metrics2.lib.MutableHistogram; 024import org.apache.yetus.audience.InterfaceAudience; 025 026@InterfaceAudience.Private 027public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSourceSource { 028 029 private final MetricsReplicationSourceImpl rms; 030 private final String id; 031 private final String sizeOfLogQueueKey; 032 private final String ageOfLastShippedOpKey; 033 private final String logReadInEditsKey; 034 private final String logEditsFilteredKey; 035 private final String shippedBatchesKey; 036 private final String shippedOpsKey; 037 private final String failedBatchesKey; 038 private String keyPrefix; 039 040 /** 041 * @deprecated since 1.3.0. Use {@link #shippedBytesKey} instead. 042 */ 043 @Deprecated 044 private final String shippedKBsKey; 045 private final String shippedBytesKey; 046 private final String logReadInBytesKey; 047 private final String shippedHFilesKey; 048 private final String sizeOfHFileRefsQueueKey; 049 private final String oldestWalAgeKey; 050 private final String sourceInitializingKey; 051 052 private final MutableHistogram ageOfLastShippedOpHist; 053 private final MutableGaugeLong sizeOfLogQueueGauge; 054 private final MutableFastCounter logReadInEditsCounter; 055 private final MutableFastCounter walEditsFilteredCounter; 056 private final MutableFastCounter shippedBatchesCounter; 057 private final MutableFastCounter failedBatchesCounter; 058 private final MutableFastCounter shippedOpsCounter; 059 private final MutableFastCounter shippedKBsCounter; 060 private final MutableFastCounter shippedBytesCounter; 061 private final MutableFastCounter logReadInBytesCounter; 062 private final MutableFastCounter shippedHFilesCounter; 063 private final MutableGaugeLong sizeOfHFileRefsQueueGauge; 064 065 private final String unknownFileLengthKey; 066 private final String uncleanlyClosedKey; 067 private final String uncleanlySkippedBytesKey; 068 private final String restartedKey; 069 private final String repeatedBytesKey; 070 private final String completedLogsKey; 071 private final String completedRecoveryKey; 072 private final MutableFastCounter unknownFileLengthForClosedWAL; 073 private final MutableFastCounter uncleanlyClosedWAL; 074 private final MutableFastCounter uncleanlyClosedSkippedBytes; 075 private final MutableFastCounter restartWALReading; 076 private final MutableFastCounter repeatedFileBytes; 077 private final MutableFastCounter completedWAL; 078 private final MutableFastCounter completedRecoveryQueue; 079 private final MutableGaugeLong oldestWalAge; 080 private final MutableGaugeInt sourceInitializing; 081 082 public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) { 083 this.rms = rms; 084 this.id = id; 085 this.keyPrefix = "source." + this.id + "."; 086 087 ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp"; 088 ageOfLastShippedOpHist = rms.getMetricsRegistry().newTimeHistogram(ageOfLastShippedOpKey); 089 090 sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue"; 091 sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfLogQueueKey, 0L); 092 093 shippedBatchesKey = this.keyPrefix + "shippedBatches"; 094 shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L); 095 096 failedBatchesKey = this.keyPrefix + "failedBatches"; 097 failedBatchesCounter = rms.getMetricsRegistry().getCounter(failedBatchesKey, 0L); 098 099 shippedOpsKey = this.keyPrefix + "shippedOps"; 100 shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L); 101 102 shippedKBsKey = this.keyPrefix + "shippedKBs"; 103 shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L); 104 105 shippedBytesKey = this.keyPrefix + "shippedBytes"; 106 shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L); 107 108 logReadInBytesKey = this.keyPrefix + "logReadInBytes"; 109 logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L); 110 111 logReadInEditsKey = this.keyPrefix + "logEditsRead"; 112 logReadInEditsCounter = rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L); 113 114 logEditsFilteredKey = this.keyPrefix + "logEditsFiltered"; 115 walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L); 116 117 shippedHFilesKey = this.keyPrefix + "shippedHFiles"; 118 shippedHFilesCounter = rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L); 119 120 sizeOfHFileRefsQueueKey = this.keyPrefix + "sizeOfHFileRefsQueue"; 121 sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfHFileRefsQueueKey, 0L); 122 123 unknownFileLengthKey = this.keyPrefix + "closedLogsWithUnknownFileLength"; 124 unknownFileLengthForClosedWAL = rms.getMetricsRegistry().getCounter(unknownFileLengthKey, 0L); 125 126 uncleanlyClosedKey = this.keyPrefix + "uncleanlyClosedLogs"; 127 uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(uncleanlyClosedKey, 0L); 128 129 uncleanlySkippedBytesKey = this.keyPrefix + "ignoredUncleanlyClosedLogContentsInBytes"; 130 uncleanlyClosedSkippedBytes = rms.getMetricsRegistry().getCounter(uncleanlySkippedBytesKey, 0L); 131 132 restartedKey = this.keyPrefix + "restartedLogReading"; 133 restartWALReading = rms.getMetricsRegistry().getCounter(restartedKey, 0L); 134 135 repeatedBytesKey = this.keyPrefix + "repeatedLogFileBytes"; 136 repeatedFileBytes = rms.getMetricsRegistry().getCounter(repeatedBytesKey, 0L); 137 138 completedLogsKey = this.keyPrefix + "completedLogs"; 139 completedWAL = rms.getMetricsRegistry().getCounter(completedLogsKey, 0L); 140 141 completedRecoveryKey = this.keyPrefix + "completedRecoverQueues"; 142 completedRecoveryQueue = rms.getMetricsRegistry().getCounter(completedRecoveryKey, 0L); 143 144 oldestWalAgeKey = this.keyPrefix + "oldestWalAge"; 145 oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L); 146 147 sourceInitializingKey = this.keyPrefix + "isInitializing"; 148 sourceInitializing = rms.getMetricsRegistry().getGaugeInt(sourceInitializingKey, 0); 149 } 150 151 @Override 152 public void setLastShippedAge(long age) { 153 ageOfLastShippedOpHist.add(age); 154 } 155 156 @Override 157 public void incrSizeOfLogQueue(int size) { 158 sizeOfLogQueueGauge.incr(size); 159 } 160 161 @Override 162 public void decrSizeOfLogQueue(int size) { 163 sizeOfLogQueueGauge.decr(size); 164 } 165 166 @Override 167 public void incrLogReadInEdits(long size) { 168 logReadInEditsCounter.incr(size); 169 } 170 171 @Override 172 public void incrLogEditsFiltered(long size) { 173 walEditsFilteredCounter.incr(size); 174 } 175 176 @Override 177 public void incrBatchesShipped(int batches) { 178 shippedBatchesCounter.incr(batches); 179 } 180 181 @Override 182 public void incrFailedBatches() { 183 failedBatchesCounter.incr(); 184 } 185 186 @Override 187 public void incrOpsShipped(long ops) { 188 shippedOpsCounter.incr(ops); 189 } 190 191 @Override 192 public void incrShippedBytes(long size) { 193 shippedBytesCounter.incr(size); 194 MetricsReplicationGlobalSourceSourceImpl.incrementKBsCounter(shippedBytesCounter, 195 shippedKBsCounter); 196 } 197 198 @Override 199 public void incrLogReadInBytes(long size) { 200 logReadInBytesCounter.incr(size); 201 } 202 203 @Override 204 public void clear() { 205 rms.removeMetric(ageOfLastShippedOpKey); 206 207 rms.removeMetric(sizeOfLogQueueKey); 208 209 rms.removeMetric(shippedBatchesKey); 210 rms.removeMetric(failedBatchesKey); 211 rms.removeMetric(shippedOpsKey); 212 rms.removeMetric(shippedKBsKey); 213 rms.removeMetric(shippedBytesKey); 214 215 rms.removeMetric(logReadInBytesKey); 216 rms.removeMetric(logReadInEditsKey); 217 218 rms.removeMetric(logEditsFilteredKey); 219 220 rms.removeMetric(shippedHFilesKey); 221 rms.removeMetric(sizeOfHFileRefsQueueKey); 222 223 rms.removeMetric(unknownFileLengthKey); 224 rms.removeMetric(uncleanlyClosedKey); 225 rms.removeMetric(uncleanlySkippedBytesKey); 226 rms.removeMetric(restartedKey); 227 rms.removeMetric(repeatedBytesKey); 228 rms.removeMetric(completedLogsKey); 229 rms.removeMetric(completedRecoveryKey); 230 rms.removeMetric(oldestWalAgeKey); 231 rms.removeMetric(sourceInitializingKey); 232 } 233 234 @Override 235 public long getLastShippedAge() { 236 return ageOfLastShippedOpHist.getMax(); 237 } 238 239 @Override 240 public void incrHFilesShipped(long hfiles) { 241 shippedHFilesCounter.incr(hfiles); 242 } 243 244 @Override 245 public void incrSizeOfHFileRefsQueue(long size) { 246 sizeOfHFileRefsQueueGauge.incr(size); 247 } 248 249 @Override 250 public void decrSizeOfHFileRefsQueue(long size) { 251 sizeOfHFileRefsQueueGauge.decr(size); 252 } 253 254 @Override 255 public int getSizeOfLogQueue() { 256 return (int) sizeOfLogQueueGauge.value(); 257 } 258 259 @Override 260 public void incrUnknownFileLengthForClosedWAL() { 261 unknownFileLengthForClosedWAL.incr(1L); 262 } 263 264 @Override 265 public void incrUncleanlyClosedWALs() { 266 uncleanlyClosedWAL.incr(1L); 267 } 268 269 @Override 270 public long getUncleanlyClosedWALs() { 271 return uncleanlyClosedWAL.value(); 272 } 273 274 @Override 275 public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { 276 uncleanlyClosedSkippedBytes.incr(bytes); 277 } 278 279 @Override 280 public void incrRestartedWALReading() { 281 restartWALReading.incr(1L); 282 } 283 284 @Override 285 public void incrRepeatedFileBytes(final long bytes) { 286 repeatedFileBytes.incr(bytes); 287 } 288 289 @Override 290 public void incrCompletedWAL() { 291 completedWAL.incr(1L); 292 } 293 294 @Override 295 public void incrCompletedRecoveryQueue() { 296 completedRecoveryQueue.incr(1L); 297 } 298 299 @Override 300 public void incrFailedRecoveryQueue() { 301 /* no op */} 302 303 @Override 304 public void setOldestWalAge(long age) { 305 oldestWalAge.set(age); 306 } 307 308 @Override 309 public long getOldestWalAge() { 310 return oldestWalAge.value(); 311 } 312 313 @Override 314 public void incrSourceInitializing() { 315 sourceInitializing.incr(1); 316 } 317 318 @Override 319 public int getSourceInitializing() { 320 return sourceInitializing.value(); 321 } 322 323 @Override 324 public void decrSourceInitializing() { 325 sourceInitializing.decr(1); 326 } 327 328 @Override 329 public void init() { 330 rms.init(); 331 } 332 333 @Override 334 public void setGauge(String gaugeName, long value) { 335 rms.setGauge(this.keyPrefix + gaugeName, value); 336 } 337 338 @Override 339 public void incGauge(String gaugeName, long delta) { 340 rms.incGauge(this.keyPrefix + gaugeName, delta); 341 } 342 343 @Override 344 public void decGauge(String gaugeName, long delta) { 345 rms.decGauge(this.keyPrefix + gaugeName, delta); 346 } 347 348 @Override 349 public void removeMetric(String key) { 350 rms.removeMetric(this.keyPrefix + key); 351 } 352 353 @Override 354 public void incCounters(String counterName, long delta) { 355 rms.incCounters(this.keyPrefix + counterName, delta); 356 } 357 358 @Override 359 public void updateHistogram(String name, long value) { 360 rms.updateHistogram(this.keyPrefix + name, value); 361 } 362 363 @Override 364 public String getMetricsContext() { 365 return rms.getMetricsContext(); 366 } 367 368 @Override 369 public String getMetricsDescription() { 370 return rms.getMetricsDescription(); 371 } 372 373 @Override 374 public String getMetricsJmxContext() { 375 return rms.getMetricsJmxContext(); 376 } 377 378 @Override 379 public String getMetricsName() { 380 return rms.getMetricsName(); 381 } 382 383 @Override 384 public long getWALEditsRead() { 385 return this.logReadInEditsCounter.value(); 386 } 387 388 @Override 389 public long getShippedOps() { 390 return this.shippedOpsCounter.value(); 391 } 392 393 @Override 394 public long getEditsFiltered() { 395 return this.walEditsFilteredCounter.value(); 396 } 397}