001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import org.apache.hadoop.metrics2.lib.MutableFastCounter; 021import org.apache.hadoop.metrics2.lib.MutableGaugeInt; 022import org.apache.hadoop.metrics2.lib.MutableGaugeLong; 023import org.apache.hadoop.metrics2.lib.MutableHistogram; 024import org.apache.yetus.audience.InterfaceAudience; 025 026@InterfaceAudience.Private 027public class MetricsReplicationGlobalSourceSourceImpl 028 implements MetricsReplicationGlobalSourceSource { 029 private static final String KEY_PREFIX = "source."; 030 031 private final MetricsReplicationSourceImpl rms; 032 033 private final MutableHistogram ageOfLastShippedOpHist; 034 private final MutableGaugeLong sizeOfLogQueueGauge; 035 private final MutableFastCounter logReadInEditsCounter; 036 private final MutableFastCounter walEditsFilteredCounter; 037 private final MutableFastCounter shippedBatchesCounter; 038 private final MutableFastCounter failedBatchesCounter; 039 private final MutableFastCounter shippedOpsCounter; 040 private final MutableFastCounter shippedBytesCounter; 041 private final MutableFastCounter logReadInBytesCounter; 042 private final MutableFastCounter shippedHFilesCounter; 043 private final MutableGaugeLong sizeOfHFileRefsQueueGauge; 044 private final MutableFastCounter unknownFileLengthForClosedWAL; 045 private final MutableFastCounter uncleanlyClosedWAL; 046 private final MutableFastCounter uncleanlyClosedSkippedBytes; 047 private final MutableFastCounter restartWALReading; 048 private final MutableFastCounter repeatedFileBytes; 049 private final MutableFastCounter completedWAL; 050 private final MutableFastCounter completedRecoveryQueue; 051 private final MutableFastCounter failedRecoveryQueue; 052 private final MutableGaugeLong walReaderBufferUsageBytes; 053 private final MutableGaugeInt sourceInitializing; 054 055 public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) { 056 this.rms = rms; 057 058 ageOfLastShippedOpHist = 059 rms.getMetricsRegistry().newTimeHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP); 060 061 sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L); 062 063 shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L); 064 065 failedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_BATCHES, 0L); 066 067 shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L); 068 069 shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L); 070 071 logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L); 072 073 logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L); 074 075 walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L); 076 077 shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L); 078 079 sizeOfHFileRefsQueueGauge = 080 rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L); 081 082 unknownFileLengthForClosedWAL = 083 rms.getMetricsRegistry().getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L); 084 uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L); 085 uncleanlyClosedSkippedBytes = 086 rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L); 087 restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L); 088 repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L); 089 completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L); 090 completedRecoveryQueue = 091 rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); 092 failedRecoveryQueue = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); 093 094 walReaderBufferUsageBytes = 095 rms.getMetricsRegistry().getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L); 096 sourceInitializing = rms.getMetricsRegistry().getGaugeInt(SOURCE_INITIALIZING, 0); 097 } 098 099 @Override 100 public void setLastShippedAge(long age) { 101 ageOfLastShippedOpHist.add(age); 102 } 103 104 @Override 105 public void incrSizeOfLogQueue(int size) { 106 sizeOfLogQueueGauge.incr(size); 107 } 108 109 @Override 110 public void decrSizeOfLogQueue(int size) { 111 sizeOfLogQueueGauge.decr(size); 112 } 113 114 @Override 115 public void incrLogReadInEdits(long size) { 116 logReadInEditsCounter.incr(size); 117 } 118 119 @Override 120 public void incrLogEditsFiltered(long size) { 121 walEditsFilteredCounter.incr(size); 122 } 123 124 @Override 125 public void incrBatchesShipped(int batches) { 126 shippedBatchesCounter.incr(batches); 127 } 128 129 @Override 130 public void incrFailedBatches() { 131 failedBatchesCounter.incr(); 132 } 133 134 @Override 135 public void incrOpsShipped(long ops) { 136 shippedOpsCounter.incr(ops); 137 } 138 139 @Override 140 public void incrShippedBytes(long size) { 141 shippedBytesCounter.incr(size); 142 } 143 144 @Override 145 public void incrLogReadInBytes(long size) { 146 logReadInBytesCounter.incr(size); 147 } 148 149 @Override 150 public void clear() { 151 } 152 153 @Override 154 public long getLastShippedAge() { 155 return ageOfLastShippedOpHist.getMax(); 156 } 157 158 @Override 159 public void incrHFilesShipped(long hfiles) { 160 shippedHFilesCounter.incr(hfiles); 161 } 162 163 @Override 164 public void incrSizeOfHFileRefsQueue(long size) { 165 sizeOfHFileRefsQueueGauge.incr(size); 166 } 167 168 @Override 169 public void decrSizeOfHFileRefsQueue(long size) { 170 sizeOfHFileRefsQueueGauge.decr(size); 171 } 172 173 @Override 174 public int getSizeOfLogQueue() { 175 return (int) sizeOfLogQueueGauge.value(); 176 } 177 178 @Override 179 public void incrUnknownFileLengthForClosedWAL() { 180 unknownFileLengthForClosedWAL.incr(1L); 181 } 182 183 @Override 184 public void incrUncleanlyClosedWALs() { 185 uncleanlyClosedWAL.incr(1L); 186 } 187 188 @Override 189 public long getUncleanlyClosedWALs() { 190 return uncleanlyClosedWAL.value(); 191 } 192 193 @Override 194 public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { 195 uncleanlyClosedSkippedBytes.incr(bytes); 196 } 197 198 @Override 199 public void incrRestartedWALReading() { 200 restartWALReading.incr(1L); 201 } 202 203 @Override 204 public void incrRepeatedFileBytes(final long bytes) { 205 repeatedFileBytes.incr(bytes); 206 } 207 208 @Override 209 public void incrCompletedWAL() { 210 completedWAL.incr(1L); 211 } 212 213 @Override 214 public void incrCompletedRecoveryQueue() { 215 completedRecoveryQueue.incr(1L); 216 } 217 218 @Override 219 public void incrFailedRecoveryQueue() { 220 failedRecoveryQueue.incr(1L); 221 } 222 223 @Override 224 public void setOldestWalAge(long age) { 225 // Not implemented 226 } 227 228 @Override 229 public long getOldestWalAge() { 230 // Not implemented 231 return 0; 232 } 233 234 @Override 235 public void incrSourceInitializing() { 236 sourceInitializing.incr(1); 237 } 238 239 @Override 240 public void decrSourceInitializing() { 241 sourceInitializing.decr(1); 242 } 243 244 @Override 245 public int getSourceInitializing() { 246 return sourceInitializing.value(); 247 } 248 249 @Override 250 public void init() { 251 rms.init(); 252 } 253 254 @Override 255 public void setGauge(String gaugeName, long value) { 256 rms.setGauge(KEY_PREFIX + gaugeName, value); 257 } 258 259 @Override 260 public void incGauge(String gaugeName, long delta) { 261 rms.incGauge(KEY_PREFIX + gaugeName, delta); 262 } 263 264 @Override 265 public void decGauge(String gaugeName, long delta) { 266 rms.decGauge(KEY_PREFIX + gaugeName, delta); 267 } 268 269 @Override 270 public void removeMetric(String key) { 271 rms.removeMetric(KEY_PREFIX + key); 272 } 273 274 @Override 275 public void incCounters(String counterName, long delta) { 276 rms.incCounters(KEY_PREFIX + counterName, delta); 277 } 278 279 @Override 280 public void updateHistogram(String name, long value) { 281 rms.updateHistogram(KEY_PREFIX + name, value); 282 } 283 284 @Override 285 public String getMetricsContext() { 286 return rms.getMetricsContext(); 287 } 288 289 @Override 290 public String getMetricsDescription() { 291 return rms.getMetricsDescription(); 292 } 293 294 @Override 295 public String getMetricsJmxContext() { 296 return rms.getMetricsJmxContext(); 297 } 298 299 @Override 300 public String getMetricsName() { 301 return rms.getMetricsName(); 302 } 303 304 @Override 305 public long getWALEditsRead() { 306 return this.logReadInEditsCounter.value(); 307 } 308 309 @Override 310 public long getShippedOps() { 311 return this.shippedOpsCounter.value(); 312 } 313 314 @Override 315 public long getEditsFiltered() { 316 return this.walEditsFilteredCounter.value(); 317 } 318 319 @Override 320 public void setWALReaderEditsBufferBytes(long usage) { 321 this.walReaderBufferUsageBytes.set(usage); 322 } 323 324 @Override 325 public long getWALReaderEditsBufferBytes() { 326 return this.walReaderBufferUsageBytes.value(); 327 } 328}