/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class is for maintaining the various replication statistics for a source and publishing them
 * through the metrics interfaces.
 * <p>
 * Most updates are applied twice: once to the id-scoped source and once to the global source.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class MetricsSource implements BaseSource {

  private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class);

  // tracks last shipped timestamp for each wal group
  private final Map<String, Long> lastShippedTimeStamps = new HashMap<>();
  // tracks the age computed at the last shipment for each wal group
  private final Map<String, Long> ageOfLastShippedOp = new HashMap<>();
  // running total of this source's contribution to the global HFile-refs queue gauge, so that
  // terminate() can remove exactly that contribution
  private long lastHFileRefsQueueSize = 0;
  private final String id;
  private long timeStampNextToReplicate;

  private final MetricsReplicationSourceSource singleSourceSource;
  private final MetricsReplicationGlobalSourceSource globalSourceSource;
  private final Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;

  /**
   * Constructor used to register the metrics
   * @param id Name of the source this class is monitoring
   */
  public MetricsSource(String id) {
    this.id = id;
    singleSourceSource = CompatibilitySingletonFactory
      .getInstance(MetricsReplicationSourceFactory.class).getSource(id);
    globalSourceSource = CompatibilitySingletonFactory
      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
    singleSourceSourceByTable = new HashMap<>();
  }

  /**
   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
   * @param id                        Name of the source this class is monitoring
   * @param singleSourceSource        Class to monitor id-scoped metrics
   * @param globalSourceSource        Class to monitor global-scoped metrics
   * @param singleSourceSourceByTable Map from table name to the table-scoped metrics source; new
   *                                  entries are added to this map on demand
   */
  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
    MetricsReplicationGlobalSourceSource globalSourceSource,
    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
    this.id = id;
    this.singleSourceSource = singleSourceSource;
    this.globalSourceSource = globalSourceSource;
    this.singleSourceSourceByTable = singleSourceSourceByTable;
  }

  /**
   * Set the age of the last edit that was shipped
   * @param timestamp target write time of the edit
   * @param walGroup  which group we are setting
   */
  public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
    long age = EnvironmentEdgeManager.currentTime() - timestamp;
    singleSourceSource.setLastShippedAge(age);
    globalSourceSource.setLastShippedAge(age);
    this.ageOfLastShippedOp.put(walGroup, age);
    this.lastShippedTimeStamps.put(walGroup, timestamp);
  }

  /**
   * Update the table level replication metrics per table
   * @param walEntries List of pairs of WAL entry and its size
   */
  public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
    for (Pair<Entry, Long> walEntryWithSize : walEntries) {
      Entry entry = walEntryWithSize.getFirst();
      long entrySize = walEntryWithSize.getSecond();
      String tableName = entry.getKey().getTableName().getNameAsString();
      long writeTime = entry.getKey().getWriteTime();
      long age = EnvironmentEdgeManager.currentTime() - writeTime;

      // get the replication metrics source for table at the run time
      MetricsReplicationTableSource tableSource = getOrCreateTableSource(tableName);
      tableSource.setLastShippedAge(age);
      tableSource.incrShippedBytes(entrySize);
    }
  }

  /**
   * Set the age of the last edit that was shipped group by table
   * @param timestamp write time of the edit
   * @param tableName String as group and tableName
   */
  public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
    long age = EnvironmentEdgeManager.currentTime() - timestamp;
    getOrCreateTableSource(tableName).setLastShippedAge(age);
  }

  /**
   * Looks up the table-scoped metrics source for the given table, creating and caching it through
   * the {@link MetricsReplicationSourceFactory} when the table has not been seen before.
   * @param tableName name of the table whose metrics source is wanted
   * @return the cached or newly created table-scoped metrics source
   */
  private MetricsReplicationTableSource getOrCreateTableSource(String tableName) {
    return this.getSingleSourceSourceByTable().computeIfAbsent(tableName,
      t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
        .getTableSource(t));
  }

  /**
   * Get the age of the last shipped op of the given wal group.
   * @param walGroup which group we are getting
   * @return the recorded age, or 0 if no age has been recorded for {@code walGroup}
   */
  public long getAgeOfLastShippedOp(String walGroup) {
    Long age = this.ageOfLastShippedOp.get(walGroup);
    return age == null ? 0 : age;
  }

  /**
   * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
   * when replication fails and need to keep that metric accurate.
   * @param walGroupId id of the group to update
   */
  public void refreshAgeOfLastShippedOp(String walGroupId) {
    Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId);
    if (lastTimestamp == null) {
      // first time this group is seen: register it with a zero timestamp, nothing to refresh yet
      this.lastShippedTimeStamps.put(walGroupId, 0L);
      lastTimestamp = 0L;
    }
    if (lastTimestamp > 0) {
      setAgeOfLastShippedOp(lastTimestamp, walGroupId);
    }
  }

  /**
   * Increment size of the log queue.
   */
  public void incrSizeOfLogQueue() {
    singleSourceSource.incrSizeOfLogQueue(1);
    globalSourceSource.incrSizeOfLogQueue(1);
  }

  /**
   * Decrement size of the log queue.
   */
  public void decrSizeOfLogQueue() {
    singleSourceSource.decrSizeOfLogQueue(1);
    globalSourceSource.decrSizeOfLogQueue(1);
  }

  /**
   * Increment the count for initializing sources
   */
  public void incrSourceInitializing() {
    singleSourceSource.incrSourceInitializing();
    globalSourceSource.incrSourceInitializing();
  }

  /**
   * Decrement the count for initializing sources
   */
  public void decrSourceInitializing() {
    singleSourceSource.decrSourceInitializing();
    globalSourceSource.decrSourceInitializing();
  }

  /**
   * Add on the number of log edits read
   * @param delta the number of log edits read.
   */
  private void incrLogEditsRead(long delta) {
    singleSourceSource.incrLogReadInEdits(delta);
    globalSourceSource.incrLogReadInEdits(delta);
  }

  /** Increment the number of log edits read by one. */
  public void incrLogEditsRead() {
    incrLogEditsRead(1);
  }

  /**
   * Add on the number of log edits filtered
   * @param delta the number filtered.
   */
  public void incrLogEditsFiltered(long delta) {
    singleSourceSource.incrLogEditsFiltered(delta);
    globalSourceSource.incrLogEditsFiltered(delta);
  }

  /** Increment the number of log edits filtered out by one. */
  public void incrLogEditsFiltered() {
    incrLogEditsFiltered(1);
  }

  /**
   * Convenience method to apply changes to metrics due to shipping a batch of logs.
   * @param batchSize   the number of operations in the batch that was shipped to sinks.
   * @param sizeInBytes the size in bytes of the batch that was shipped to sinks.
   */
  public void shipBatch(long batchSize, int sizeInBytes) {
    singleSourceSource.incrBatchesShipped(1);
    globalSourceSource.incrBatchesShipped(1);

    singleSourceSource.incrOpsShipped(batchSize);
    globalSourceSource.incrOpsShipped(batchSize);

    singleSourceSource.incrShippedBytes(sizeInBytes);
    globalSourceSource.incrShippedBytes(sizeInBytes);
  }

  /**
   * Convenience method to update metrics when batch of operations has failed.
   */
  public void incrementFailedBatches() {
    singleSourceSource.incrFailedBatches();
    globalSourceSource.incrFailedBatches();
  }

  /**
   * Gets the number of edits not eligible for replication this source queue logs so far.
   * @return logEditsFiltered non-replicable edits filtered from this queue logs.
   */
  public long getEditsFiltered() {
    return this.singleSourceSource.getEditsFiltered();
  }

  /**
   * Gets the number of edits eligible for replication read from this source queue logs so far.
   * @return replicableEdits total number of replicable edits read from this queue logs.
   */
  public long getReplicableEdits() {
    return this.singleSourceSource.getWALEditsRead() - this.singleSourceSource.getEditsFiltered();
  }

  /**
   * Gets the number of OPs shipped by this source queue to target cluster.
   * @return oPsShipped total number of OPs shipped by this source.
   */
  public long getOpsShipped() {
    return this.singleSourceSource.getShippedOps();
  }

  /**
   * Convenience method to apply changes to metrics due to shipping a batch of logs.
   * @param batchSize   the number of operations in the batch that was shipped to sinks.
   * @param sizeInBytes the size in bytes of the batch that was shipped to sinks.
   * @param hfiles      total number of hfiles shipped to sinks.
   */
  public void shipBatch(long batchSize, int sizeInBytes, long hfiles) {
    shipBatch(batchSize, sizeInBytes);
    singleSourceSource.incrHFilesShipped(hfiles);
    globalSourceSource.incrHFilesShipped(hfiles);
  }

  /** increase the byte number read by source from log file */
  public void incrLogReadInBytes(long readInBytes) {
    singleSourceSource.incrLogReadInBytes(readInBytes);
    globalSourceSource.incrLogReadInBytes(readInBytes);
  }

  /** Removes all metrics about this Source. */
  public void clear() {
    terminate();
    singleSourceSource.clear();
  }

  /**
   * Removes this source's contribution from the queue-size gauges and resets the locally tracked
   * last-shipped timestamps, HFile-refs queue size and next-to-replicate timestamp.
   */
  public void terminate() {
    int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
    globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
    singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
    globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
    lastShippedTimeStamps.clear();
    lastHFileRefsQueueSize = 0;
    timeStampNextToReplicate = 0;
  }

  /**
   * Get AgeOfLastShippedOp
   */
  public Long getAgeOfLastShippedOp() {
    return singleSourceSource.getLastShippedAge();
  }

  /**
   * Get the sizeOfLogQueue
   */
  public int getSizeOfLogQueue() {
    return singleSourceSource.getSizeOfLogQueue();
  }

  /**
   * Get the value of uncleanlyClosedWAL counter
   */
  public long getUncleanlyClosedWALs() {
    return singleSourceSource.getUncleanlyClosedWALs();
  }

  /**
   * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one
   * @return the largest recorded last-shipped timestamp, or 0 if none is recorded
   */
  public long getTimestampOfLastShippedOp() {
    long lastTimestamp = 0L;
    for (long ts : lastShippedTimeStamps.values()) {
      if (ts > lastTimestamp) {
        lastTimestamp = ts;
      }
    }
    return lastTimestamp;
  }

  /**
   * TimeStamp of next edit to be replicated.
   * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated.
   */
  public long getTimeStampNextToReplicate() {
    return timeStampNextToReplicate;
  }

  /**
   * TimeStamp of next edit targeted for replication. Used for calculating lag, as if this timestamp
   * is greater than timestamp of last shipped, it means there's at least one edit pending
   * replication.
   * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated.
   */
  public void setTimeStampNextToReplicate(long timeStampNextToReplicate) {
    this.timeStampNextToReplicate = timeStampNextToReplicate;
  }

  /**
   * Computes the current replication delay.
   * @return 0 when the latest last-shipped timestamp is not older than the next-to-replicate
   *         timestamp; otherwise the current time minus the next-to-replicate timestamp
   */
  public long getReplicationDelay() {
    if (getTimestampOfLastShippedOp() >= timeStampNextToReplicate) {
      return 0;
    } else {
      return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate;
    }
  }

  /**
   * Get the source initializing counts
   * @return number of replication sources getting initialized
   */
  public int getSourceInitializing() {
    return singleSourceSource.getSourceInitializing();
  }

  /**
   * Get the peer ID
   * @return the id this instance was constructed with
   */
  public String getPeerID() {
    return id;
  }

  /**
   * Increase the size of the HFile-refs queue.
   * @param size number of entries added to the queue
   */
  public void incrSizeOfHFileRefsQueue(long size) {
    singleSourceSource.incrSizeOfHFileRefsQueue(size);
    globalSourceSource.incrSizeOfHFileRefsQueue(size);
    // Accumulate rather than overwrite: decrSizeOfHFileRefsQueue() subtracts from this value and
    // terminate() uses it to back this source's total contribution out of the global gauge.
    lastHFileRefsQueueSize += size;
  }

  /**
   * Decrease the size of the HFile-refs queue. The locally tracked size never drops below 0.
   * @param size number of entries removed from the queue
   */
  public void decrSizeOfHFileRefsQueue(int size) {
    singleSourceSource.decrSizeOfHFileRefsQueue(size);
    globalSourceSource.decrSizeOfHFileRefsQueue(size);
    lastHFileRefsQueueSize -= size;
    if (lastHFileRefsQueueSize < 0) {
      lastHFileRefsQueueSize = 0;
    }
  }

  /** Increment the unknown-file-length-for-closed-WAL counter on both sources. */
  public void incrUnknownFileLengthForClosedWAL() {
    singleSourceSource.incrUnknownFileLengthForClosedWAL();
    globalSourceSource.incrUnknownFileLengthForClosedWAL();
  }

  /** Increment the uncleanly-closed-WALs counter on both sources. */
  public void incrUncleanlyClosedWALs() {
    singleSourceSource.incrUncleanlyClosedWALs();
    globalSourceSource.incrUncleanlyClosedWALs();
  }

  /**
   * Add to the bytes-skipped-in-uncleanly-closed-WALs counter on both sources.
   * @param bytes number of bytes skipped
   */
  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
    singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
    globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
  }

  /** Increment the restarted-WAL-reading counter on both sources. */
  public void incrRestartedWALReading() {
    singleSourceSource.incrRestartedWALReading();
    globalSourceSource.incrRestartedWALReading();
  }

  /**
   * Add to the repeated-file-bytes counter on both sources.
   * @param bytes number of repeated bytes
   */
  public void incrRepeatedFileBytes(final long bytes) {
    singleSourceSource.incrRepeatedFileBytes(bytes);
    globalSourceSource.incrRepeatedFileBytes(bytes);
  }

  /** Increment the completed-WAL counter on both sources. */
  public void incrCompletedWAL() {
    singleSourceSource.incrCompletedWAL();
    globalSourceSource.incrCompletedWAL();
  }

  /** Increment the completed-recovery-queue counter on both sources. */
  public void incrCompletedRecoveryQueue() {
    singleSourceSource.incrCompletedRecoveryQueue();
    globalSourceSource.incrCompletedRecoveryQueue();
  }

  /** Increment the failed-recovery-queue counter; this is recorded on the global source only. */
  public void incrFailedRecoveryQueue() {
    globalSourceSource.incrFailedRecoveryQueue();
  }

  /**
   * Sets the age of oldest log file just for source.
   * @param age age of the oldest WAL
   */
  public void setOldestWalAge(long age) {
    singleSourceSource.setOldestWalAge(age);
  }

  /**
   * Get the age of the oldest log file as recorded on the id-scoped source.
   */
  public long getOldestWalAge() {
    return singleSourceSource.getOldestWalAge();
  }

  @Override
  public void init() {
    singleSourceSource.init();
    globalSourceSource.init();
  }

  @Override
  public void setGauge(String gaugeName, long value) {
    singleSourceSource.setGauge(gaugeName, value);
    globalSourceSource.setGauge(gaugeName, value);
  }

  @Override
  public void incGauge(String gaugeName, long delta) {
    singleSourceSource.incGauge(gaugeName, delta);
    globalSourceSource.incGauge(gaugeName, delta);
  }

  @Override
  public void decGauge(String gaugeName, long delta) {
    singleSourceSource.decGauge(gaugeName, delta);
    globalSourceSource.decGauge(gaugeName, delta);
  }

  @Override
  public void removeMetric(String key) {
    singleSourceSource.removeMetric(key);
    globalSourceSource.removeMetric(key);
  }

  @Override
  public void incCounters(String counterName, long delta) {
    singleSourceSource.incCounters(counterName, delta);
    globalSourceSource.incCounters(counterName, delta);
  }

  @Override
  public void updateHistogram(String name, long value) {
    singleSourceSource.updateHistogram(name, value);
    globalSourceSource.updateHistogram(name, value);
  }

  @Override
  public String getMetricsContext() {
    return globalSourceSource.getMetricsContext();
  }

  @Override
  public String getMetricsDescription() {
    return globalSourceSource.getMetricsDescription();
  }

  @Override
  public String getMetricsJmxContext() {
    return globalSourceSource.getMetricsJmxContext();
  }

  @Override
  public String getMetricsName() {
    return globalSourceSource.getMetricsName();
  }

  /**
   * Returns the live map from table name to table-scoped metrics source (not a copy).
   */
  @InterfaceAudience.Private
  public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
    return singleSourceSourceByTable;
  }

  /**
   * Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
   */
  public void setWALReaderEditsBufferUsage(long usageInBytes) {
    globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
  }

  /**
   * Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
   */
  public long getWALReaderEditsBufferUsage() {
    return globalSourceSource.getWALReaderEditsBufferBytes();
  }
}