001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.util.HashMap; 021import java.util.List; 022import java.util.Map; 023import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 024import org.apache.hadoop.hbase.HBaseInterfaceAudience; 025import org.apache.hadoop.hbase.metrics.BaseSource; 026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 027import org.apache.hadoop.hbase.util.Pair; 028import org.apache.hadoop.hbase.wal.WAL.Entry; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * This class is for maintaining the various replication statistics for a source and publishing them 035 * through the metrics interfaces. 036 */ 037@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 038public class MetricsSource implements BaseSource { 039 040 private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class); 041 042 // tracks last shipped timestamp for each wal group 043 private Map<String, Long> lastShippedTimeStamps = new HashMap<String, Long>(); 044 private Map<String, Long> ageOfLastShippedOp = new HashMap<>(); 045 private long lastHFileRefsQueueSize = 0; 046 private String id; 047 private long timeStampNextToReplicate; 048 049 private final MetricsReplicationSourceSource singleSourceSource; 050 private final MetricsReplicationGlobalSourceSource globalSourceSource; 051 private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable; 052 053 /** 054 * Constructor used to register the metrics 055 * @param id Name of the source this class is monitoring 056 */ 057 public MetricsSource(String id) { 058 this.id = id; 059 singleSourceSource = CompatibilitySingletonFactory 060 .getInstance(MetricsReplicationSourceFactory.class).getSource(id); 061 globalSourceSource = CompatibilitySingletonFactory 062 .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); 063 singleSourceSourceByTable = new HashMap<>(); 064 } 065 066 /** 067 * Constructor for injecting custom (or test) MetricsReplicationSourceSources 068 * @param id Name of the source this class is monitoring 069 * @param singleSourceSource Class to monitor id-scoped metrics 070 * @param globalSourceSource Class to monitor global-scoped metrics 071 */ 072 public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, 073 MetricsReplicationGlobalSourceSource globalSourceSource, 074 Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) { 075 this.id = id; 076 this.singleSourceSource = singleSourceSource; 077 this.globalSourceSource = globalSourceSource; 078 this.singleSourceSourceByTable = singleSourceSourceByTable; 079 } 080 081 /** 082 * Set the age of the last edit that was shipped 083 * @param timestamp target write time of the edit 084 * @param walGroup which group we are setting 085 */ 086 public void setAgeOfLastShippedOp(long timestamp, String walGroup) { 087 long age = EnvironmentEdgeManager.currentTime() - timestamp; 088 singleSourceSource.setLastShippedAge(age); 089 globalSourceSource.setLastShippedAge(age); 090 this.ageOfLastShippedOp.put(walGroup, age); 091 this.lastShippedTimeStamps.put(walGroup, timestamp); 092 } 093 094 /** 095 * Update the table level replication metrics per table 096 * @param walEntries List of pairs of WAL entry and it's size 097 */ 098 public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) { 099 for (Pair<Entry, Long> walEntryWithSize : walEntries) { 100 Entry entry = walEntryWithSize.getFirst(); 101 long entrySize = walEntryWithSize.getSecond(); 102 String tableName = entry.getKey().getTableName().getNameAsString(); 103 long writeTime = entry.getKey().getWriteTime(); 104 long age = EnvironmentEdgeManager.currentTime() - writeTime; 105 106 // get the replication metrics source for table at the run time 107 MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable() 108 .computeIfAbsent(tableName, t -> CompatibilitySingletonFactory 109 .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t)); 110 tableSource.setLastShippedAge(age); 111 tableSource.incrShippedBytes(entrySize); 112 } 113 } 114 115 /** 116 * Set the age of the last edit that was shipped group by table 117 * @param timestamp write time of the edit 118 * @param tableName String as group and tableName 119 */ 120 public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) { 121 long age = EnvironmentEdgeManager.currentTime() - timestamp; 122 this.getSingleSourceSourceByTable() 123 .computeIfAbsent(tableName, t -> CompatibilitySingletonFactory 124 .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t)) 125 .setLastShippedAge(age); 126 } 127 128 /** 129 * get age of last shipped op of given wal group. If the walGroup is null, return 0 130 * @param walGroup which group we are getting n 131 */ 132 public long getAgeOfLastShippedOp(String walGroup) { 133 return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup); 134 } 135 136 /** 137 * Convenience method to use the last given timestamp to refresh the age of the last edit. Used 138 * when replication fails and need to keep that metric accurate. 139 * @param walGroupId id of the group to update 140 */ 141 public void refreshAgeOfLastShippedOp(String walGroupId) { 142 Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId); 143 if (lastTimestamp == null) { 144 this.lastShippedTimeStamps.put(walGroupId, 0L); 145 lastTimestamp = 0L; 146 } 147 if (lastTimestamp > 0) { 148 setAgeOfLastShippedOp(lastTimestamp, walGroupId); 149 } 150 } 151 152 /** 153 * Increment size of the log queue. 154 */ 155 public void incrSizeOfLogQueue() { 156 singleSourceSource.incrSizeOfLogQueue(1); 157 globalSourceSource.incrSizeOfLogQueue(1); 158 } 159 160 public void decrSizeOfLogQueue() { 161 singleSourceSource.decrSizeOfLogQueue(1); 162 globalSourceSource.decrSizeOfLogQueue(1); 163 } 164 165 /** 166 * Increment the count for initializing sources 167 */ 168 public void incrSourceInitializing() { 169 singleSourceSource.incrSourceInitializing(); 170 globalSourceSource.incrSourceInitializing(); 171 } 172 173 /** 174 * Decrement the count for initializing sources 175 */ 176 public void decrSourceInitializing() { 177 singleSourceSource.decrSourceInitializing(); 178 globalSourceSource.decrSourceInitializing(); 179 } 180 181 /** 182 * Add on the the number of log edits read 183 * @param delta the number of log edits read. 184 */ 185 private void incrLogEditsRead(long delta) { 186 singleSourceSource.incrLogReadInEdits(delta); 187 globalSourceSource.incrLogReadInEdits(delta); 188 } 189 190 /** Increment the number of log edits read by one. */ 191 public void incrLogEditsRead() { 192 incrLogEditsRead(1); 193 } 194 195 /** 196 * Add on the number of log edits filtered 197 * @param delta the number filtered. 198 */ 199 public void incrLogEditsFiltered(long delta) { 200 singleSourceSource.incrLogEditsFiltered(delta); 201 globalSourceSource.incrLogEditsFiltered(delta); 202 } 203 204 /** The number of log edits filtered out. */ 205 public void incrLogEditsFiltered() { 206 incrLogEditsFiltered(1); 207 } 208 209 /** 210 * Convience method to apply changes to metrics do to shipping a batch of logs. 211 * @param batchSize the size of the batch that was shipped to sinks. 212 */ 213 public void shipBatch(long batchSize, int sizeInBytes) { 214 singleSourceSource.incrBatchesShipped(1); 215 globalSourceSource.incrBatchesShipped(1); 216 217 singleSourceSource.incrOpsShipped(batchSize); 218 globalSourceSource.incrOpsShipped(batchSize); 219 220 singleSourceSource.incrShippedBytes(sizeInBytes); 221 globalSourceSource.incrShippedBytes(sizeInBytes); 222 } 223 224 /** 225 * Convenience method to update metrics when batch of operations has failed. 226 */ 227 public void incrementFailedBatches() { 228 singleSourceSource.incrFailedBatches(); 229 globalSourceSource.incrFailedBatches(); 230 } 231 232 /** 233 * Gets the number of edits not eligible for replication this source queue logs so far. 234 * @return logEditsFiltered non-replicable edits filtered from this queue logs. 235 */ 236 public long getEditsFiltered() { 237 return this.singleSourceSource.getEditsFiltered(); 238 } 239 240 /** 241 * Gets the number of edits eligible for replication read from this source queue logs so far. 242 * @return replicableEdits total number of replicable edits read from this queue logs. 243 */ 244 public long getReplicableEdits() { 245 return this.singleSourceSource.getWALEditsRead() - this.singleSourceSource.getEditsFiltered(); 246 } 247 248 /** 249 * Gets the number of OPs shipped by this source queue to target cluster. 250 * @return oPsShipped total number of OPs shipped by this source. 251 */ 252 public long getOpsShipped() { 253 return this.singleSourceSource.getShippedOps(); 254 } 255 256 /** 257 * Convience method to apply changes to metrics do to shipping a batch of logs. 258 * @param batchSize the size of the batch that was shipped to sinks. 259 * @param hfiles total number of hfiles shipped to sinks. 260 */ 261 public void shipBatch(long batchSize, int sizeInBytes, long hfiles) { 262 shipBatch(batchSize, sizeInBytes); 263 singleSourceSource.incrHFilesShipped(hfiles); 264 globalSourceSource.incrHFilesShipped(hfiles); 265 } 266 267 /** increase the byte number read by source from log file */ 268 public void incrLogReadInBytes(long readInBytes) { 269 singleSourceSource.incrLogReadInBytes(readInBytes); 270 globalSourceSource.incrLogReadInBytes(readInBytes); 271 } 272 273 /** Removes all metrics about this Source. */ 274 public void clear() { 275 int lastQueueSize = singleSourceSource.getSizeOfLogQueue(); 276 globalSourceSource.decrSizeOfLogQueue(lastQueueSize); 277 singleSourceSource.decrSizeOfLogQueue(lastQueueSize); 278 singleSourceSource.clear(); 279 globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); 280 lastShippedTimeStamps.clear(); 281 lastHFileRefsQueueSize = 0; 282 timeStampNextToReplicate = 0; 283 } 284 285 /** 286 * Get AgeOfLastShippedOp n 287 */ 288 public Long getAgeOfLastShippedOp() { 289 return singleSourceSource.getLastShippedAge(); 290 } 291 292 /** 293 * Get the sizeOfLogQueue n 294 */ 295 public int getSizeOfLogQueue() { 296 return singleSourceSource.getSizeOfLogQueue(); 297 } 298 299 /** 300 * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one n 301 * * @deprecated Since 2.0.0. Removed in 3.0.0. 302 * @see #getTimestampOfLastShippedOp() 303 */ 304 @Deprecated 305 public long getTimeStampOfLastShippedOp() { 306 return getTimestampOfLastShippedOp(); 307 } 308 309 /** 310 * Get the value of uncleanlyClosedWAL counter n 311 */ 312 public long getUncleanlyClosedWALs() { 313 return singleSourceSource.getUncleanlyClosedWALs(); 314 } 315 316 /** 317 * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one n 318 */ 319 public long getTimestampOfLastShippedOp() { 320 long lastTimestamp = 0L; 321 for (long ts : lastShippedTimeStamps.values()) { 322 if (ts > lastTimestamp) { 323 lastTimestamp = ts; 324 } 325 } 326 return lastTimestamp; 327 } 328 329 /** 330 * TimeStamp of next edit to be replicated. 331 * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated. 332 */ 333 public long getTimeStampNextToReplicate() { 334 return timeStampNextToReplicate; 335 } 336 337 /** 338 * TimeStamp of next edit targeted for replication. Used for calculating lag, as if this timestamp 339 * is greater than timestamp of last shipped, it means there's at least one edit pending 340 * replication. 341 * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated. 342 */ 343 public void setTimeStampNextToReplicate(long timeStampNextToReplicate) { 344 this.timeStampNextToReplicate = timeStampNextToReplicate; 345 } 346 347 public long getReplicationDelay() { 348 if (getTimestampOfLastShippedOp() >= timeStampNextToReplicate) { 349 return 0; 350 } else { 351 return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate; 352 } 353 } 354 355 /** 356 * Get the source initializing counts 357 * @return number of replication sources getting initialized 358 */ 359 public int getSourceInitializing() { 360 return singleSourceSource.getSourceInitializing(); 361 } 362 363 /** 364 * Get the slave peer ID n 365 */ 366 public String getPeerID() { 367 return id; 368 } 369 370 public void incrSizeOfHFileRefsQueue(long size) { 371 singleSourceSource.incrSizeOfHFileRefsQueue(size); 372 globalSourceSource.incrSizeOfHFileRefsQueue(size); 373 lastHFileRefsQueueSize = size; 374 } 375 376 public void decrSizeOfHFileRefsQueue(int size) { 377 singleSourceSource.decrSizeOfHFileRefsQueue(size); 378 globalSourceSource.decrSizeOfHFileRefsQueue(size); 379 lastHFileRefsQueueSize -= size; 380 if (lastHFileRefsQueueSize < 0) { 381 lastHFileRefsQueueSize = 0; 382 } 383 } 384 385 public void incrUnknownFileLengthForClosedWAL() { 386 singleSourceSource.incrUnknownFileLengthForClosedWAL(); 387 globalSourceSource.incrUnknownFileLengthForClosedWAL(); 388 } 389 390 public void incrUncleanlyClosedWALs() { 391 singleSourceSource.incrUncleanlyClosedWALs(); 392 globalSourceSource.incrUncleanlyClosedWALs(); 393 } 394 395 public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { 396 singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes); 397 globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes); 398 } 399 400 public void incrRestartedWALReading() { 401 singleSourceSource.incrRestartedWALReading(); 402 globalSourceSource.incrRestartedWALReading(); 403 } 404 405 public void incrRepeatedFileBytes(final long bytes) { 406 singleSourceSource.incrRepeatedFileBytes(bytes); 407 globalSourceSource.incrRepeatedFileBytes(bytes); 408 } 409 410 public void incrCompletedWAL() { 411 singleSourceSource.incrCompletedWAL(); 412 globalSourceSource.incrCompletedWAL(); 413 } 414 415 public void incrCompletedRecoveryQueue() { 416 singleSourceSource.incrCompletedRecoveryQueue(); 417 globalSourceSource.incrCompletedRecoveryQueue(); 418 } 419 420 public void incrFailedRecoveryQueue() { 421 globalSourceSource.incrFailedRecoveryQueue(); 422 } 423 424 /* 425 * Sets the age of oldest log file just for source. 426 */ 427 public void setOldestWalAge(long age) { 428 singleSourceSource.setOldestWalAge(age); 429 } 430 431 public long getOldestWalAge() { 432 return singleSourceSource.getOldestWalAge(); 433 } 434 435 @Override 436 public void init() { 437 singleSourceSource.init(); 438 globalSourceSource.init(); 439 } 440 441 @Override 442 public void setGauge(String gaugeName, long value) { 443 singleSourceSource.setGauge(gaugeName, value); 444 globalSourceSource.setGauge(gaugeName, value); 445 } 446 447 @Override 448 public void incGauge(String gaugeName, long delta) { 449 singleSourceSource.incGauge(gaugeName, delta); 450 globalSourceSource.incGauge(gaugeName, delta); 451 } 452 453 @Override 454 public void decGauge(String gaugeName, long delta) { 455 singleSourceSource.decGauge(gaugeName, delta); 456 globalSourceSource.decGauge(gaugeName, delta); 457 } 458 459 @Override 460 public void removeMetric(String key) { 461 singleSourceSource.removeMetric(key); 462 globalSourceSource.removeMetric(key); 463 } 464 465 @Override 466 public void incCounters(String counterName, long delta) { 467 singleSourceSource.incCounters(counterName, delta); 468 globalSourceSource.incCounters(counterName, delta); 469 } 470 471 @Override 472 public void updateHistogram(String name, long value) { 473 singleSourceSource.updateHistogram(name, value); 474 globalSourceSource.updateHistogram(name, value); 475 } 476 477 @Override 478 public String getMetricsContext() { 479 return globalSourceSource.getMetricsContext(); 480 } 481 482 @Override 483 public String getMetricsDescription() { 484 return globalSourceSource.getMetricsDescription(); 485 } 486 487 @Override 488 public String getMetricsJmxContext() { 489 return globalSourceSource.getMetricsJmxContext(); 490 } 491 492 @Override 493 public String getMetricsName() { 494 return globalSourceSource.getMetricsName(); 495 } 496 497 @InterfaceAudience.Private 498 public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() { 499 return singleSourceSourceByTable; 500 } 501 502 /** 503 * Sets the amount of memory in bytes used in this RegionServer by edits pending replication. 504 */ 505 public void setWALReaderEditsBufferUsage(long usageInBytes) { 506 globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes); 507 } 508 509 /** 510 * Returns the amount of memory in bytes used in this RegionServer by edits pending replication. n 511 */ 512 public long getWALReaderEditsBufferUsage() { 513 return globalSourceSource.getWALReaderEditsBufferBytes(); 514 } 515}