/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.util.HashMap;
021import java.util.List;
022import java.util.Map;
023import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
024import org.apache.hadoop.hbase.HBaseInterfaceAudience;
025import org.apache.hadoop.hbase.metrics.BaseSource;
026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
027import org.apache.hadoop.hbase.util.Pair;
028import org.apache.hadoop.hbase.wal.WAL.Entry;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * This class is for maintaining the various replication statistics for a source and publishing them
035 * through the metrics interfaces.
036 */
037@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
038public class MetricsSource implements BaseSource {
039
040  private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class);
041
042  // tracks last shipped timestamp for each wal group
043  private Map<String, Long> lastShippedTimeStamps = new HashMap<String, Long>();
044  private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
045  private long lastHFileRefsQueueSize = 0;
046  private String id;
047  private long timeStampNextToReplicate;
048
049  private final MetricsReplicationSourceSource singleSourceSource;
050  private final MetricsReplicationGlobalSourceSource globalSourceSource;
051  private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
052
053  /**
054   * Constructor used to register the metrics
055   * @param id Name of the source this class is monitoring
056   */
057  public MetricsSource(String id) {
058    this.id = id;
059    singleSourceSource = CompatibilitySingletonFactory
060      .getInstance(MetricsReplicationSourceFactory.class).getSource(id);
061    globalSourceSource = CompatibilitySingletonFactory
062      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
063    singleSourceSourceByTable = new HashMap<>();
064  }
065
066  /**
067   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
068   * @param id                 Name of the source this class is monitoring
069   * @param singleSourceSource Class to monitor id-scoped metrics
070   * @param globalSourceSource Class to monitor global-scoped metrics
071   */
072  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
073    MetricsReplicationGlobalSourceSource globalSourceSource,
074    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
075    this.id = id;
076    this.singleSourceSource = singleSourceSource;
077    this.globalSourceSource = globalSourceSource;
078    this.singleSourceSourceByTable = singleSourceSourceByTable;
079  }
080
081  /**
082   * Set the age of the last edit that was shipped
083   * @param timestamp target write time of the edit
084   * @param walGroup  which group we are setting
085   */
086  public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
087    long age = EnvironmentEdgeManager.currentTime() - timestamp;
088    singleSourceSource.setLastShippedAge(age);
089    globalSourceSource.setLastShippedAge(age);
090    this.ageOfLastShippedOp.put(walGroup, age);
091    this.lastShippedTimeStamps.put(walGroup, timestamp);
092  }
093
094  /**
095   * Update the table level replication metrics per table
096   * @param walEntries List of pairs of WAL entry and it's size
097   */
098  public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
099    for (Pair<Entry, Long> walEntryWithSize : walEntries) {
100      Entry entry = walEntryWithSize.getFirst();
101      long entrySize = walEntryWithSize.getSecond();
102      String tableName = entry.getKey().getTableName().getNameAsString();
103      long writeTime = entry.getKey().getWriteTime();
104      long age = EnvironmentEdgeManager.currentTime() - writeTime;
105
106      // get the replication metrics source for table at the run time
107      MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable()
108        .computeIfAbsent(tableName, t -> CompatibilitySingletonFactory
109          .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t));
110      tableSource.setLastShippedAge(age);
111      tableSource.incrShippedBytes(entrySize);
112    }
113  }
114
115  /**
116   * Set the age of the last edit that was shipped group by table
117   * @param timestamp write time of the edit
118   * @param tableName String as group and tableName
119   */
120  public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
121    long age = EnvironmentEdgeManager.currentTime() - timestamp;
122    this.getSingleSourceSourceByTable()
123      .computeIfAbsent(tableName, t -> CompatibilitySingletonFactory
124        .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t))
125      .setLastShippedAge(age);
126  }
127
128  /**
129   * get age of last shipped op of given wal group. If the walGroup is null, return 0
130   * @param walGroup which group we are getting
131   */
132  public long getAgeOfLastShippedOp(String walGroup) {
133    return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
134  }
135
136  /**
137   * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
138   * when replication fails and need to keep that metric accurate.
139   * @param walGroupId id of the group to update
140   */
141  public void refreshAgeOfLastShippedOp(String walGroupId) {
142    Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId);
143    if (lastTimestamp == null) {
144      this.lastShippedTimeStamps.put(walGroupId, 0L);
145      lastTimestamp = 0L;
146    }
147    if (lastTimestamp > 0) {
148      setAgeOfLastShippedOp(lastTimestamp, walGroupId);
149    }
150  }
151
152  /**
153   * Increment size of the log queue.
154   */
155  public void incrSizeOfLogQueue() {
156    singleSourceSource.incrSizeOfLogQueue(1);
157    globalSourceSource.incrSizeOfLogQueue(1);
158  }
159
160  public void decrSizeOfLogQueue() {
161    singleSourceSource.decrSizeOfLogQueue(1);
162    globalSourceSource.decrSizeOfLogQueue(1);
163  }
164
165  /**
166   * Increment the count for initializing sources
167   */
168  public void incrSourceInitializing() {
169    singleSourceSource.incrSourceInitializing();
170    globalSourceSource.incrSourceInitializing();
171  }
172
173  /**
174   * Decrement the count for initializing sources
175   */
176  public void decrSourceInitializing() {
177    singleSourceSource.decrSourceInitializing();
178    globalSourceSource.decrSourceInitializing();
179  }
180
181  /**
182   * Add on the the number of log edits read
183   * @param delta the number of log edits read.
184   */
185  private void incrLogEditsRead(long delta) {
186    singleSourceSource.incrLogReadInEdits(delta);
187    globalSourceSource.incrLogReadInEdits(delta);
188  }
189
190  /** Increment the number of log edits read by one. */
191  public void incrLogEditsRead() {
192    incrLogEditsRead(1);
193  }
194
195  /**
196   * Add on the number of log edits filtered
197   * @param delta the number filtered.
198   */
199  public void incrLogEditsFiltered(long delta) {
200    singleSourceSource.incrLogEditsFiltered(delta);
201    globalSourceSource.incrLogEditsFiltered(delta);
202  }
203
204  /** The number of log edits filtered out. */
205  public void incrLogEditsFiltered() {
206    incrLogEditsFiltered(1);
207  }
208
209  /**
210   * Convience method to apply changes to metrics do to shipping a batch of logs.
211   * @param batchSize the size of the batch that was shipped to sinks.
212   */
213  public void shipBatch(long batchSize, int sizeInBytes) {
214    singleSourceSource.incrBatchesShipped(1);
215    globalSourceSource.incrBatchesShipped(1);
216
217    singleSourceSource.incrOpsShipped(batchSize);
218    globalSourceSource.incrOpsShipped(batchSize);
219
220    singleSourceSource.incrShippedBytes(sizeInBytes);
221    globalSourceSource.incrShippedBytes(sizeInBytes);
222  }
223
224  /**
225   * Convenience method to update metrics when batch of operations has failed.
226   */
227  public void incrementFailedBatches() {
228    singleSourceSource.incrFailedBatches();
229    globalSourceSource.incrFailedBatches();
230  }
231
232  /**
233   * Gets the number of edits not eligible for replication this source queue logs so far.
234   * @return logEditsFiltered non-replicable edits filtered from this queue logs.
235   */
236  public long getEditsFiltered() {
237    return this.singleSourceSource.getEditsFiltered();
238  }
239
240  /**
241   * Gets the number of edits eligible for replication read from this source queue logs so far.
242   * @return replicableEdits total number of replicable edits read from this queue logs.
243   */
244  public long getReplicableEdits() {
245    return this.singleSourceSource.getWALEditsRead() - this.singleSourceSource.getEditsFiltered();
246  }
247
248  /**
249   * Gets the number of OPs shipped by this source queue to target cluster.
250   * @return oPsShipped total number of OPs shipped by this source.
251   */
252  public long getOpsShipped() {
253    return this.singleSourceSource.getShippedOps();
254  }
255
256  /**
257   * Convience method to apply changes to metrics do to shipping a batch of logs.
258   * @param batchSize the size of the batch that was shipped to sinks.
259   * @param hfiles    total number of hfiles shipped to sinks.
260   */
261  public void shipBatch(long batchSize, int sizeInBytes, long hfiles) {
262    shipBatch(batchSize, sizeInBytes);
263    singleSourceSource.incrHFilesShipped(hfiles);
264    globalSourceSource.incrHFilesShipped(hfiles);
265  }
266
267  /** increase the byte number read by source from log file */
268  public void incrLogReadInBytes(long readInBytes) {
269    singleSourceSource.incrLogReadInBytes(readInBytes);
270    globalSourceSource.incrLogReadInBytes(readInBytes);
271  }
272
273  /** Removes all metrics about this Source. */
274  public void clear() {
275    terminate();
276    singleSourceSource.clear();
277  }
278
279  public void terminate() {
280    int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
281    globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
282    singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
283    globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
284    lastShippedTimeStamps.clear();
285    lastHFileRefsQueueSize = 0;
286    timeStampNextToReplicate = 0;
287  }
288
289  /**
290   * Get AgeOfLastShippedOp
291   */
292  public Long getAgeOfLastShippedOp() {
293    return singleSourceSource.getLastShippedAge();
294  }
295
296  /**
297   * Get the sizeOfLogQueue
298   */
299  public int getSizeOfLogQueue() {
300    return singleSourceSource.getSizeOfLogQueue();
301  }
302
303  /**
304   * Get the value of uncleanlyClosedWAL counter
305   */
306  public long getUncleanlyClosedWALs() {
307    return singleSourceSource.getUncleanlyClosedWALs();
308  }
309
310  /**
311   * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one
312   */
313  public long getTimestampOfLastShippedOp() {
314    long lastTimestamp = 0L;
315    for (long ts : lastShippedTimeStamps.values()) {
316      if (ts > lastTimestamp) {
317        lastTimestamp = ts;
318      }
319    }
320    return lastTimestamp;
321  }
322
323  /**
324   * TimeStamp of next edit to be replicated.
325   * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated.
326   */
327  public long getTimeStampNextToReplicate() {
328    return timeStampNextToReplicate;
329  }
330
331  /**
332   * TimeStamp of next edit targeted for replication. Used for calculating lag, as if this timestamp
333   * is greater than timestamp of last shipped, it means there's at least one edit pending
334   * replication.
335   * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated.
336   */
337  public void setTimeStampNextToReplicate(long timeStampNextToReplicate) {
338    this.timeStampNextToReplicate = timeStampNextToReplicate;
339  }
340
341  public long getReplicationDelay() {
342    if (getTimestampOfLastShippedOp() >= timeStampNextToReplicate) {
343      return 0;
344    } else {
345      return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate;
346    }
347  }
348
349  /**
350   * Get the source initializing counts
351   * @return number of replication sources getting initialized
352   */
353  public int getSourceInitializing() {
354    return singleSourceSource.getSourceInitializing();
355  }
356
357  /**
358   * Get the slave peer ID
359   */
360  public String getPeerID() {
361    return id;
362  }
363
364  public void incrSizeOfHFileRefsQueue(long size) {
365    singleSourceSource.incrSizeOfHFileRefsQueue(size);
366    globalSourceSource.incrSizeOfHFileRefsQueue(size);
367    lastHFileRefsQueueSize = size;
368  }
369
370  public void decrSizeOfHFileRefsQueue(int size) {
371    singleSourceSource.decrSizeOfHFileRefsQueue(size);
372    globalSourceSource.decrSizeOfHFileRefsQueue(size);
373    lastHFileRefsQueueSize -= size;
374    if (lastHFileRefsQueueSize < 0) {
375      lastHFileRefsQueueSize = 0;
376    }
377  }
378
379  public void incrUnknownFileLengthForClosedWAL() {
380    singleSourceSource.incrUnknownFileLengthForClosedWAL();
381    globalSourceSource.incrUnknownFileLengthForClosedWAL();
382  }
383
384  public void incrUncleanlyClosedWALs() {
385    singleSourceSource.incrUncleanlyClosedWALs();
386    globalSourceSource.incrUncleanlyClosedWALs();
387  }
388
389  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
390    singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
391    globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
392  }
393
394  public void incrRestartedWALReading() {
395    singleSourceSource.incrRestartedWALReading();
396    globalSourceSource.incrRestartedWALReading();
397  }
398
399  public void incrRepeatedFileBytes(final long bytes) {
400    singleSourceSource.incrRepeatedFileBytes(bytes);
401    globalSourceSource.incrRepeatedFileBytes(bytes);
402  }
403
404  public void incrCompletedWAL() {
405    singleSourceSource.incrCompletedWAL();
406    globalSourceSource.incrCompletedWAL();
407  }
408
409  public void incrCompletedRecoveryQueue() {
410    singleSourceSource.incrCompletedRecoveryQueue();
411    globalSourceSource.incrCompletedRecoveryQueue();
412  }
413
414  public void incrFailedRecoveryQueue() {
415    globalSourceSource.incrFailedRecoveryQueue();
416  }
417
418  /*
419   * Sets the age of oldest log file just for source.
420   */
421  public void setOldestWalAge(long age) {
422    singleSourceSource.setOldestWalAge(age);
423  }
424
425  public long getOldestWalAge() {
426    return singleSourceSource.getOldestWalAge();
427  }
428
429  @Override
430  public void init() {
431    singleSourceSource.init();
432    globalSourceSource.init();
433  }
434
435  @Override
436  public void setGauge(String gaugeName, long value) {
437    singleSourceSource.setGauge(gaugeName, value);
438    globalSourceSource.setGauge(gaugeName, value);
439  }
440
441  @Override
442  public void incGauge(String gaugeName, long delta) {
443    singleSourceSource.incGauge(gaugeName, delta);
444    globalSourceSource.incGauge(gaugeName, delta);
445  }
446
447  @Override
448  public void decGauge(String gaugeName, long delta) {
449    singleSourceSource.decGauge(gaugeName, delta);
450    globalSourceSource.decGauge(gaugeName, delta);
451  }
452
453  @Override
454  public void removeMetric(String key) {
455    singleSourceSource.removeMetric(key);
456    globalSourceSource.removeMetric(key);
457  }
458
459  @Override
460  public void incCounters(String counterName, long delta) {
461    singleSourceSource.incCounters(counterName, delta);
462    globalSourceSource.incCounters(counterName, delta);
463  }
464
465  @Override
466  public void updateHistogram(String name, long value) {
467    singleSourceSource.updateHistogram(name, value);
468    globalSourceSource.updateHistogram(name, value);
469  }
470
471  @Override
472  public String getMetricsContext() {
473    return globalSourceSource.getMetricsContext();
474  }
475
476  @Override
477  public String getMetricsDescription() {
478    return globalSourceSource.getMetricsDescription();
479  }
480
481  @Override
482  public String getMetricsJmxContext() {
483    return globalSourceSource.getMetricsJmxContext();
484  }
485
486  @Override
487  public String getMetricsName() {
488    return globalSourceSource.getMetricsName();
489  }
490
491  @InterfaceAudience.Private
492  public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
493    return singleSourceSourceByTable;
494  }
495
496  /**
497   * Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
498   */
499  public void setWALReaderEditsBufferUsage(long usageInBytes) {
500    globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
501  }
502
503  /**
504   * Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
505   */
506  public long getWALReaderEditsBufferUsage() {
507    return globalSourceSource.getWALReaderEditsBufferBytes();
508  }
509}