001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import java.util.HashMap; 022import java.util.List; 023import java.util.Map; 024 025import org.apache.hadoop.hbase.util.Pair; 026import org.apache.hadoop.hbase.wal.WAL.Entry; 027import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 028import org.apache.hadoop.hbase.HBaseInterfaceAudience; 029import org.apache.hadoop.hbase.metrics.BaseSource; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * This class is for maintaining the various replication statistics for a source and publishing them 037 * through the metrics interfaces. 038 */ 039@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 040public class MetricsSource implements BaseSource { 041 042 private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class); 043 044 // tracks last shipped timestamp for each wal group 045 private Map<String, Long> lastShippedTimeStamps = new HashMap<String, Long>(); 046 private Map<String, Long> ageOfLastShippedOp = new HashMap<>(); 047 private long lastHFileRefsQueueSize = 0; 048 private String id; 049 private long timeStampNextToReplicate; 050 051 private final MetricsReplicationSourceSource singleSourceSource; 052 private final MetricsReplicationGlobalSourceSource globalSourceSource; 053 private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable; 054 055 /** 056 * Constructor used to register the metrics 057 * 058 * @param id Name of the source this class is monitoring 059 */ 060 public MetricsSource(String id) { 061 this.id = id; 062 singleSourceSource = 063 CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) 064 .getSource(id); 065 globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); 066 singleSourceSourceByTable = new HashMap<>(); 067 } 068 069 /** 070 * Constructor for injecting custom (or test) MetricsReplicationSourceSources 071 * @param id Name of the source this class is monitoring 072 * @param singleSourceSource Class to monitor id-scoped metrics 073 * @param globalSourceSource Class to monitor global-scoped metrics 074 */ 075 public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, 076 MetricsReplicationGlobalSourceSource globalSourceSource, 077 Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) { 078 this.id = id; 079 this.singleSourceSource = singleSourceSource; 080 this.globalSourceSource = globalSourceSource; 081 this.singleSourceSourceByTable = singleSourceSourceByTable; 082 } 083 084 /** 085 * Set the age of the last edit that was shipped 086 * @param timestamp target write time of the edit 087 * @param walGroup which group we are setting 088 */ 089 public void setAgeOfLastShippedOp(long timestamp, String walGroup) { 090 long age = EnvironmentEdgeManager.currentTime() - timestamp; 091 singleSourceSource.setLastShippedAge(age); 092 globalSourceSource.setLastShippedAge(age); 093 this.ageOfLastShippedOp.put(walGroup, age); 094 this.lastShippedTimeStamps.put(walGroup, timestamp); 095 } 096 097 /** 098 * Update the table level replication metrics per table 099 * 100 * @param walEntries List of pairs of WAL entry and it's size 101 */ 102 public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) { 103 for (Pair<Entry, Long> walEntryWithSize : walEntries) { 104 Entry entry = walEntryWithSize.getFirst(); 105 long entrySize = walEntryWithSize.getSecond(); 106 String tableName = entry.getKey().getTableName().getNameAsString(); 107 long writeTime = entry.getKey().getWriteTime(); 108 long age = EnvironmentEdgeManager.currentTime() - writeTime; 109 110 // get the replication metrics source for table at the run time 111 MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable() 112 .computeIfAbsent(tableName, 113 t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) 114 .getTableSource(t)); 115 tableSource.setLastShippedAge(age); 116 tableSource.incrShippedBytes(entrySize); 117 } 118 } 119 120 /** 121 * Set the age of the last edit that was shipped group by table 122 * @param timestamp write time of the edit 123 * @param tableName String as group and tableName 124 */ 125 public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) { 126 long age = EnvironmentEdgeManager.currentTime() - timestamp; 127 this.getSingleSourceSourceByTable().computeIfAbsent( 128 tableName, t -> CompatibilitySingletonFactory 129 .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t)) 130 .setLastShippedAge(age); 131 } 132 133 /** 134 * get age of last shipped op of given wal group. If the walGroup is null, return 0 135 * @param walGroup which group we are getting 136 * @return age 137 */ 138 public long getAgeOfLastShippedOp(String walGroup) { 139 return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup); 140 } 141 142 /** 143 * Convenience method to use the last given timestamp to refresh the age of the last edit. Used 144 * when replication fails and need to keep that metric accurate. 145 * @param walGroupId id of the group to update 146 */ 147 public void refreshAgeOfLastShippedOp(String walGroupId) { 148 Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId); 149 if (lastTimestamp == null) { 150 this.lastShippedTimeStamps.put(walGroupId, 0L); 151 lastTimestamp = 0L; 152 } 153 if (lastTimestamp > 0) { 154 setAgeOfLastShippedOp(lastTimestamp, walGroupId); 155 } 156 } 157 158 /** 159 * Increment size of the log queue. 160 */ 161 public void incrSizeOfLogQueue() { 162 singleSourceSource.incrSizeOfLogQueue(1); 163 globalSourceSource.incrSizeOfLogQueue(1); 164 } 165 166 public void decrSizeOfLogQueue() { 167 singleSourceSource.decrSizeOfLogQueue(1); 168 globalSourceSource.decrSizeOfLogQueue(1); 169 } 170 171 /** 172 * Add on the the number of log edits read 173 * 174 * @param delta the number of log edits read. 175 */ 176 private void incrLogEditsRead(long delta) { 177 singleSourceSource.incrLogReadInEdits(delta); 178 globalSourceSource.incrLogReadInEdits(delta); 179 } 180 181 /** Increment the number of log edits read by one. */ 182 public void incrLogEditsRead() { 183 incrLogEditsRead(1); 184 } 185 186 /** 187 * Add on the number of log edits filtered 188 * 189 * @param delta the number filtered. 190 */ 191 public void incrLogEditsFiltered(long delta) { 192 singleSourceSource.incrLogEditsFiltered(delta); 193 globalSourceSource.incrLogEditsFiltered(delta); 194 } 195 196 /** The number of log edits filtered out. */ 197 public void incrLogEditsFiltered() { 198 incrLogEditsFiltered(1); 199 } 200 201 /** 202 * Convience method to apply changes to metrics do to shipping a batch of logs. 203 * 204 * @param batchSize the size of the batch that was shipped to sinks. 205 */ 206 public void shipBatch(long batchSize, int sizeInBytes) { 207 singleSourceSource.incrBatchesShipped(1); 208 globalSourceSource.incrBatchesShipped(1); 209 210 singleSourceSource.incrOpsShipped(batchSize); 211 globalSourceSource.incrOpsShipped(batchSize); 212 213 singleSourceSource.incrShippedBytes(sizeInBytes); 214 globalSourceSource.incrShippedBytes(sizeInBytes); 215 } 216 217 /** 218 * Gets the number of edits not eligible for replication this source queue logs so far. 219 * @return logEditsFiltered non-replicable edits filtered from this queue logs. 220 */ 221 public long getEditsFiltered(){ 222 return this.singleSourceSource.getEditsFiltered(); 223 } 224 225 /** 226 * Gets the number of edits eligible for replication read from this source queue logs so far. 227 * @return replicableEdits total number of replicable edits read from this queue logs. 228 */ 229 public long getReplicableEdits(){ 230 return this.singleSourceSource.getWALEditsRead() - this.singleSourceSource.getEditsFiltered(); 231 } 232 233 /** 234 * Gets the number of OPs shipped by this source queue to target cluster. 235 * @return oPsShipped total number of OPs shipped by this source. 236 */ 237 public long getOpsShipped() { 238 return this.singleSourceSource.getShippedOps(); 239 } 240 241 /** 242 * Convience method to apply changes to metrics do to shipping a batch of logs. 243 * 244 * @param batchSize the size of the batch that was shipped to sinks. 245 * @param hfiles total number of hfiles shipped to sinks. 246 */ 247 public void shipBatch(long batchSize, int sizeInBytes, long hfiles) { 248 shipBatch(batchSize, sizeInBytes); 249 singleSourceSource.incrHFilesShipped(hfiles); 250 globalSourceSource.incrHFilesShipped(hfiles); 251 } 252 253 /** increase the byte number read by source from log file */ 254 public void incrLogReadInBytes(long readInBytes) { 255 singleSourceSource.incrLogReadInBytes(readInBytes); 256 globalSourceSource.incrLogReadInBytes(readInBytes); 257 } 258 259 /** Removes all metrics about this Source. */ 260 public void clear() { 261 int lastQueueSize = singleSourceSource.getSizeOfLogQueue(); 262 globalSourceSource.decrSizeOfLogQueue(lastQueueSize); 263 singleSourceSource.decrSizeOfLogQueue(lastQueueSize); 264 singleSourceSource.clear(); 265 globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); 266 lastShippedTimeStamps.clear(); 267 lastHFileRefsQueueSize = 0; 268 timeStampNextToReplicate = 0; 269 } 270 271 /** 272 * Get AgeOfLastShippedOp 273 * @return AgeOfLastShippedOp 274 */ 275 public Long getAgeOfLastShippedOp() { 276 return singleSourceSource.getLastShippedAge(); 277 } 278 279 /** 280 * Get the sizeOfLogQueue 281 * @return sizeOfLogQueue 282 */ 283 public int getSizeOfLogQueue() { 284 return singleSourceSource.getSizeOfLogQueue(); 285 } 286 287 /** 288 * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one 289 * @return lastTimestampForAge 290 * @deprecated Since 2.0.0. Removed in 3.0.0. 291 * @see #getTimestampOfLastShippedOp() 292 */ 293 @Deprecated 294 public long getTimeStampOfLastShippedOp() { 295 return getTimestampOfLastShippedOp(); 296 } 297 298 /** 299 * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one 300 * @return lastTimestampForAge 301 */ 302 public long getTimestampOfLastShippedOp() { 303 long lastTimestamp = 0L; 304 for (long ts : lastShippedTimeStamps.values()) { 305 if (ts > lastTimestamp) { 306 lastTimestamp = ts; 307 } 308 } 309 return lastTimestamp; 310 } 311 312 /** 313 * TimeStamp of next edit to be replicated. 314 * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated. 315 */ 316 public long getTimeStampNextToReplicate() { 317 return timeStampNextToReplicate; 318 } 319 320 /** 321 * TimeStamp of next edit targeted for replication. Used for calculating lag, 322 * as if this timestamp is greater than timestamp of last shipped, it means there's 323 * at least one edit pending replication. 324 * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated. 325 */ 326 public void setTimeStampNextToReplicate(long timeStampNextToReplicate) { 327 this.timeStampNextToReplicate = timeStampNextToReplicate; 328 } 329 330 public long getReplicationDelay() { 331 if(getTimestampOfLastShippedOp()>=timeStampNextToReplicate){ 332 return 0; 333 }else{ 334 return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate; 335 } 336 } 337 338 /** 339 * Get the slave peer ID 340 * @return peerID 341 */ 342 public String getPeerID() { 343 return id; 344 } 345 346 public void incrSizeOfHFileRefsQueue(long size) { 347 singleSourceSource.incrSizeOfHFileRefsQueue(size); 348 globalSourceSource.incrSizeOfHFileRefsQueue(size); 349 lastHFileRefsQueueSize = size; 350 } 351 352 public void decrSizeOfHFileRefsQueue(int size) { 353 singleSourceSource.decrSizeOfHFileRefsQueue(size); 354 globalSourceSource.decrSizeOfHFileRefsQueue(size); 355 lastHFileRefsQueueSize -= size; 356 if (lastHFileRefsQueueSize < 0) { 357 lastHFileRefsQueueSize = 0; 358 } 359 } 360 361 public void incrUnknownFileLengthForClosedWAL() { 362 singleSourceSource.incrUnknownFileLengthForClosedWAL(); 363 globalSourceSource.incrUnknownFileLengthForClosedWAL(); 364 } 365 366 public void incrUncleanlyClosedWALs() { 367 singleSourceSource.incrUncleanlyClosedWALs(); 368 globalSourceSource.incrUncleanlyClosedWALs(); 369 } 370 371 public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { 372 singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes); 373 globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes); 374 } 375 376 public void incrRestartedWALReading() { 377 singleSourceSource.incrRestartedWALReading(); 378 globalSourceSource.incrRestartedWALReading(); 379 } 380 381 public void incrRepeatedFileBytes(final long bytes) { 382 singleSourceSource.incrRepeatedFileBytes(bytes); 383 globalSourceSource.incrRepeatedFileBytes(bytes); 384 } 385 386 public void incrCompletedWAL() { 387 singleSourceSource.incrCompletedWAL(); 388 globalSourceSource.incrCompletedWAL(); 389 } 390 391 public void incrCompletedRecoveryQueue() { 392 singleSourceSource.incrCompletedRecoveryQueue(); 393 globalSourceSource.incrCompletedRecoveryQueue(); 394 } 395 396 public void incrFailedRecoveryQueue() { 397 globalSourceSource.incrFailedRecoveryQueue(); 398 } 399 400 @Override 401 public void init() { 402 singleSourceSource.init(); 403 globalSourceSource.init(); 404 } 405 406 @Override 407 public void setGauge(String gaugeName, long value) { 408 singleSourceSource.setGauge(gaugeName, value); 409 globalSourceSource.setGauge(gaugeName, value); 410 } 411 412 @Override 413 public void incGauge(String gaugeName, long delta) { 414 singleSourceSource.incGauge(gaugeName, delta); 415 globalSourceSource.incGauge(gaugeName, delta); 416 } 417 418 @Override 419 public void decGauge(String gaugeName, long delta) { 420 singleSourceSource.decGauge(gaugeName, delta); 421 globalSourceSource.decGauge(gaugeName, delta); 422 } 423 424 @Override 425 public void removeMetric(String key) { 426 singleSourceSource.removeMetric(key); 427 globalSourceSource.removeMetric(key); 428 } 429 430 @Override 431 public void incCounters(String counterName, long delta) { 432 singleSourceSource.incCounters(counterName, delta); 433 globalSourceSource.incCounters(counterName, delta); 434 } 435 436 @Override 437 public void updateHistogram(String name, long value) { 438 singleSourceSource.updateHistogram(name, value); 439 globalSourceSource.updateHistogram(name, value); 440 } 441 442 @Override 443 public String getMetricsContext() { 444 return globalSourceSource.getMetricsContext(); 445 } 446 447 @Override 448 public String getMetricsDescription() { 449 return globalSourceSource.getMetricsDescription(); 450 } 451 452 @Override 453 public String getMetricsJmxContext() { 454 return globalSourceSource.getMetricsJmxContext(); 455 } 456 457 @Override 458 public String getMetricsName() { 459 return globalSourceSource.getMetricsName(); 460 } 461 462 @InterfaceAudience.Private 463 public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() { 464 return singleSourceSourceByTable; 465 } 466 467 /** 468 * Sets the amount of memory in bytes used in this RegionServer by edits pending replication. 469 */ 470 public void setWALReaderEditsBufferUsage(long usageInBytes) { 471 globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes); 472 } 473 474 /** 475 * Returns the amount of memory in bytes used in this RegionServer by edits pending replication. 476 * @return 477 */ 478 public long getWALReaderEditsBufferUsage() { 479 return globalSourceSource.getWALReaderEditsBufferBytes(); 480 } 481}