001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import java.util.HashMap; 022import java.util.Map; 023 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 028import org.apache.hadoop.hbase.HBaseInterfaceAudience; 029import org.apache.hadoop.hbase.metrics.BaseSource; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031 032import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 033 034/** 035 * This class is for maintaining the various replication statistics for a source and publishing them 036 * through the metrics interfaces. 037 */ 038@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 039public class MetricsSource implements BaseSource { 040 041 private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class); 042 043 // tracks last shipped timestamp for each wal group 044 private Map<String, Long> lastTimestamps = new HashMap<>(); 045 private Map<String, Long> ageOfLastShippedOp = new HashMap<>(); 046 private long lastHFileRefsQueueSize = 0; 047 private String id; 048 049 private final MetricsReplicationSourceSource singleSourceSource; 050 private final MetricsReplicationSourceSource globalSourceSource; 051 private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable; 052 053 /** 054 * Constructor used to register the metrics 055 * 056 * @param id Name of the source this class is monitoring 057 */ 058 public MetricsSource(String id) { 059 this.id = id; 060 singleSourceSource = 061 CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) 062 .getSource(id); 063 globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); 064 singleSourceSourceByTable = new HashMap<>(); 065 } 066 067 /** 068 * Constructor for injecting custom (or test) MetricsReplicationSourceSources 069 * @param id Name of the source this class is monitoring 070 * @param singleSourceSource Class to monitor id-scoped metrics 071 * @param globalSourceSource Class to monitor global-scoped metrics 072 */ 073 public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, 074 MetricsReplicationSourceSource globalSourceSource, 075 Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) { 076 this.id = id; 077 this.singleSourceSource = singleSourceSource; 078 this.globalSourceSource = globalSourceSource; 079 this.singleSourceSourceByTable = singleSourceSourceByTable; 080 } 081 082 /** 083 * Set the age of the last edit that was shipped 084 * @param timestamp write time of the edit 085 * @param walGroup which group we are setting 086 */ 087 public void setAgeOfLastShippedOp(long timestamp, String walGroup) { 088 long age = EnvironmentEdgeManager.currentTime() - timestamp; 089 singleSourceSource.setLastShippedAge(age); 090 globalSourceSource.setLastShippedAge(age); 091 this.ageOfLastShippedOp.put(walGroup, age); 092 this.lastTimestamps.put(walGroup, timestamp); 093 } 094 095 /** 096 * Set the age of the last edit that was shipped group by table 097 * @param timestamp write time of the edit 098 * @param tableName String as group and tableName 099 */ 100 public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) { 101 long age = EnvironmentEdgeManager.currentTime() - timestamp; 102 this.getSingleSourceSourceByTable().computeIfAbsent( 103 tableName, t -> CompatibilitySingletonFactory 104 .getInstance(MetricsReplicationSourceFactory.class).getSource(t)) 105 .setLastShippedAge(age); 106 } 107 108 /** 109 * get the last timestamp of given wal group. If the walGroup is null, return 0. 110 * @param walGroup which group we are getting 111 * @return timeStamp 112 */ 113 public long getLastTimeStampOfWalGroup(String walGroup) { 114 return this.lastTimestamps.get(walGroup) == null ? 0 : lastTimestamps.get(walGroup); 115 } 116 117 /** 118 * get age of last shipped op of given wal group. If the walGroup is null, return 0 119 * @param walGroup which group we are getting 120 * @return age 121 */ 122 public long getAgeofLastShippedOp(String walGroup) { 123 return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup); 124 } 125 126 /** 127 * Convenience method to use the last given timestamp to refresh the age of the last edit. Used 128 * when replication fails and need to keep that metric accurate. 129 * @param walGroupId id of the group to update 130 */ 131 public void refreshAgeOfLastShippedOp(String walGroupId) { 132 Long lastTimestamp = this.lastTimestamps.get(walGroupId); 133 if (lastTimestamp == null) { 134 this.lastTimestamps.put(walGroupId, 0L); 135 lastTimestamp = 0L; 136 } 137 if (lastTimestamp > 0) { 138 setAgeOfLastShippedOp(lastTimestamp, walGroupId); 139 } 140 } 141 142 /** 143 * Increment size of the log queue. 144 */ 145 public void incrSizeOfLogQueue() { 146 singleSourceSource.incrSizeOfLogQueue(1); 147 globalSourceSource.incrSizeOfLogQueue(1); 148 } 149 150 public void decrSizeOfLogQueue() { 151 singleSourceSource.decrSizeOfLogQueue(1); 152 globalSourceSource.decrSizeOfLogQueue(1); 153 } 154 155 /** 156 * Add on the the number of log edits read 157 * 158 * @param delta the number of log edits read. 159 */ 160 private void incrLogEditsRead(long delta) { 161 singleSourceSource.incrLogReadInEdits(delta); 162 globalSourceSource.incrLogReadInEdits(delta); 163 } 164 165 /** Increment the number of log edits read by one. */ 166 public void incrLogEditsRead() { 167 incrLogEditsRead(1); 168 } 169 170 /** 171 * Add on the number of log edits filtered 172 * 173 * @param delta the number filtered. 174 */ 175 public void incrLogEditsFiltered(long delta) { 176 singleSourceSource.incrLogEditsFiltered(delta); 177 globalSourceSource.incrLogEditsFiltered(delta); 178 } 179 180 /** The number of log edits filtered out. */ 181 public void incrLogEditsFiltered() { 182 incrLogEditsFiltered(1); 183 } 184 185 /** 186 * Convience method to apply changes to metrics do to shipping a batch of logs. 187 * 188 * @param batchSize the size of the batch that was shipped to sinks. 189 */ 190 public void shipBatch(long batchSize, int sizeInBytes) { 191 singleSourceSource.incrBatchesShipped(1); 192 globalSourceSource.incrBatchesShipped(1); 193 194 singleSourceSource.incrOpsShipped(batchSize); 195 globalSourceSource.incrOpsShipped(batchSize); 196 197 singleSourceSource.incrShippedBytes(sizeInBytes); 198 globalSourceSource.incrShippedBytes(sizeInBytes); 199 } 200 201 /** 202 * Convience method to apply changes to metrics do to shipping a batch of logs. 203 * 204 * @param batchSize the size of the batch that was shipped to sinks. 205 * @param hfiles total number of hfiles shipped to sinks. 206 */ 207 public void shipBatch(long batchSize, int sizeInBytes, long hfiles) { 208 shipBatch(batchSize, sizeInBytes); 209 singleSourceSource.incrHFilesShipped(hfiles); 210 globalSourceSource.incrHFilesShipped(hfiles); 211 } 212 213 /** increase the byte number read by source from log file */ 214 public void incrLogReadInBytes(long readInBytes) { 215 singleSourceSource.incrLogReadInBytes(readInBytes); 216 globalSourceSource.incrLogReadInBytes(readInBytes); 217 } 218 219 /** Removes all metrics about this Source. */ 220 public void clear() { 221 int lastQueueSize = singleSourceSource.getSizeOfLogQueue(); 222 globalSourceSource.decrSizeOfLogQueue(lastQueueSize); 223 singleSourceSource.decrSizeOfLogQueue(lastQueueSize); 224 singleSourceSource.clear(); 225 globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); 226 lastTimestamps.clear(); 227 lastHFileRefsQueueSize = 0; 228 } 229 230 /** 231 * Get AgeOfLastShippedOp 232 * @return AgeOfLastShippedOp 233 */ 234 public Long getAgeOfLastShippedOp() { 235 return singleSourceSource.getLastShippedAge(); 236 } 237 238 /** 239 * Get the sizeOfLogQueue 240 * @return sizeOfLogQueue 241 */ 242 public int getSizeOfLogQueue() { 243 return singleSourceSource.getSizeOfLogQueue(); 244 } 245 246 /** 247 * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one 248 * @return lastTimestampForAge 249 * @deprecated Since 2.0.0. Removed in 3.0.0. 250 * @see #getTimestampOfLastShippedOp() 251 */ 252 @Deprecated 253 public long getTimeStampOfLastShippedOp() { 254 return getTimestampOfLastShippedOp(); 255 } 256 257 /** 258 * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one 259 * @return lastTimestampForAge 260 */ 261 public long getTimestampOfLastShippedOp() { 262 long lastTimestamp = 0L; 263 for (long ts : lastTimestamps.values()) { 264 if (ts > lastTimestamp) { 265 lastTimestamp = ts; 266 } 267 } 268 return lastTimestamp; 269 } 270 271 /** 272 * Get the slave peer ID 273 * @return peerID 274 */ 275 public String getPeerID() { 276 return id; 277 } 278 279 public void incrSizeOfHFileRefsQueue(long size) { 280 singleSourceSource.incrSizeOfHFileRefsQueue(size); 281 globalSourceSource.incrSizeOfHFileRefsQueue(size); 282 lastHFileRefsQueueSize = size; 283 } 284 285 public void decrSizeOfHFileRefsQueue(int size) { 286 singleSourceSource.decrSizeOfHFileRefsQueue(size); 287 globalSourceSource.decrSizeOfHFileRefsQueue(size); 288 lastHFileRefsQueueSize -= size; 289 if (lastHFileRefsQueueSize < 0) { 290 lastHFileRefsQueueSize = 0; 291 } 292 } 293 294 public void incrUnknownFileLengthForClosedWAL() { 295 singleSourceSource.incrUnknownFileLengthForClosedWAL(); 296 globalSourceSource.incrUnknownFileLengthForClosedWAL(); 297 } 298 299 public void incrUncleanlyClosedWALs() { 300 singleSourceSource.incrUncleanlyClosedWALs(); 301 globalSourceSource.incrUncleanlyClosedWALs(); 302 } 303 304 public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { 305 singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes); 306 globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes); 307 } 308 309 public void incrRestartedWALReading() { 310 singleSourceSource.incrRestartedWALReading(); 311 globalSourceSource.incrRestartedWALReading(); 312 } 313 314 public void incrRepeatedFileBytes(final long bytes) { 315 singleSourceSource.incrRepeatedFileBytes(bytes); 316 globalSourceSource.incrRepeatedFileBytes(bytes); 317 } 318 319 public void incrCompletedWAL() { 320 singleSourceSource.incrCompletedWAL(); 321 globalSourceSource.incrCompletedWAL(); 322 } 323 324 public void incrCompletedRecoveryQueue() { 325 singleSourceSource.incrCompletedRecoveryQueue(); 326 globalSourceSource.incrCompletedRecoveryQueue(); 327 } 328 329 public void incrFailedRecoveryQueue() { 330 globalSourceSource.incrFailedRecoveryQueue(); 331 } 332 333 @Override 334 public void init() { 335 singleSourceSource.init(); 336 globalSourceSource.init(); 337 } 338 339 @Override 340 public void setGauge(String gaugeName, long value) { 341 singleSourceSource.setGauge(gaugeName, value); 342 globalSourceSource.setGauge(gaugeName, value); 343 } 344 345 @Override 346 public void incGauge(String gaugeName, long delta) { 347 singleSourceSource.incGauge(gaugeName, delta); 348 globalSourceSource.incGauge(gaugeName, delta); 349 } 350 351 @Override 352 public void decGauge(String gaugeName, long delta) { 353 singleSourceSource.decGauge(gaugeName, delta); 354 globalSourceSource.decGauge(gaugeName, delta); 355 } 356 357 @Override 358 public void removeMetric(String key) { 359 singleSourceSource.removeMetric(key); 360 globalSourceSource.removeMetric(key); 361 } 362 363 @Override 364 public void incCounters(String counterName, long delta) { 365 singleSourceSource.incCounters(counterName, delta); 366 globalSourceSource.incCounters(counterName, delta); 367 } 368 369 @Override 370 public void updateHistogram(String name, long value) { 371 singleSourceSource.updateHistogram(name, value); 372 globalSourceSource.updateHistogram(name, value); 373 } 374 375 @Override 376 public String getMetricsContext() { 377 return globalSourceSource.getMetricsContext(); 378 } 379 380 @Override 381 public String getMetricsDescription() { 382 return globalSourceSource.getMetricsDescription(); 383 } 384 385 @Override 386 public String getMetricsJmxContext() { 387 return globalSourceSource.getMetricsJmxContext(); 388 } 389 390 @Override 391 public String getMetricsName() { 392 return globalSourceSource.getMetricsName(); 393 } 394 395 @VisibleForTesting 396 public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() { 397 return singleSourceSourceByTable; 398 } 399}