001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import java.util.HashMap; 022import java.util.Map; 023 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 028import org.apache.hadoop.hbase.HBaseInterfaceAudience; 029import org.apache.hadoop.hbase.metrics.BaseSource; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031 032/** 033 * This class is for maintaining the various replication statistics for a source and publishing them 034 * through the metrics interfaces. 035 */ 036@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 037public class MetricsSource implements BaseSource { 038 039 private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class); 040 041 // tracks last shipped timestamp for each wal group 042 private Map<String, Long> lastTimestamps = new HashMap<>(); 043 private long lastHFileRefsQueueSize = 0; 044 private String id; 045 046 private final MetricsReplicationSourceSource singleSourceSource; 047 private final MetricsReplicationSourceSource globalSourceSource; 048 049 050 /** 051 * Constructor used to register the metrics 052 * 053 * @param id Name of the source this class is monitoring 054 */ 055 public MetricsSource(String id) { 056 this.id = id; 057 singleSourceSource = 058 CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) 059 .getSource(id); 060 globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); 061 } 062 063 /** 064 * Constructor for injecting custom (or test) MetricsReplicationSourceSources 065 * @param id Name of the source this class is monitoring 066 * @param singleSourceSource Class to monitor id-scoped metrics 067 * @param globalSourceSource Class to monitor global-scoped metrics 068 */ 069 public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, 070 MetricsReplicationSourceSource globalSourceSource) { 071 this.id = id; 072 this.singleSourceSource = singleSourceSource; 073 this.globalSourceSource = globalSourceSource; 074 } 075 076 /** 077 * Set the age of the last edit that was shipped 078 * @param timestamp write time of the edit 079 * @param walGroup which group we are setting 080 */ 081 public void setAgeOfLastShippedOp(long timestamp, String walGroup) { 082 long age = EnvironmentEdgeManager.currentTime() - timestamp; 083 singleSourceSource.setLastShippedAge(age); 084 globalSourceSource.setLastShippedAge(age); 085 this.lastTimestamps.put(walGroup, timestamp); 086 } 087 088 /** 089 * Convenience method to use the last given timestamp to refresh the age of the last edit. Used 090 * when replication fails and need to keep that metric accurate. 091 * @param walGroupId id of the group to update 092 */ 093 public void refreshAgeOfLastShippedOp(String walGroupId) { 094 Long lastTimestamp = this.lastTimestamps.get(walGroupId); 095 if (lastTimestamp == null) { 096 this.lastTimestamps.put(walGroupId, 0L); 097 lastTimestamp = 0L; 098 } 099 if (lastTimestamp > 0) { 100 setAgeOfLastShippedOp(lastTimestamp, walGroupId); 101 } 102 } 103 104 /** 105 * Increment size of the log queue. 106 */ 107 public void incrSizeOfLogQueue() { 108 singleSourceSource.incrSizeOfLogQueue(1); 109 globalSourceSource.incrSizeOfLogQueue(1); 110 } 111 112 public void decrSizeOfLogQueue() { 113 singleSourceSource.decrSizeOfLogQueue(1); 114 globalSourceSource.decrSizeOfLogQueue(1); 115 } 116 117 /** 118 * Add on the the number of log edits read 119 * 120 * @param delta the number of log edits read. 121 */ 122 private void incrLogEditsRead(long delta) { 123 singleSourceSource.incrLogReadInEdits(delta); 124 globalSourceSource.incrLogReadInEdits(delta); 125 } 126 127 /** Increment the number of log edits read by one. */ 128 public void incrLogEditsRead() { 129 incrLogEditsRead(1); 130 } 131 132 /** 133 * Add on the number of log edits filtered 134 * 135 * @param delta the number filtered. 136 */ 137 public void incrLogEditsFiltered(long delta) { 138 singleSourceSource.incrLogEditsFiltered(delta); 139 globalSourceSource.incrLogEditsFiltered(delta); 140 } 141 142 /** The number of log edits filtered out. */ 143 public void incrLogEditsFiltered() { 144 incrLogEditsFiltered(1); 145 } 146 147 /** 148 * Convience method to apply changes to metrics do to shipping a batch of logs. 149 * 150 * @param batchSize the size of the batch that was shipped to sinks. 151 */ 152 public void shipBatch(long batchSize, int sizeInBytes) { 153 singleSourceSource.incrBatchesShipped(1); 154 globalSourceSource.incrBatchesShipped(1); 155 156 singleSourceSource.incrOpsShipped(batchSize); 157 globalSourceSource.incrOpsShipped(batchSize); 158 159 singleSourceSource.incrShippedBytes(sizeInBytes); 160 globalSourceSource.incrShippedBytes(sizeInBytes); 161 } 162 163 /** 164 * Convience method to apply changes to metrics do to shipping a batch of logs. 165 * 166 * @param batchSize the size of the batch that was shipped to sinks. 167 * @param hfiles total number of hfiles shipped to sinks. 168 */ 169 public void shipBatch(long batchSize, int sizeInBytes, long hfiles) { 170 shipBatch(batchSize, sizeInBytes); 171 singleSourceSource.incrHFilesShipped(hfiles); 172 globalSourceSource.incrHFilesShipped(hfiles); 173 } 174 175 /** increase the byte number read by source from log file */ 176 public void incrLogReadInBytes(long readInBytes) { 177 singleSourceSource.incrLogReadInBytes(readInBytes); 178 globalSourceSource.incrLogReadInBytes(readInBytes); 179 } 180 181 /** Removes all metrics about this Source. */ 182 public void clear() { 183 int lastQueueSize = singleSourceSource.getSizeOfLogQueue(); 184 globalSourceSource.decrSizeOfLogQueue(lastQueueSize); 185 singleSourceSource.decrSizeOfLogQueue(lastQueueSize); 186 singleSourceSource.clear(); 187 globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); 188 lastTimestamps.clear(); 189 lastHFileRefsQueueSize = 0; 190 } 191 192 /** 193 * Get AgeOfLastShippedOp 194 * @return AgeOfLastShippedOp 195 */ 196 public Long getAgeOfLastShippedOp() { 197 return singleSourceSource.getLastShippedAge(); 198 } 199 200 /** 201 * Get the sizeOfLogQueue 202 * @return sizeOfLogQueue 203 */ 204 public int getSizeOfLogQueue() { 205 return singleSourceSource.getSizeOfLogQueue(); 206 } 207 208 /** 209 * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one 210 * @return lastTimestampForAge 211 * @deprecated Since 2.0.0. Removed in 3.0.0. 212 * @see #getTimestampOfLastShippedOp() 213 */ 214 @Deprecated 215 public long getTimeStampOfLastShippedOp() { 216 return getTimestampOfLastShippedOp(); 217 } 218 219 /** 220 * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one 221 * @return lastTimestampForAge 222 */ 223 public long getTimestampOfLastShippedOp() { 224 long lastTimestamp = 0L; 225 for (long ts : lastTimestamps.values()) { 226 if (ts > lastTimestamp) { 227 lastTimestamp = ts; 228 } 229 } 230 return lastTimestamp; 231 } 232 233 /** 234 * Get the slave peer ID 235 * @return peerID 236 */ 237 public String getPeerID() { 238 return id; 239 } 240 241 public void incrSizeOfHFileRefsQueue(long size) { 242 singleSourceSource.incrSizeOfHFileRefsQueue(size); 243 globalSourceSource.incrSizeOfHFileRefsQueue(size); 244 lastHFileRefsQueueSize = size; 245 } 246 247 public void decrSizeOfHFileRefsQueue(int size) { 248 singleSourceSource.decrSizeOfHFileRefsQueue(size); 249 globalSourceSource.decrSizeOfHFileRefsQueue(size); 250 lastHFileRefsQueueSize -= size; 251 if (lastHFileRefsQueueSize < 0) { 252 lastHFileRefsQueueSize = 0; 253 } 254 } 255 256 public void incrUnknownFileLengthForClosedWAL() { 257 singleSourceSource.incrUnknownFileLengthForClosedWAL(); 258 globalSourceSource.incrUnknownFileLengthForClosedWAL(); 259 } 260 261 public void incrUncleanlyClosedWALs() { 262 singleSourceSource.incrUncleanlyClosedWALs(); 263 globalSourceSource.incrUncleanlyClosedWALs(); 264 } 265 266 public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { 267 singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes); 268 globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes); 269 } 270 271 public void incrRestartedWALReading() { 272 singleSourceSource.incrRestartedWALReading(); 273 globalSourceSource.incrRestartedWALReading(); 274 } 275 276 public void incrRepeatedFileBytes(final long bytes) { 277 singleSourceSource.incrRepeatedFileBytes(bytes); 278 globalSourceSource.incrRepeatedFileBytes(bytes); 279 } 280 281 public void incrCompletedWAL() { 282 singleSourceSource.incrCompletedWAL(); 283 globalSourceSource.incrCompletedWAL(); 284 } 285 286 public void incrCompletedRecoveryQueue() { 287 singleSourceSource.incrCompletedRecoveryQueue(); 288 globalSourceSource.incrCompletedRecoveryQueue(); 289 } 290 291 @Override 292 public void init() { 293 singleSourceSource.init(); 294 globalSourceSource.init(); 295 } 296 297 @Override 298 public void setGauge(String gaugeName, long value) { 299 singleSourceSource.setGauge(gaugeName, value); 300 globalSourceSource.setGauge(gaugeName, value); 301 } 302 303 @Override 304 public void incGauge(String gaugeName, long delta) { 305 singleSourceSource.incGauge(gaugeName, delta); 306 globalSourceSource.incGauge(gaugeName, delta); 307 } 308 309 @Override 310 public void decGauge(String gaugeName, long delta) { 311 singleSourceSource.decGauge(gaugeName, delta); 312 globalSourceSource.decGauge(gaugeName, delta); 313 } 314 315 @Override 316 public void removeMetric(String key) { 317 singleSourceSource.removeMetric(key); 318 globalSourceSource.removeMetric(key); 319 } 320 321 @Override 322 public void incCounters(String counterName, long delta) { 323 singleSourceSource.incCounters(counterName, delta); 324 globalSourceSource.incCounters(counterName, delta); 325 } 326 327 @Override 328 public void updateHistogram(String name, long value) { 329 singleSourceSource.updateHistogram(name, value); 330 globalSourceSource.updateHistogram(name, value); 331 } 332 333 @Override 334 public String getMetricsContext() { 335 return globalSourceSource.getMetricsContext(); 336 } 337 338 @Override 339 public String getMetricsDescription() { 340 return globalSourceSource.getMetricsDescription(); 341 } 342 343 @Override 344 public String getMetricsJmxContext() { 345 return globalSourceSource.getMetricsJmxContext(); 346 } 347 348 @Override 349 public String getMetricsName() { 350 return globalSourceSource.getMetricsName(); 351 } 352}