001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import org.apache.hadoop.metrics2.lib.MutableFastCounter; 021import org.apache.hadoop.metrics2.lib.MutableGaugeInt; 022import org.apache.hadoop.metrics2.lib.MutableGaugeLong; 023import org.apache.hadoop.metrics2.lib.MutableHistogram; 024import org.apache.yetus.audience.InterfaceAudience; 025 026@InterfaceAudience.Private 027public class MetricsReplicationGlobalSourceSourceImpl 028 implements MetricsReplicationGlobalSourceSource { 029 private static final String KEY_PREFIX = "source."; 030 031 private final MetricsReplicationSourceImpl rms; 032 033 private final MutableHistogram ageOfLastShippedOpHist; 034 private final MutableGaugeLong sizeOfLogQueueGauge; 035 private final MutableFastCounter logReadInEditsCounter; 036 private final MutableFastCounter walEditsFilteredCounter; 037 private final MutableFastCounter shippedBatchesCounter; 038 private final MutableFastCounter failedBatchesCounter; 039 private final MutableFastCounter shippedOpsCounter; 040 private final MutableFastCounter shippedBytesCounter; 041 042 /** 043 * @deprecated since 1.3.0. Use {@link #shippedBytesCounter} instead. 044 */ 045 @Deprecated 046 private final MutableFastCounter shippedKBsCounter; 047 private final MutableFastCounter logReadInBytesCounter; 048 private final MutableFastCounter shippedHFilesCounter; 049 private final MutableGaugeLong sizeOfHFileRefsQueueGauge; 050 private final MutableFastCounter unknownFileLengthForClosedWAL; 051 private final MutableFastCounter uncleanlyClosedWAL; 052 private final MutableFastCounter uncleanlyClosedSkippedBytes; 053 private final MutableFastCounter restartWALReading; 054 private final MutableFastCounter repeatedFileBytes; 055 private final MutableFastCounter completedWAL; 056 private final MutableFastCounter completedRecoveryQueue; 057 private final MutableFastCounter failedRecoveryQueue; 058 private final MutableGaugeLong walReaderBufferUsageBytes; 059 private final MutableGaugeInt sourceInitializing; 060 061 public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) { 062 this.rms = rms; 063 064 ageOfLastShippedOpHist = 065 rms.getMetricsRegistry().newTimeHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP); 066 067 sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L); 068 069 shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L); 070 071 failedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_BATCHES, 0L); 072 073 shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L); 074 075 shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L); 076 077 shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L); 078 079 logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L); 080 081 logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L); 082 083 walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L); 084 085 shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L); 086 087 sizeOfHFileRefsQueueGauge = 088 rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L); 089 090 unknownFileLengthForClosedWAL = 091 rms.getMetricsRegistry().getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L); 092 uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L); 093 uncleanlyClosedSkippedBytes = 094 rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L); 095 restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L); 096 repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L); 097 completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L); 098 completedRecoveryQueue = 099 rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); 100 failedRecoveryQueue = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); 101 102 walReaderBufferUsageBytes = 103 rms.getMetricsRegistry().getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L); 104 sourceInitializing = rms.getMetricsRegistry().getGaugeInt(SOURCE_INITIALIZING, 0); 105 } 106 107 @Override 108 public void setLastShippedAge(long age) { 109 ageOfLastShippedOpHist.add(age); 110 } 111 112 @Override 113 public void incrSizeOfLogQueue(int size) { 114 sizeOfLogQueueGauge.incr(size); 115 } 116 117 @Override 118 public void decrSizeOfLogQueue(int size) { 119 sizeOfLogQueueGauge.decr(size); 120 } 121 122 @Override 123 public void incrLogReadInEdits(long size) { 124 logReadInEditsCounter.incr(size); 125 } 126 127 @Override 128 public void incrLogEditsFiltered(long size) { 129 walEditsFilteredCounter.incr(size); 130 } 131 132 @Override 133 public void incrBatchesShipped(int batches) { 134 shippedBatchesCounter.incr(batches); 135 } 136 137 @Override 138 public void incrFailedBatches() { 139 failedBatchesCounter.incr(); 140 } 141 142 @Override 143 public void incrOpsShipped(long ops) { 144 shippedOpsCounter.incr(ops); 145 } 146 147 @Override 148 public void incrShippedBytes(long size) { 149 shippedBytesCounter.incr(size); 150 // obtained value maybe smaller than 1024. We should make sure that KB count 151 // eventually picks up even from multiple smaller updates. 152 incrementKBsCounter(shippedBytesCounter, shippedKBsCounter); 153 } 154 155 static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCounter kbsCounter) { 156 // Following code should be thread-safe. 157 long delta = 0; 158 while (true) { 159 long bytes = bytesCounter.value(); 160 delta = (bytes / 1024) - kbsCounter.value(); 161 if (delta > 0) { 162 kbsCounter.incr(delta); 163 } else { 164 break; 165 } 166 } 167 } 168 169 @Override 170 public void incrLogReadInBytes(long size) { 171 logReadInBytesCounter.incr(size); 172 } 173 174 @Override 175 public void clear() { 176 } 177 178 @Override 179 public long getLastShippedAge() { 180 return ageOfLastShippedOpHist.getMax(); 181 } 182 183 @Override 184 public void incrHFilesShipped(long hfiles) { 185 shippedHFilesCounter.incr(hfiles); 186 } 187 188 @Override 189 public void incrSizeOfHFileRefsQueue(long size) { 190 sizeOfHFileRefsQueueGauge.incr(size); 191 } 192 193 @Override 194 public void decrSizeOfHFileRefsQueue(long size) { 195 sizeOfHFileRefsQueueGauge.decr(size); 196 } 197 198 @Override 199 public int getSizeOfLogQueue() { 200 return (int) sizeOfLogQueueGauge.value(); 201 } 202 203 @Override 204 public void incrUnknownFileLengthForClosedWAL() { 205 unknownFileLengthForClosedWAL.incr(1L); 206 } 207 208 @Override 209 public void incrUncleanlyClosedWALs() { 210 uncleanlyClosedWAL.incr(1L); 211 } 212 213 @Override 214 public long getUncleanlyClosedWALs() { 215 return uncleanlyClosedWAL.value(); 216 } 217 218 @Override 219 public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { 220 uncleanlyClosedSkippedBytes.incr(bytes); 221 } 222 223 @Override 224 public void incrRestartedWALReading() { 225 restartWALReading.incr(1L); 226 } 227 228 @Override 229 public void incrRepeatedFileBytes(final long bytes) { 230 repeatedFileBytes.incr(bytes); 231 } 232 233 @Override 234 public void incrCompletedWAL() { 235 completedWAL.incr(1L); 236 } 237 238 @Override 239 public void incrCompletedRecoveryQueue() { 240 completedRecoveryQueue.incr(1L); 241 } 242 243 @Override 244 public void incrFailedRecoveryQueue() { 245 failedRecoveryQueue.incr(1L); 246 } 247 248 @Override 249 public void setOldestWalAge(long age) { 250 // Not implemented 251 } 252 253 @Override 254 public long getOldestWalAge() { 255 // Not implemented 256 return 0; 257 } 258 259 @Override 260 public void incrSourceInitializing() { 261 sourceInitializing.incr(1); 262 } 263 264 @Override 265 public void decrSourceInitializing() { 266 sourceInitializing.decr(1); 267 } 268 269 @Override 270 public int getSourceInitializing() { 271 return sourceInitializing.value(); 272 } 273 274 @Override 275 public void init() { 276 rms.init(); 277 } 278 279 @Override 280 public void setGauge(String gaugeName, long value) { 281 rms.setGauge(KEY_PREFIX + gaugeName, value); 282 } 283 284 @Override 285 public void incGauge(String gaugeName, long delta) { 286 rms.incGauge(KEY_PREFIX + gaugeName, delta); 287 } 288 289 @Override 290 public void decGauge(String gaugeName, long delta) { 291 rms.decGauge(KEY_PREFIX + gaugeName, delta); 292 } 293 294 @Override 295 public void removeMetric(String key) { 296 rms.removeMetric(KEY_PREFIX + key); 297 } 298 299 @Override 300 public void incCounters(String counterName, long delta) { 301 rms.incCounters(KEY_PREFIX + counterName, delta); 302 } 303 304 @Override 305 public void updateHistogram(String name, long value) { 306 rms.updateHistogram(KEY_PREFIX + name, value); 307 } 308 309 @Override 310 public String getMetricsContext() { 311 return rms.getMetricsContext(); 312 } 313 314 @Override 315 public String getMetricsDescription() { 316 return rms.getMetricsDescription(); 317 } 318 319 @Override 320 public String getMetricsJmxContext() { 321 return rms.getMetricsJmxContext(); 322 } 323 324 @Override 325 public String getMetricsName() { 326 return rms.getMetricsName(); 327 } 328 329 @Override 330 public long getWALEditsRead() { 331 return this.logReadInEditsCounter.value(); 332 } 333 334 @Override 335 public long getShippedOps() { 336 return this.shippedOpsCounter.value(); 337 } 338 339 @Override 340 public long getEditsFiltered() { 341 return this.walEditsFilteredCounter.value(); 342 } 343 344 @Override 345 public void setWALReaderEditsBufferBytes(long usage) { 346 this.walReaderBufferUsageBytes.set(usage); 347 } 348 349 @Override 350 public long getWALReaderEditsBufferBytes() { 351 return this.walReaderBufferUsageBytes.value(); 352 } 353}