001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import org.apache.hadoop.metrics2.lib.MutableFastCounter; 022import org.apache.hadoop.metrics2.lib.MutableGaugeLong; 023import org.apache.hadoop.metrics2.lib.MutableHistogram; 024import org.apache.yetus.audience.InterfaceAudience; 025 026@InterfaceAudience.Private 027public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{ 028 private static final String KEY_PREFIX = "source."; 029 030 private final MetricsReplicationSourceImpl rms; 031 032 private final MutableHistogram ageOfLastShippedOpHist; 033 private final MutableGaugeLong sizeOfLogQueueGauge; 034 private final MutableFastCounter logReadInEditsCounter; 035 private final MutableFastCounter logEditsFilteredCounter; 036 private final MutableFastCounter shippedBatchesCounter; 037 private final MutableFastCounter shippedOpsCounter; 038 private final MutableFastCounter shippedBytesCounter; 039 040 /** 041 * @deprecated since 1.3.0. Use {@link #shippedBytesCounter} instead. 042 */ 043 @Deprecated 044 private final MutableFastCounter shippedKBsCounter; 045 private final MutableFastCounter logReadInBytesCounter; 046 private final MutableFastCounter shippedHFilesCounter; 047 private final MutableGaugeLong sizeOfHFileRefsQueueGauge; 048 private final MutableFastCounter unknownFileLengthForClosedWAL; 049 private final MutableFastCounter uncleanlyClosedWAL; 050 private final MutableFastCounter uncleanlyClosedSkippedBytes; 051 private final MutableFastCounter restartWALReading; 052 private final MutableFastCounter repeatedFileBytes; 053 private final MutableFastCounter completedWAL; 054 private final MutableFastCounter completedRecoveryQueue; 055 056 public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { 057 this.rms = rms; 058 059 ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP); 060 061 sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L); 062 063 shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L); 064 065 shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L); 066 067 shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L); 068 069 shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L); 070 071 logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L); 072 073 logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L); 074 075 logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L); 076 077 shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L); 078 079 sizeOfHFileRefsQueueGauge = 080 rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L); 081 082 unknownFileLengthForClosedWAL = rms.getMetricsRegistry() 083 .getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L); 084 uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L); 085 uncleanlyClosedSkippedBytes = rms.getMetricsRegistry() 086 .getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L); 087 restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L); 088 repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L); 089 completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L); 090 completedRecoveryQueue = rms.getMetricsRegistry() 091 .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); 092 } 093 094 @Override public void setLastShippedAge(long age) { 095 ageOfLastShippedOpHist.add(age); 096 } 097 098 @Override public void incrSizeOfLogQueue(int size) { 099 sizeOfLogQueueGauge.incr(size); 100 } 101 102 @Override public void decrSizeOfLogQueue(int size) { 103 sizeOfLogQueueGauge.decr(size); 104 } 105 106 @Override public void incrLogReadInEdits(long size) { 107 logReadInEditsCounter.incr(size); 108 } 109 110 @Override public void incrLogEditsFiltered(long size) { 111 logEditsFilteredCounter.incr(size); 112 } 113 114 @Override public void incrBatchesShipped(int batches) { 115 shippedBatchesCounter.incr(batches); 116 } 117 118 @Override public void incrOpsShipped(long ops) { 119 shippedOpsCounter.incr(ops); 120 } 121 122 @Override public void incrShippedBytes(long size) { 123 shippedBytesCounter.incr(size); 124 // obtained value maybe smaller than 1024. We should make sure that KB count 125 // eventually picks up even from multiple smaller updates. 126 incrementKBsCounter(shippedBytesCounter, shippedKBsCounter); 127 } 128 129 static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCounter kbsCounter) { 130 // Following code should be thread-safe. 131 long delta = 0; 132 while(true) { 133 long bytes = bytesCounter.value(); 134 delta = (bytes / 1024) - kbsCounter.value(); 135 if (delta > 0) { 136 kbsCounter.incr(delta); 137 } else { 138 break; 139 } 140 } 141 } 142 143 @Override public void incrLogReadInBytes(long size) { 144 logReadInBytesCounter.incr(size); 145 } 146 147 @Override public void clear() { 148 } 149 150 @Override 151 public long getLastShippedAge() { 152 return ageOfLastShippedOpHist.getMax(); 153 } 154 155 @Override public void incrHFilesShipped(long hfiles) { 156 shippedHFilesCounter.incr(hfiles); 157 } 158 159 @Override 160 public void incrSizeOfHFileRefsQueue(long size) { 161 sizeOfHFileRefsQueueGauge.incr(size); 162 } 163 164 @Override 165 public void decrSizeOfHFileRefsQueue(long size) { 166 sizeOfHFileRefsQueueGauge.decr(size); 167 } 168 169 @Override 170 public int getSizeOfLogQueue() { 171 return (int)sizeOfLogQueueGauge.value(); 172 } 173 174 @Override 175 public void incrUnknownFileLengthForClosedWAL() { 176 unknownFileLengthForClosedWAL.incr(1L); 177 } 178 @Override 179 public void incrUncleanlyClosedWALs() { 180 uncleanlyClosedWAL.incr(1L); 181 } 182 @Override 183 public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { 184 uncleanlyClosedSkippedBytes.incr(bytes); 185 } 186 @Override 187 public void incrRestartedWALReading() { 188 restartWALReading.incr(1L); 189 } 190 @Override 191 public void incrRepeatedFileBytes(final long bytes) { 192 repeatedFileBytes.incr(bytes); 193 } 194 @Override 195 public void incrCompletedWAL() { 196 completedWAL.incr(1L); 197 } 198 @Override 199 public void incrCompletedRecoveryQueue() { 200 completedRecoveryQueue.incr(1L); 201 } 202 203 @Override 204 public void init() { 205 rms.init(); 206 } 207 208 @Override 209 public void setGauge(String gaugeName, long value) { 210 rms.setGauge(KEY_PREFIX + gaugeName, value); 211 } 212 213 @Override 214 public void incGauge(String gaugeName, long delta) { 215 rms.incGauge(KEY_PREFIX + gaugeName, delta); 216 } 217 218 @Override 219 public void decGauge(String gaugeName, long delta) { 220 rms.decGauge(KEY_PREFIX + gaugeName, delta); 221 } 222 223 @Override 224 public void removeMetric(String key) { 225 rms.removeMetric(KEY_PREFIX + key); 226 } 227 228 @Override 229 public void incCounters(String counterName, long delta) { 230 rms.incCounters(KEY_PREFIX + counterName, delta); 231 } 232 233 @Override 234 public void updateHistogram(String name, long value) { 235 rms.updateHistogram(KEY_PREFIX + name, value); 236 } 237 238 @Override 239 public String getMetricsContext() { 240 return rms.getMetricsContext(); 241 } 242 243 @Override 244 public String getMetricsDescription() { 245 return rms.getMetricsDescription(); 246 } 247 248 @Override 249 public String getMetricsJmxContext() { 250 return rms.getMetricsJmxContext(); 251 } 252 253 @Override 254 public String getMetricsName() { 255 return rms.getMetricsName(); 256 } 257}