001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import org.apache.hadoop.metrics2.lib.MutableFastCounter; 022import org.apache.hadoop.metrics2.lib.MutableGaugeLong; 023import org.apache.hadoop.metrics2.lib.MutableHistogram; 024import org.apache.yetus.audience.InterfaceAudience; 025 026@InterfaceAudience.Private 027public class MetricsReplicationGlobalSourceSourceImpl 028 implements MetricsReplicationGlobalSourceSource { 029 private static final String KEY_PREFIX = "source."; 030 031 private final MetricsReplicationSourceImpl rms; 032 033 private final MutableHistogram ageOfLastShippedOpHist; 034 private final MutableGaugeLong sizeOfLogQueueGauge; 035 private final MutableFastCounter logReadInEditsCounter; 036 private final MutableFastCounter walEditsFilteredCounter; 037 private final MutableFastCounter shippedBatchesCounter; 038 private final MutableFastCounter shippedOpsCounter; 039 private final MutableFastCounter shippedBytesCounter; 040 041 /** 042 * @deprecated since 1.3.0. Use {@link #shippedBytesCounter} instead. 043 */ 044 @Deprecated 045 private final MutableFastCounter shippedKBsCounter; 046 private final MutableFastCounter logReadInBytesCounter; 047 private final MutableFastCounter shippedHFilesCounter; 048 private final MutableGaugeLong sizeOfHFileRefsQueueGauge; 049 private final MutableFastCounter unknownFileLengthForClosedWAL; 050 private final MutableFastCounter uncleanlyClosedWAL; 051 private final MutableFastCounter uncleanlyClosedSkippedBytes; 052 private final MutableFastCounter restartWALReading; 053 private final MutableFastCounter repeatedFileBytes; 054 private final MutableFastCounter completedWAL; 055 private final MutableFastCounter completedRecoveryQueue; 056 private final MutableFastCounter failedRecoveryQueue; 057 private final MutableGaugeLong walReaderBufferUsageBytes; 058 059 public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) { 060 this.rms = rms; 061 062 ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP); 063 064 sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L); 065 066 shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L); 067 068 shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L); 069 070 shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L); 071 072 shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L); 073 074 logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L); 075 076 logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L); 077 078 walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L); 079 080 shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L); 081 082 sizeOfHFileRefsQueueGauge = 083 rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L); 084 085 unknownFileLengthForClosedWAL = rms.getMetricsRegistry() 086 .getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L); 087 uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L); 088 uncleanlyClosedSkippedBytes = rms.getMetricsRegistry() 089 .getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L); 090 restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L); 091 repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L); 092 completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L); 093 completedRecoveryQueue = rms.getMetricsRegistry() 094 .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); 095 failedRecoveryQueue = rms.getMetricsRegistry() 096 .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); 097 098 walReaderBufferUsageBytes = rms.getMetricsRegistry() 099 .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L); 100 } 101 102 @Override public void setLastShippedAge(long age) { 103 ageOfLastShippedOpHist.add(age); 104 } 105 106 @Override public void incrSizeOfLogQueue(int size) { 107 sizeOfLogQueueGauge.incr(size); 108 } 109 110 @Override public void decrSizeOfLogQueue(int size) { 111 sizeOfLogQueueGauge.decr(size); 112 } 113 114 @Override public void incrLogReadInEdits(long size) { 115 logReadInEditsCounter.incr(size); 116 } 117 118 @Override public void incrLogEditsFiltered(long size) { 119 walEditsFilteredCounter.incr(size); 120 } 121 122 @Override public void incrBatchesShipped(int batches) { 123 shippedBatchesCounter.incr(batches); 124 } 125 126 @Override public void incrOpsShipped(long ops) { 127 shippedOpsCounter.incr(ops); 128 } 129 130 @Override public void incrShippedBytes(long size) { 131 shippedBytesCounter.incr(size); 132 // obtained value maybe smaller than 1024. We should make sure that KB count 133 // eventually picks up even from multiple smaller updates. 134 incrementKBsCounter(shippedBytesCounter, shippedKBsCounter); 135 } 136 137 static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCounter kbsCounter) { 138 // Following code should be thread-safe. 139 long delta = 0; 140 while(true) { 141 long bytes = bytesCounter.value(); 142 delta = (bytes / 1024) - kbsCounter.value(); 143 if (delta > 0) { 144 kbsCounter.incr(delta); 145 } else { 146 break; 147 } 148 } 149 } 150 @Override public void incrLogReadInBytes(long size) { 151 logReadInBytesCounter.incr(size); 152 } 153 154 @Override public void clear() { 155 } 156 157 @Override 158 public long getLastShippedAge() { 159 return ageOfLastShippedOpHist.getMax(); 160 } 161 162 @Override public void incrHFilesShipped(long hfiles) { 163 shippedHFilesCounter.incr(hfiles); 164 } 165 166 @Override 167 public void incrSizeOfHFileRefsQueue(long size) { 168 sizeOfHFileRefsQueueGauge.incr(size); 169 } 170 171 @Override 172 public void decrSizeOfHFileRefsQueue(long size) { 173 sizeOfHFileRefsQueueGauge.decr(size); 174 } 175 176 @Override 177 public int getSizeOfLogQueue() { 178 return (int)sizeOfLogQueueGauge.value(); 179 } 180 181 @Override 182 public void incrUnknownFileLengthForClosedWAL() { 183 unknownFileLengthForClosedWAL.incr(1L); 184 } 185 @Override 186 public void incrUncleanlyClosedWALs() { 187 uncleanlyClosedWAL.incr(1L); 188 } 189 @Override 190 public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { 191 uncleanlyClosedSkippedBytes.incr(bytes); 192 } 193 @Override 194 public void incrRestartedWALReading() { 195 restartWALReading.incr(1L); 196 } 197 @Override 198 public void incrRepeatedFileBytes(final long bytes) { 199 repeatedFileBytes.incr(bytes); 200 } 201 @Override 202 public void incrCompletedWAL() { 203 completedWAL.incr(1L); 204 } 205 @Override 206 public void incrCompletedRecoveryQueue() { 207 completedRecoveryQueue.incr(1L); 208 } 209 @Override 210 public void incrFailedRecoveryQueue() { 211 failedRecoveryQueue.incr(1L); 212 } 213 @Override 214 public void init() { 215 rms.init(); 216 } 217 218 @Override 219 public void setGauge(String gaugeName, long value) { 220 rms.setGauge(KEY_PREFIX + gaugeName, value); 221 } 222 223 @Override 224 public void incGauge(String gaugeName, long delta) { 225 rms.incGauge(KEY_PREFIX + gaugeName, delta); 226 } 227 228 @Override 229 public void decGauge(String gaugeName, long delta) { 230 rms.decGauge(KEY_PREFIX + gaugeName, delta); 231 } 232 233 @Override 234 public void removeMetric(String key) { 235 rms.removeMetric(KEY_PREFIX + key); 236 } 237 238 @Override 239 public void incCounters(String counterName, long delta) { 240 rms.incCounters(KEY_PREFIX + counterName, delta); 241 } 242 243 @Override 244 public void updateHistogram(String name, long value) { 245 rms.updateHistogram(KEY_PREFIX + name, value); 246 } 247 248 @Override 249 public String getMetricsContext() { 250 return rms.getMetricsContext(); 251 } 252 253 @Override 254 public String getMetricsDescription() { 255 return rms.getMetricsDescription(); 256 } 257 258 @Override 259 public String getMetricsJmxContext() { 260 return rms.getMetricsJmxContext(); 261 } 262 263 @Override 264 public String getMetricsName() { 265 return rms.getMetricsName(); 266 } 267 268 @Override 269 public long getWALEditsRead() { 270 return this.logReadInEditsCounter.value(); 271 } 272 273 @Override 274 public long getShippedOps() { 275 return this.shippedOpsCounter.value(); 276 } 277 278 @Override 279 public long getEditsFiltered() { 280 return this.walEditsFilteredCounter.value(); 281 } 282 283 @Override 284 public void setWALReaderEditsBufferBytes(long usage) { 285 this.walReaderBufferUsageBytes.set(usage); 286 } 287 288 @Override 289 public long getWALReaderEditsBufferBytes() { 290 return this.walReaderBufferUsageBytes.value(); 291 } 292}