001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.util.HashMap;
021import java.util.List;
022import java.util.Map;
023import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
024import org.apache.hadoop.hbase.HBaseInterfaceAudience;
025import org.apache.hadoop.hbase.metrics.BaseSource;
026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
027import org.apache.hadoop.hbase.util.Pair;
028import org.apache.hadoop.hbase.wal.WAL.Entry;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * This class is for maintaining the various replication statistics for a source and publishing them
035 * through the metrics interfaces.
036 */
037@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
038public class MetricsSource implements BaseSource {
039
040  private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class);
041
042  // tracks last shipped timestamp for each wal group
043  private Map<String, Long> lastShippedTimeStamps = new HashMap<String, Long>();
044  private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
045  private long lastHFileRefsQueueSize = 0;
046  private String id;
047  private long timeStampNextToReplicate;
048
049  private final MetricsReplicationSourceSource singleSourceSource;
050  private final MetricsReplicationGlobalSourceSource globalSourceSource;
051  private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
052
053  /**
054   * Constructor used to register the metrics
055   * @param id Name of the source this class is monitoring
056   */
057  public MetricsSource(String id) {
058    this.id = id;
059    singleSourceSource = CompatibilitySingletonFactory
060      .getInstance(MetricsReplicationSourceFactory.class).getSource(id);
061    globalSourceSource = CompatibilitySingletonFactory
062      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
063    singleSourceSourceByTable = new HashMap<>();
064  }
065
066  /**
067   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
068   * @param id                 Name of the source this class is monitoring
069   * @param singleSourceSource Class to monitor id-scoped metrics
070   * @param globalSourceSource Class to monitor global-scoped metrics
071   */
072  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
073    MetricsReplicationGlobalSourceSource globalSourceSource,
074    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
075    this.id = id;
076    this.singleSourceSource = singleSourceSource;
077    this.globalSourceSource = globalSourceSource;
078    this.singleSourceSourceByTable = singleSourceSourceByTable;
079  }
080
081  /**
082   * Set the age of the last edit that was shipped
083   * @param timestamp target write time of the edit
084   * @param walGroup  which group we are setting
085   */
086  public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
087    long age = EnvironmentEdgeManager.currentTime() - timestamp;
088    singleSourceSource.setLastShippedAge(age);
089    globalSourceSource.setLastShippedAge(age);
090    this.ageOfLastShippedOp.put(walGroup, age);
091    this.lastShippedTimeStamps.put(walGroup, timestamp);
092  }
093
094  /**
095   * Update the table level replication metrics per table
096   * @param walEntries List of pairs of WAL entry and it's size
097   */
098  public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
099    for (Pair<Entry, Long> walEntryWithSize : walEntries) {
100      Entry entry = walEntryWithSize.getFirst();
101      long entrySize = walEntryWithSize.getSecond();
102      String tableName = entry.getKey().getTableName().getNameAsString();
103      long writeTime = entry.getKey().getWriteTime();
104      long age = EnvironmentEdgeManager.currentTime() - writeTime;
105
106      // get the replication metrics source for table at the run time
107      MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable()
108        .computeIfAbsent(tableName, t -> CompatibilitySingletonFactory
109          .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t));
110      tableSource.setLastShippedAge(age);
111      tableSource.incrShippedBytes(entrySize);
112    }
113  }
114
115  /**
116   * Set the age of the last edit that was shipped group by table
117   * @param timestamp write time of the edit
118   * @param tableName String as group and tableName
119   */
120  public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
121    long age = EnvironmentEdgeManager.currentTime() - timestamp;
122    this.getSingleSourceSourceByTable()
123      .computeIfAbsent(tableName, t -> CompatibilitySingletonFactory
124        .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t))
125      .setLastShippedAge(age);
126  }
127
128  /**
129   * get age of last shipped op of given wal group. If the walGroup is null, return 0
130   * @param walGroup which group we are getting n
131   */
132  public long getAgeOfLastShippedOp(String walGroup) {
133    return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
134  }
135
136  /**
137   * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
138   * when replication fails and need to keep that metric accurate.
139   * @param walGroupId id of the group to update
140   */
141  public void refreshAgeOfLastShippedOp(String walGroupId) {
142    Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId);
143    if (lastTimestamp == null) {
144      this.lastShippedTimeStamps.put(walGroupId, 0L);
145      lastTimestamp = 0L;
146    }
147    if (lastTimestamp > 0) {
148      setAgeOfLastShippedOp(lastTimestamp, walGroupId);
149    }
150  }
151
152  /**
153   * Increment size of the log queue.
154   */
155  public void incrSizeOfLogQueue() {
156    singleSourceSource.incrSizeOfLogQueue(1);
157    globalSourceSource.incrSizeOfLogQueue(1);
158  }
159
160  public void decrSizeOfLogQueue() {
161    singleSourceSource.decrSizeOfLogQueue(1);
162    globalSourceSource.decrSizeOfLogQueue(1);
163  }
164
165  /**
166   * Increment the count for initializing sources
167   */
168  public void incrSourceInitializing() {
169    singleSourceSource.incrSourceInitializing();
170    globalSourceSource.incrSourceInitializing();
171  }
172
173  /**
174   * Decrement the count for initializing sources
175   */
176  public void decrSourceInitializing() {
177    singleSourceSource.decrSourceInitializing();
178    globalSourceSource.decrSourceInitializing();
179  }
180
181  /**
182   * Add on the the number of log edits read
183   * @param delta the number of log edits read.
184   */
185  private void incrLogEditsRead(long delta) {
186    singleSourceSource.incrLogReadInEdits(delta);
187    globalSourceSource.incrLogReadInEdits(delta);
188  }
189
190  /** Increment the number of log edits read by one. */
191  public void incrLogEditsRead() {
192    incrLogEditsRead(1);
193  }
194
195  /**
196   * Add on the number of log edits filtered
197   * @param delta the number filtered.
198   */
199  public void incrLogEditsFiltered(long delta) {
200    singleSourceSource.incrLogEditsFiltered(delta);
201    globalSourceSource.incrLogEditsFiltered(delta);
202  }
203
204  /** The number of log edits filtered out. */
205  public void incrLogEditsFiltered() {
206    incrLogEditsFiltered(1);
207  }
208
209  /**
210   * Convience method to apply changes to metrics do to shipping a batch of logs.
211   * @param batchSize the size of the batch that was shipped to sinks.
212   */
213  public void shipBatch(long batchSize, int sizeInBytes) {
214    singleSourceSource.incrBatchesShipped(1);
215    globalSourceSource.incrBatchesShipped(1);
216
217    singleSourceSource.incrOpsShipped(batchSize);
218    globalSourceSource.incrOpsShipped(batchSize);
219
220    singleSourceSource.incrShippedBytes(sizeInBytes);
221    globalSourceSource.incrShippedBytes(sizeInBytes);
222  }
223
224  /**
225   * Convenience method to update metrics when batch of operations has failed.
226   */
227  public void incrementFailedBatches() {
228    singleSourceSource.incrFailedBatches();
229    globalSourceSource.incrFailedBatches();
230  }
231
232  /**
233   * Gets the number of edits not eligible for replication this source queue logs so far.
234   * @return logEditsFiltered non-replicable edits filtered from this queue logs.
235   */
236  public long getEditsFiltered() {
237    return this.singleSourceSource.getEditsFiltered();
238  }
239
240  /**
241   * Gets the number of edits eligible for replication read from this source queue logs so far.
242   * @return replicableEdits total number of replicable edits read from this queue logs.
243   */
244  public long getReplicableEdits() {
245    return this.singleSourceSource.getWALEditsRead() - this.singleSourceSource.getEditsFiltered();
246  }
247
248  /**
249   * Gets the number of OPs shipped by this source queue to target cluster.
250   * @return oPsShipped total number of OPs shipped by this source.
251   */
252  public long getOpsShipped() {
253    return this.singleSourceSource.getShippedOps();
254  }
255
256  /**
257   * Convience method to apply changes to metrics do to shipping a batch of logs.
258   * @param batchSize the size of the batch that was shipped to sinks.
259   * @param hfiles    total number of hfiles shipped to sinks.
260   */
261  public void shipBatch(long batchSize, int sizeInBytes, long hfiles) {
262    shipBatch(batchSize, sizeInBytes);
263    singleSourceSource.incrHFilesShipped(hfiles);
264    globalSourceSource.incrHFilesShipped(hfiles);
265  }
266
267  /** increase the byte number read by source from log file */
268  public void incrLogReadInBytes(long readInBytes) {
269    singleSourceSource.incrLogReadInBytes(readInBytes);
270    globalSourceSource.incrLogReadInBytes(readInBytes);
271  }
272
273  /** Removes all metrics about this Source. */
274  public void clear() {
275    int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
276    globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
277    singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
278    singleSourceSource.clear();
279    globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
280    lastShippedTimeStamps.clear();
281    lastHFileRefsQueueSize = 0;
282    timeStampNextToReplicate = 0;
283  }
284
285  /**
286   * Get AgeOfLastShippedOp n
287   */
288  public Long getAgeOfLastShippedOp() {
289    return singleSourceSource.getLastShippedAge();
290  }
291
292  /**
293   * Get the sizeOfLogQueue n
294   */
295  public int getSizeOfLogQueue() {
296    return singleSourceSource.getSizeOfLogQueue();
297  }
298
299  /**
300   * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one n
301   * * @deprecated Since 2.0.0. Removed in 3.0.0.
302   * @see #getTimestampOfLastShippedOp()
303   */
304  @Deprecated
305  public long getTimeStampOfLastShippedOp() {
306    return getTimestampOfLastShippedOp();
307  }
308
309  /**
310   * Get the value of uncleanlyClosedWAL counter n
311   */
312  public long getUncleanlyClosedWALs() {
313    return singleSourceSource.getUncleanlyClosedWALs();
314  }
315
316  /**
317   * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one n
318   */
319  public long getTimestampOfLastShippedOp() {
320    long lastTimestamp = 0L;
321    for (long ts : lastShippedTimeStamps.values()) {
322      if (ts > lastTimestamp) {
323        lastTimestamp = ts;
324      }
325    }
326    return lastTimestamp;
327  }
328
329  /**
330   * TimeStamp of next edit to be replicated.
331   * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated.
332   */
333  public long getTimeStampNextToReplicate() {
334    return timeStampNextToReplicate;
335  }
336
337  /**
338   * TimeStamp of next edit targeted for replication. Used for calculating lag, as if this timestamp
339   * is greater than timestamp of last shipped, it means there's at least one edit pending
340   * replication.
341   * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated.
342   */
343  public void setTimeStampNextToReplicate(long timeStampNextToReplicate) {
344    this.timeStampNextToReplicate = timeStampNextToReplicate;
345  }
346
347  public long getReplicationDelay() {
348    if (getTimestampOfLastShippedOp() >= timeStampNextToReplicate) {
349      return 0;
350    } else {
351      return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate;
352    }
353  }
354
355  /**
356   * Get the source initializing counts
357   * @return number of replication sources getting initialized
358   */
359  public int getSourceInitializing() {
360    return singleSourceSource.getSourceInitializing();
361  }
362
363  /**
364   * Get the slave peer ID n
365   */
366  public String getPeerID() {
367    return id;
368  }
369
370  public void incrSizeOfHFileRefsQueue(long size) {
371    singleSourceSource.incrSizeOfHFileRefsQueue(size);
372    globalSourceSource.incrSizeOfHFileRefsQueue(size);
373    lastHFileRefsQueueSize = size;
374  }
375
376  public void decrSizeOfHFileRefsQueue(int size) {
377    singleSourceSource.decrSizeOfHFileRefsQueue(size);
378    globalSourceSource.decrSizeOfHFileRefsQueue(size);
379    lastHFileRefsQueueSize -= size;
380    if (lastHFileRefsQueueSize < 0) {
381      lastHFileRefsQueueSize = 0;
382    }
383  }
384
385  public void incrUnknownFileLengthForClosedWAL() {
386    singleSourceSource.incrUnknownFileLengthForClosedWAL();
387    globalSourceSource.incrUnknownFileLengthForClosedWAL();
388  }
389
390  public void incrUncleanlyClosedWALs() {
391    singleSourceSource.incrUncleanlyClosedWALs();
392    globalSourceSource.incrUncleanlyClosedWALs();
393  }
394
395  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
396    singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
397    globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
398  }
399
400  public void incrRestartedWALReading() {
401    singleSourceSource.incrRestartedWALReading();
402    globalSourceSource.incrRestartedWALReading();
403  }
404
405  public void incrRepeatedFileBytes(final long bytes) {
406    singleSourceSource.incrRepeatedFileBytes(bytes);
407    globalSourceSource.incrRepeatedFileBytes(bytes);
408  }
409
410  public void incrCompletedWAL() {
411    singleSourceSource.incrCompletedWAL();
412    globalSourceSource.incrCompletedWAL();
413  }
414
415  public void incrCompletedRecoveryQueue() {
416    singleSourceSource.incrCompletedRecoveryQueue();
417    globalSourceSource.incrCompletedRecoveryQueue();
418  }
419
420  public void incrFailedRecoveryQueue() {
421    globalSourceSource.incrFailedRecoveryQueue();
422  }
423
424  /*
425   * Sets the age of oldest log file just for source.
426   */
427  public void setOldestWalAge(long age) {
428    singleSourceSource.setOldestWalAge(age);
429  }
430
431  public long getOldestWalAge() {
432    return singleSourceSource.getOldestWalAge();
433  }
434
435  @Override
436  public void init() {
437    singleSourceSource.init();
438    globalSourceSource.init();
439  }
440
441  @Override
442  public void setGauge(String gaugeName, long value) {
443    singleSourceSource.setGauge(gaugeName, value);
444    globalSourceSource.setGauge(gaugeName, value);
445  }
446
447  @Override
448  public void incGauge(String gaugeName, long delta) {
449    singleSourceSource.incGauge(gaugeName, delta);
450    globalSourceSource.incGauge(gaugeName, delta);
451  }
452
453  @Override
454  public void decGauge(String gaugeName, long delta) {
455    singleSourceSource.decGauge(gaugeName, delta);
456    globalSourceSource.decGauge(gaugeName, delta);
457  }
458
459  @Override
460  public void removeMetric(String key) {
461    singleSourceSource.removeMetric(key);
462    globalSourceSource.removeMetric(key);
463  }
464
465  @Override
466  public void incCounters(String counterName, long delta) {
467    singleSourceSource.incCounters(counterName, delta);
468    globalSourceSource.incCounters(counterName, delta);
469  }
470
471  @Override
472  public void updateHistogram(String name, long value) {
473    singleSourceSource.updateHistogram(name, value);
474    globalSourceSource.updateHistogram(name, value);
475  }
476
477  @Override
478  public String getMetricsContext() {
479    return globalSourceSource.getMetricsContext();
480  }
481
482  @Override
483  public String getMetricsDescription() {
484    return globalSourceSource.getMetricsDescription();
485  }
486
487  @Override
488  public String getMetricsJmxContext() {
489    return globalSourceSource.getMetricsJmxContext();
490  }
491
492  @Override
493  public String getMetricsName() {
494    return globalSourceSource.getMetricsName();
495  }
496
497  @InterfaceAudience.Private
498  public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
499    return singleSourceSourceByTable;
500  }
501
502  /**
503   * Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
504   */
505  public void setWALReaderEditsBufferUsage(long usageInBytes) {
506    globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
507  }
508
509  /**
510   * Returns the amount of memory in bytes used in this RegionServer by edits pending replication. n
511   */
512  public long getWALReaderEditsBufferUsage() {
513    return globalSourceSource.getWALReaderEditsBufferBytes();
514  }
515}