001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import org.apache.hadoop.metrics2.lib.MutableFastCounter; 022import org.apache.hadoop.metrics2.lib.MutableGaugeLong; 023import org.apache.hadoop.metrics2.lib.MutableHistogram; 024import org.apache.yetus.audience.InterfaceAudience; 025 026@InterfaceAudience.Private 027public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{ 028 private static final String KEY_PREFIX = "source."; 029 030 private final MetricsReplicationSourceImpl rms; 031 032 private final MutableHistogram ageOfLastShippedOpHist; 033 private final MutableGaugeLong sizeOfLogQueueGauge; 034 private final MutableFastCounter logReadInEditsCounter; 035 private final MutableFastCounter walEditsFilteredCounter; 036 private final MutableFastCounter shippedBatchesCounter; 037 private final MutableFastCounter shippedOpsCounter; 038 private final MutableFastCounter shippedBytesCounter; 039 040 /** 041 * @deprecated since 1.3.0. Use {@link #shippedBytesCounter} instead. 042 */ 043 @Deprecated 044 private final MutableFastCounter shippedKBsCounter; 045 private final MutableFastCounter logReadInBytesCounter; 046 private final MutableFastCounter shippedHFilesCounter; 047 private final MutableGaugeLong sizeOfHFileRefsQueueGauge; 048 private final MutableFastCounter unknownFileLengthForClosedWAL; 049 private final MutableFastCounter uncleanlyClosedWAL; 050 private final MutableFastCounter uncleanlyClosedSkippedBytes; 051 private final MutableFastCounter restartWALReading; 052 private final MutableFastCounter repeatedFileBytes; 053 private final MutableFastCounter completedWAL; 054 private final MutableFastCounter completedRecoveryQueue; 055 private final MutableFastCounter failedRecoveryQueue; 056 057 public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { 058 this.rms = rms; 059 060 ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP); 061 062 sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L); 063 064 shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L); 065 066 shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L); 067 068 shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L); 069 070 shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L); 071 072 logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L); 073 074 logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L); 075 076 walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L); 077 078 shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L); 079 080 sizeOfHFileRefsQueueGauge = 081 rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L); 082 083 unknownFileLengthForClosedWAL = rms.getMetricsRegistry() 084 .getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L); 085 uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L); 086 uncleanlyClosedSkippedBytes = rms.getMetricsRegistry() 087 .getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L); 088 restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L); 089 repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L); 090 completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L); 091 completedRecoveryQueue = rms.getMetricsRegistry() 092 .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); 093 failedRecoveryQueue = rms.getMetricsRegistry() 094 .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); 095 } 096 097 @Override public void setLastShippedAge(long age) { 098 ageOfLastShippedOpHist.add(age); 099 } 100 101 @Override public void incrSizeOfLogQueue(int size) { 102 sizeOfLogQueueGauge.incr(size); 103 } 104 105 @Override public void decrSizeOfLogQueue(int size) { 106 sizeOfLogQueueGauge.decr(size); 107 } 108 109 @Override public void incrLogReadInEdits(long size) { 110 logReadInEditsCounter.incr(size); 111 } 112 113 @Override public void incrLogEditsFiltered(long size) { 114 walEditsFilteredCounter.incr(size); 115 } 116 117 @Override public void incrBatchesShipped(int batches) { 118 shippedBatchesCounter.incr(batches); 119 } 120 121 @Override public void incrOpsShipped(long ops) { 122 shippedOpsCounter.incr(ops); 123 } 124 125 @Override public void incrShippedBytes(long size) { 126 shippedBytesCounter.incr(size); 127 // obtained value maybe smaller than 1024. We should make sure that KB count 128 // eventually picks up even from multiple smaller updates. 129 incrementKBsCounter(shippedBytesCounter, shippedKBsCounter); 130 } 131 132 static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCounter kbsCounter) { 133 // Following code should be thread-safe. 134 long delta = 0; 135 while(true) { 136 long bytes = bytesCounter.value(); 137 delta = (bytes / 1024) - kbsCounter.value(); 138 if (delta > 0) { 139 kbsCounter.incr(delta); 140 } else { 141 break; 142 } 143 } 144 } 145 146 @Override public void incrLogReadInBytes(long size) { 147 logReadInBytesCounter.incr(size); 148 } 149 150 @Override public void clear() { 151 } 152 153 @Override 154 public long getLastShippedAge() { 155 return ageOfLastShippedOpHist.getMax(); 156 } 157 158 @Override public void incrHFilesShipped(long hfiles) { 159 shippedHFilesCounter.incr(hfiles); 160 } 161 162 @Override 163 public void incrSizeOfHFileRefsQueue(long size) { 164 sizeOfHFileRefsQueueGauge.incr(size); 165 } 166 167 @Override 168 public void decrSizeOfHFileRefsQueue(long size) { 169 sizeOfHFileRefsQueueGauge.decr(size); 170 } 171 172 @Override 173 public int getSizeOfLogQueue() { 174 return (int)sizeOfLogQueueGauge.value(); 175 } 176 177 @Override 178 public void incrUnknownFileLengthForClosedWAL() { 179 unknownFileLengthForClosedWAL.incr(1L); 180 } 181 @Override 182 public void incrUncleanlyClosedWALs() { 183 uncleanlyClosedWAL.incr(1L); 184 } 185 @Override 186 public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { 187 uncleanlyClosedSkippedBytes.incr(bytes); 188 } 189 @Override 190 public void incrRestartedWALReading() { 191 restartWALReading.incr(1L); 192 } 193 @Override 194 public void incrRepeatedFileBytes(final long bytes) { 195 repeatedFileBytes.incr(bytes); 196 } 197 @Override 198 public void incrCompletedWAL() { 199 completedWAL.incr(1L); 200 } 201 @Override 202 public void incrCompletedRecoveryQueue() { 203 completedRecoveryQueue.incr(1L); 204 } 205 @Override 206 public void incrFailedRecoveryQueue() { 207 failedRecoveryQueue.incr(1L); 208 } 209 @Override 210 public void init() { 211 rms.init(); 212 } 213 214 @Override 215 public void setGauge(String gaugeName, long value) { 216 rms.setGauge(KEY_PREFIX + gaugeName, value); 217 } 218 219 @Override 220 public void incGauge(String gaugeName, long delta) { 221 rms.incGauge(KEY_PREFIX + gaugeName, delta); 222 } 223 224 @Override 225 public void decGauge(String gaugeName, long delta) { 226 rms.decGauge(KEY_PREFIX + gaugeName, delta); 227 } 228 229 @Override 230 public void removeMetric(String key) { 231 rms.removeMetric(KEY_PREFIX + key); 232 } 233 234 @Override 235 public void incCounters(String counterName, long delta) { 236 rms.incCounters(KEY_PREFIX + counterName, delta); 237 } 238 239 @Override 240 public void updateHistogram(String name, long value) { 241 rms.updateHistogram(KEY_PREFIX + name, value); 242 } 243 244 @Override 245 public String getMetricsContext() { 246 return rms.getMetricsContext(); 247 } 248 249 @Override 250 public String getMetricsDescription() { 251 return rms.getMetricsDescription(); 252 } 253 254 @Override 255 public String getMetricsJmxContext() { 256 return rms.getMetricsJmxContext(); 257 } 258 259 @Override 260 public String getMetricsName() { 261 return rms.getMetricsName(); 262 } 263 264 @Override 265 public long getWALEditsRead() { 266 return this.logReadInEditsCounter.value(); 267 } 268 269 @Override 270 public long getShippedOps() { 271 return this.shippedOpsCounter.value(); 272 } 273 274 @Override 275 public long getEditsFiltered() { 276 return this.walEditsFilteredCounter.value(); 277 } 278}