001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import java.util.HashMap; 022import java.util.Map; 023 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 028import org.apache.hadoop.hbase.HBaseInterfaceAudience; 029import org.apache.hadoop.hbase.metrics.BaseSource; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031 032import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 033 034/** 035 * This class is for maintaining the various replication statistics for a source and publishing them 036 * through the metrics interfaces. 037 */ 038@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 039public class MetricsSource implements BaseSource { 040 041 private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class); 042 043 // tracks last shipped timestamp for each wal group 044 private Map<String, Long> lastShippedTimeStamps = new HashMap<String, Long>(); 045 private Map<String, Long> ageOfLastShippedOp = new HashMap<>(); 046 private long lastHFileRefsQueueSize = 0; 047 private String id; 048 private long timeStampNextToReplicate; 049 050 private final MetricsReplicationSourceSource singleSourceSource; 051 private final MetricsReplicationSourceSource globalSourceSource; 052 private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable; 053 054 /** 055 * Constructor used to register the metrics 056 * 057 * @param id Name of the source this class is monitoring 058 */ 059 public MetricsSource(String id) { 060 this.id = id; 061 singleSourceSource = 062 CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) 063 .getSource(id); 064 globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); 065 singleSourceSourceByTable = new HashMap<>(); 066 } 067 068 /** 069 * Constructor for injecting custom (or test) MetricsReplicationSourceSources 070 * @param id Name of the source this class is monitoring 071 * @param singleSourceSource Class to monitor id-scoped metrics 072 * @param globalSourceSource Class to monitor global-scoped metrics 073 */ 074 public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, 075 MetricsReplicationSourceSource globalSourceSource, 076 Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) { 077 this.id = id; 078 this.singleSourceSource = singleSourceSource; 079 this.globalSourceSource = globalSourceSource; 080 this.singleSourceSourceByTable = singleSourceSourceByTable; 081 } 082 083 /** 084 * Set the age of the last edit that was shipped 085 * @param timestamp target write time of the edit 086 * @param walGroup which group we are setting 087 */ 088 public void setAgeOfLastShippedOp(long timestamp, String walGroup) { 089 long age = EnvironmentEdgeManager.currentTime() - timestamp; 090 singleSourceSource.setLastShippedAge(age); 091 globalSourceSource.setLastShippedAge(age); 092 this.ageOfLastShippedOp.put(walGroup, age); 093 this.lastShippedTimeStamps.put(walGroup, timestamp); 094 } 095 096 /** 097 * Set the age of the last edit that was shipped group by table 098 * @param timestamp write time of the edit 099 * @param tableName String as group and tableName 100 */ 101 public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) { 102 long age = EnvironmentEdgeManager.currentTime() - timestamp; 103 this.getSingleSourceSourceByTable().computeIfAbsent( 104 tableName, t -> CompatibilitySingletonFactory 105 .getInstance(MetricsReplicationSourceFactory.class).getSource(t)) 106 .setLastShippedAge(age); 107 } 108 109 /** 110 * get age of last shipped op of given wal group. If the walGroup is null, return 0 111 * @param walGroup which group we are getting 112 * @return age 113 */ 114 public long getAgeofLastShippedOp(String walGroup) { 115 return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup); 116 } 117 118 /** 119 * Convenience method to use the last given timestamp to refresh the age of the last edit. Used 120 * when replication fails and need to keep that metric accurate. 121 * @param walGroupId id of the group to update 122 */ 123 public void refreshAgeOfLastShippedOp(String walGroupId) { 124 Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId); 125 if (lastTimestamp == null) { 126 this.lastShippedTimeStamps.put(walGroupId, 0L); 127 lastTimestamp = 0L; 128 } 129 if (lastTimestamp > 0) { 130 setAgeOfLastShippedOp(lastTimestamp, walGroupId); 131 } 132 } 133 134 /** 135 * Increment size of the log queue. 136 */ 137 public void incrSizeOfLogQueue() { 138 singleSourceSource.incrSizeOfLogQueue(1); 139 globalSourceSource.incrSizeOfLogQueue(1); 140 } 141 142 public void decrSizeOfLogQueue() { 143 singleSourceSource.decrSizeOfLogQueue(1); 144 globalSourceSource.decrSizeOfLogQueue(1); 145 } 146 147 /** 148 * Add on the the number of log edits read 149 * 150 * @param delta the number of log edits read. 151 */ 152 private void incrLogEditsRead(long delta) { 153 singleSourceSource.incrLogReadInEdits(delta); 154 globalSourceSource.incrLogReadInEdits(delta); 155 } 156 157 /** Increment the number of log edits read by one. */ 158 public void incrLogEditsRead() { 159 incrLogEditsRead(1); 160 } 161 162 /** 163 * Add on the number of log edits filtered 164 * 165 * @param delta the number filtered. 166 */ 167 public void incrLogEditsFiltered(long delta) { 168 singleSourceSource.incrLogEditsFiltered(delta); 169 globalSourceSource.incrLogEditsFiltered(delta); 170 } 171 172 /** The number of log edits filtered out. */ 173 public void incrLogEditsFiltered() { 174 incrLogEditsFiltered(1); 175 } 176 177 /** 178 * Convience method to apply changes to metrics do to shipping a batch of logs. 179 * 180 * @param batchSize the size of the batch that was shipped to sinks. 181 */ 182 public void shipBatch(long batchSize, int sizeInBytes) { 183 singleSourceSource.incrBatchesShipped(1); 184 globalSourceSource.incrBatchesShipped(1); 185 186 singleSourceSource.incrOpsShipped(batchSize); 187 globalSourceSource.incrOpsShipped(batchSize); 188 189 singleSourceSource.incrShippedBytes(sizeInBytes); 190 globalSourceSource.incrShippedBytes(sizeInBytes); 191 } 192 193 /** 194 * Gets the number of edits not eligible for replication this source queue logs so far. 195 * @return logEditsFiltered non-replicable edits filtered from this queue logs. 196 */ 197 public long getEditsFiltered(){ 198 return this.singleSourceSource.getEditsFiltered(); 199 } 200 201 /** 202 * Gets the number of edits eligible for replication read from this source queue logs so far. 203 * @return replicableEdits total number of replicable edits read from this queue logs. 204 */ 205 public long getReplicableEdits(){ 206 return this.singleSourceSource.getWALEditsRead() - this.singleSourceSource.getEditsFiltered(); 207 } 208 209 /** 210 * Gets the number of OPs shipped by this source queue to target cluster. 211 * @return oPsShipped total number of OPs shipped by this source. 212 */ 213 public long getOpsShipped() { 214 return this.singleSourceSource.getShippedOps(); 215 } 216 217 /** 218 * Convience method to apply changes to metrics do to shipping a batch of logs. 219 * 220 * @param batchSize the size of the batch that was shipped to sinks. 221 * @param hfiles total number of hfiles shipped to sinks. 222 */ 223 public void shipBatch(long batchSize, int sizeInBytes, long hfiles) { 224 shipBatch(batchSize, sizeInBytes); 225 singleSourceSource.incrHFilesShipped(hfiles); 226 globalSourceSource.incrHFilesShipped(hfiles); 227 } 228 229 /** increase the byte number read by source from log file */ 230 public void incrLogReadInBytes(long readInBytes) { 231 singleSourceSource.incrLogReadInBytes(readInBytes); 232 globalSourceSource.incrLogReadInBytes(readInBytes); 233 } 234 235 /** Removes all metrics about this Source. */ 236 public void clear() { 237 int lastQueueSize = singleSourceSource.getSizeOfLogQueue(); 238 globalSourceSource.decrSizeOfLogQueue(lastQueueSize); 239 singleSourceSource.decrSizeOfLogQueue(lastQueueSize); 240 singleSourceSource.clear(); 241 globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); 242 lastShippedTimeStamps.clear(); 243 lastHFileRefsQueueSize = 0; 244 timeStampNextToReplicate = 0; 245 } 246 247 /** 248 * Get AgeOfLastShippedOp 249 * @return AgeOfLastShippedOp 250 */ 251 public Long getAgeOfLastShippedOp() { 252 return singleSourceSource.getLastShippedAge(); 253 } 254 255 /** 256 * Get the sizeOfLogQueue 257 * @return sizeOfLogQueue 258 */ 259 public int getSizeOfLogQueue() { 260 return singleSourceSource.getSizeOfLogQueue(); 261 } 262 263 /** 264 * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one 265 * @return lastTimestampForAge 266 * @deprecated Since 2.0.0. Removed in 3.0.0. 267 * @see #getTimestampOfLastShippedOp() 268 */ 269 @Deprecated 270 public long getTimeStampOfLastShippedOp() { 271 return getTimestampOfLastShippedOp(); 272 } 273 274 /** 275 * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one 276 * @return lastTimestampForAge 277 */ 278 public long getTimestampOfLastShippedOp() { 279 long lastTimestamp = 0L; 280 for (long ts : lastShippedTimeStamps.values()) { 281 if (ts > lastTimestamp) { 282 lastTimestamp = ts; 283 } 284 } 285 return lastTimestamp; 286 } 287 288 /** 289 * TimeStamp of next edit to be replicated. 290 * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated. 291 */ 292 public long getTimeStampNextToReplicate() { 293 return timeStampNextToReplicate; 294 } 295 296 /** 297 * TimeStamp of next edit targeted for replication. Used for calculating lag, 298 * as if this timestamp is greater than timestamp of last shipped, it means there's 299 * at least one edit pending replication. 300 * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated. 301 */ 302 public void setTimeStampNextToReplicate(long timeStampNextToReplicate) { 303 this.timeStampNextToReplicate = timeStampNextToReplicate; 304 } 305 306 public long getReplicationDelay() { 307 if(getTimestampOfLastShippedOp()>=timeStampNextToReplicate){ 308 return 0; 309 }else{ 310 return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate; 311 } 312 } 313 314 /** 315 * Get the slave peer ID 316 * @return peerID 317 */ 318 public String getPeerID() { 319 return id; 320 } 321 322 public void incrSizeOfHFileRefsQueue(long size) { 323 singleSourceSource.incrSizeOfHFileRefsQueue(size); 324 globalSourceSource.incrSizeOfHFileRefsQueue(size); 325 lastHFileRefsQueueSize = size; 326 } 327 328 public void decrSizeOfHFileRefsQueue(int size) { 329 singleSourceSource.decrSizeOfHFileRefsQueue(size); 330 globalSourceSource.decrSizeOfHFileRefsQueue(size); 331 lastHFileRefsQueueSize -= size; 332 if (lastHFileRefsQueueSize < 0) { 333 lastHFileRefsQueueSize = 0; 334 } 335 } 336 337 public void incrUnknownFileLengthForClosedWAL() { 338 singleSourceSource.incrUnknownFileLengthForClosedWAL(); 339 globalSourceSource.incrUnknownFileLengthForClosedWAL(); 340 } 341 342 public void incrUncleanlyClosedWALs() { 343 singleSourceSource.incrUncleanlyClosedWALs(); 344 globalSourceSource.incrUncleanlyClosedWALs(); 345 } 346 347 public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { 348 singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes); 349 globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes); 350 } 351 352 public void incrRestartedWALReading() { 353 singleSourceSource.incrRestartedWALReading(); 354 globalSourceSource.incrRestartedWALReading(); 355 } 356 357 public void incrRepeatedFileBytes(final long bytes) { 358 singleSourceSource.incrRepeatedFileBytes(bytes); 359 globalSourceSource.incrRepeatedFileBytes(bytes); 360 } 361 362 public void incrCompletedWAL() { 363 singleSourceSource.incrCompletedWAL(); 364 globalSourceSource.incrCompletedWAL(); 365 } 366 367 public void incrCompletedRecoveryQueue() { 368 singleSourceSource.incrCompletedRecoveryQueue(); 369 globalSourceSource.incrCompletedRecoveryQueue(); 370 } 371 372 public void incrFailedRecoveryQueue() { 373 globalSourceSource.incrFailedRecoveryQueue(); 374 } 375 376 @Override 377 public void init() { 378 singleSourceSource.init(); 379 globalSourceSource.init(); 380 } 381 382 @Override 383 public void setGauge(String gaugeName, long value) { 384 singleSourceSource.setGauge(gaugeName, value); 385 globalSourceSource.setGauge(gaugeName, value); 386 } 387 388 @Override 389 public void incGauge(String gaugeName, long delta) { 390 singleSourceSource.incGauge(gaugeName, delta); 391 globalSourceSource.incGauge(gaugeName, delta); 392 } 393 394 @Override 395 public void decGauge(String gaugeName, long delta) { 396 singleSourceSource.decGauge(gaugeName, delta); 397 globalSourceSource.decGauge(gaugeName, delta); 398 } 399 400 @Override 401 public void removeMetric(String key) { 402 singleSourceSource.removeMetric(key); 403 globalSourceSource.removeMetric(key); 404 } 405 406 @Override 407 public void incCounters(String counterName, long delta) { 408 singleSourceSource.incCounters(counterName, delta); 409 globalSourceSource.incCounters(counterName, delta); 410 } 411 412 @Override 413 public void updateHistogram(String name, long value) { 414 singleSourceSource.updateHistogram(name, value); 415 globalSourceSource.updateHistogram(name, value); 416 } 417 418 @Override 419 public String getMetricsContext() { 420 return globalSourceSource.getMetricsContext(); 421 } 422 423 @Override 424 public String getMetricsDescription() { 425 return globalSourceSource.getMetricsDescription(); 426 } 427 428 @Override 429 public String getMetricsJmxContext() { 430 return globalSourceSource.getMetricsJmxContext(); 431 } 432 433 @Override 434 public String getMetricsName() { 435 return globalSourceSource.getMetricsName(); 436 } 437 438 @VisibleForTesting 439 public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() { 440 return singleSourceSourceByTable; 441 } 442}