001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
025import org.apache.hadoop.hbase.HBaseInterfaceAudience;
026import org.apache.hadoop.hbase.metrics.BaseSource;
027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
028import org.apache.hadoop.hbase.util.Pair;
029import org.apache.hadoop.hbase.wal.WAL.Entry;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * This class is for maintaining the various replication statistics for a source and publishing them
036 * through the metrics interfaces.
037 */
038@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
039public class MetricsSource implements BaseSource {
040
041  private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class);
042
043  // tracks last shipped timestamp for each wal group
044  private Map<String, Long> lastShippedTimeStamps = new HashMap<String, Long>();
045  private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
046  private long lastHFileRefsQueueSize = 0;
047  private String id;
048  private long timeStampNextToReplicate;
049
050  private final MetricsReplicationSourceSource singleSourceSource;
051  private final MetricsReplicationGlobalSourceSource globalSourceSource;
052  private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
053
054  /**
055   * Constructor used to register the metrics
056   *
057   * @param id Name of the source this class is monitoring
058   */
059  public MetricsSource(String id) {
060    this.id = id;
061    singleSourceSource =
062        CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
063            .getSource(id);
064    globalSourceSource = CompatibilitySingletonFactory
065      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
066    singleSourceSourceByTable = new HashMap<>();
067  }
068
069  /**
070   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
071   * @param id Name of the source this class is monitoring
072   * @param singleSourceSource Class to monitor id-scoped metrics
073   * @param globalSourceSource Class to monitor global-scoped metrics
074   */
075  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
076                       MetricsReplicationGlobalSourceSource globalSourceSource,
077                       Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
078    this.id = id;
079    this.singleSourceSource = singleSourceSource;
080    this.globalSourceSource = globalSourceSource;
081    this.singleSourceSourceByTable = singleSourceSourceByTable;
082  }
083
084  /**
085   * Set the age of the last edit that was shipped
086   * @param timestamp target write time of the edit
087   * @param walGroup which group we are setting
088   */
089  public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
090    long age = EnvironmentEdgeManager.currentTime() - timestamp;
091    singleSourceSource.setLastShippedAge(age);
092    globalSourceSource.setLastShippedAge(age);
093    this.ageOfLastShippedOp.put(walGroup, age);
094    this.lastShippedTimeStamps.put(walGroup, timestamp);
095  }
096
097  /**
098   * Update the table level replication metrics per table
099   *
100   * @param walEntries List of pairs of WAL entry and it's size
101   */
102  public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
103    for (Pair<Entry, Long> walEntryWithSize : walEntries) {
104      Entry entry = walEntryWithSize.getFirst();
105      long entrySize = walEntryWithSize.getSecond();
106      String tableName = entry.getKey().getTableName().getNameAsString();
107      long writeTime = entry.getKey().getWriteTime();
108      long age = EnvironmentEdgeManager.currentTime() - writeTime;
109
110      // get the replication metrics source for table at the run time
111      MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable()
112        .computeIfAbsent(tableName,
113          t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
114            .getTableSource(t));
115      tableSource.setLastShippedAge(age);
116      tableSource.incrShippedBytes(entrySize);
117    }
118  }
119
120  /**
121   * Set the age of the last edit that was shipped group by table
122   * @param timestamp write time of the edit
123   * @param tableName String as group and tableName
124   */
125  public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
126    long age = EnvironmentEdgeManager.currentTime() - timestamp;
127    this.getSingleSourceSourceByTable().computeIfAbsent(
128        tableName, t -> CompatibilitySingletonFactory
129            .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t))
130            .setLastShippedAge(age);
131  }
132
133  /**
134   * get age of last shipped op of given wal group. If the walGroup is null, return 0
135   * @param walGroup which group we are getting
136   * @return age
137   */
138  public long getAgeOfLastShippedOp(String walGroup) {
139    return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
140  }
141
142  /**
143   * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
144   * when replication fails and need to keep that metric accurate.
145   * @param walGroupId id of the group to update
146   */
147  public void refreshAgeOfLastShippedOp(String walGroupId) {
148    Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId);
149    if (lastTimestamp == null) {
150      this.lastShippedTimeStamps.put(walGroupId, 0L);
151      lastTimestamp = 0L;
152    }
153    if (lastTimestamp > 0) {
154      setAgeOfLastShippedOp(lastTimestamp, walGroupId);
155    }
156  }
157
158  /**
159   * Increment size of the log queue.
160   */
161  public void incrSizeOfLogQueue() {
162    singleSourceSource.incrSizeOfLogQueue(1);
163    globalSourceSource.incrSizeOfLogQueue(1);
164  }
165
166  public void decrSizeOfLogQueue() {
167    singleSourceSource.decrSizeOfLogQueue(1);
168    globalSourceSource.decrSizeOfLogQueue(1);
169  }
170
171  /**
172   * Increment the count for initializing sources
173   */
174  public void incrSourceInitializing() {
175    singleSourceSource.incrSourceInitializing();
176    globalSourceSource.incrSourceInitializing();
177  }
178
179  /**
180   * Decrement the count for initializing sources
181   */
182  public void decrSourceInitializing() {
183    singleSourceSource.decrSourceInitializing();
184    globalSourceSource.decrSourceInitializing();
185  }
186
187  /**
188   * Add on the the number of log edits read
189   *
190   * @param delta the number of log edits read.
191   */
192  private void incrLogEditsRead(long delta) {
193    singleSourceSource.incrLogReadInEdits(delta);
194    globalSourceSource.incrLogReadInEdits(delta);
195  }
196
197  /** Increment the number of log edits read by one. */
198  public void incrLogEditsRead() {
199    incrLogEditsRead(1);
200  }
201
202  /**
203   * Add on the number of log edits filtered
204   *
205   * @param delta the number filtered.
206   */
207  public void incrLogEditsFiltered(long delta) {
208    singleSourceSource.incrLogEditsFiltered(delta);
209    globalSourceSource.incrLogEditsFiltered(delta);
210  }
211
212  /** The number of log edits filtered out. */
213  public void incrLogEditsFiltered() {
214    incrLogEditsFiltered(1);
215  }
216
217  /**
218   * Convience method to apply changes to metrics do to shipping a batch of logs.
219   *
220   * @param batchSize the size of the batch that was shipped to sinks.
221   */
222  public void shipBatch(long batchSize, int sizeInBytes) {
223    singleSourceSource.incrBatchesShipped(1);
224    globalSourceSource.incrBatchesShipped(1);
225
226    singleSourceSource.incrOpsShipped(batchSize);
227    globalSourceSource.incrOpsShipped(batchSize);
228
229    singleSourceSource.incrShippedBytes(sizeInBytes);
230    globalSourceSource.incrShippedBytes(sizeInBytes);
231  }
232
233  /**
234   * Gets the number of edits not eligible for replication this source queue logs so far.
235   * @return logEditsFiltered non-replicable edits filtered from this queue logs.
236   */
237  public long getEditsFiltered(){
238    return this.singleSourceSource.getEditsFiltered();
239  }
240
241  /**
242   * Gets the number of edits eligible for replication read from this source queue logs so far.
243   * @return replicableEdits total number of replicable edits read from this queue logs.
244   */
245  public long getReplicableEdits(){
246    return this.singleSourceSource.getWALEditsRead() - this.singleSourceSource.getEditsFiltered();
247  }
248
249  /**
250   * Gets the number of OPs shipped by this source queue to target cluster.
251   * @return oPsShipped total number of OPs shipped by this source.
252   */
253  public long getOpsShipped() {
254    return this.singleSourceSource.getShippedOps();
255  }
256
257  /**
258   * Convience method to apply changes to metrics do to shipping a batch of logs.
259   *
260   * @param batchSize the size of the batch that was shipped to sinks.
261   * @param hfiles total number of hfiles shipped to sinks.
262   */
263  public void shipBatch(long batchSize, int sizeInBytes, long hfiles) {
264    shipBatch(batchSize, sizeInBytes);
265    singleSourceSource.incrHFilesShipped(hfiles);
266    globalSourceSource.incrHFilesShipped(hfiles);
267  }
268
269  /** increase the byte number read by source from log file */
270  public void incrLogReadInBytes(long readInBytes) {
271    singleSourceSource.incrLogReadInBytes(readInBytes);
272    globalSourceSource.incrLogReadInBytes(readInBytes);
273  }
274
275  /** Removes all metrics about this Source. */
276  public void clear() {
277    int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
278    globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
279    singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
280    singleSourceSource.clear();
281    globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
282    lastShippedTimeStamps.clear();
283    lastHFileRefsQueueSize = 0;
284    timeStampNextToReplicate = 0;
285  }
286
287  /**
288   * Get AgeOfLastShippedOp
289   * @return AgeOfLastShippedOp
290   */
291  public Long getAgeOfLastShippedOp() {
292    return singleSourceSource.getLastShippedAge();
293  }
294
295  /**
296   * Get the sizeOfLogQueue
297   * @return sizeOfLogQueue
298   */
299  public int getSizeOfLogQueue() {
300    return singleSourceSource.getSizeOfLogQueue();
301  }
302
303
304  /**
305   * Get the value of uncleanlyClosedWAL counter
306   * @return uncleanlyClosedWAL
307   */
308  public long getUncleanlyClosedWALs() {
309    return singleSourceSource.getUncleanlyClosedWALs();
310  }
311
312  /**
313   * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one
314   * @return lastTimestampForAge
315   */
316  public long getTimestampOfLastShippedOp() {
317    long lastTimestamp = 0L;
318    for (long ts : lastShippedTimeStamps.values()) {
319      if (ts > lastTimestamp) {
320        lastTimestamp = ts;
321      }
322    }
323    return lastTimestamp;
324  }
325
326  /**
327   * TimeStamp of next edit to be replicated.
328   * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated.
329   */
330  public long getTimeStampNextToReplicate() {
331    return timeStampNextToReplicate;
332  }
333
334  /**
335   * TimeStamp of next edit targeted for replication. Used for calculating lag,
336   * as if this timestamp is greater than timestamp of last shipped, it means there's
337   * at least one edit pending replication.
338   * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated.
339   */
340  public void setTimeStampNextToReplicate(long timeStampNextToReplicate) {
341    this.timeStampNextToReplicate = timeStampNextToReplicate;
342  }
343
344  public long getReplicationDelay() {
345    if(getTimestampOfLastShippedOp()>=timeStampNextToReplicate){
346      return 0;
347    }else{
348      return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate;
349    }
350  }
351
352  /**
353   * Get the source initializing counts
354   * @return number of replication sources getting initialized
355   */
356  public int getSourceInitializing() {
357    return singleSourceSource.getSourceInitializing();
358  }
359
360  /**
361   * Get the slave peer ID
362   * @return peerID
363   */
364  public String getPeerID() {
365    return id;
366  }
367
368  public void incrSizeOfHFileRefsQueue(long size) {
369    singleSourceSource.incrSizeOfHFileRefsQueue(size);
370    globalSourceSource.incrSizeOfHFileRefsQueue(size);
371    lastHFileRefsQueueSize = size;
372  }
373
374  public void decrSizeOfHFileRefsQueue(int size) {
375    singleSourceSource.decrSizeOfHFileRefsQueue(size);
376    globalSourceSource.decrSizeOfHFileRefsQueue(size);
377    lastHFileRefsQueueSize -= size;
378    if (lastHFileRefsQueueSize < 0) {
379      lastHFileRefsQueueSize = 0;
380    }
381  }
382
383  public void incrUnknownFileLengthForClosedWAL() {
384    singleSourceSource.incrUnknownFileLengthForClosedWAL();
385    globalSourceSource.incrUnknownFileLengthForClosedWAL();
386  }
387
388  public void incrUncleanlyClosedWALs() {
389    singleSourceSource.incrUncleanlyClosedWALs();
390    globalSourceSource.incrUncleanlyClosedWALs();
391  }
392
393  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
394    singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
395    globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
396  }
397
398  public void incrRestartedWALReading() {
399    singleSourceSource.incrRestartedWALReading();
400    globalSourceSource.incrRestartedWALReading();
401  }
402
403  public void incrRepeatedFileBytes(final long bytes) {
404    singleSourceSource.incrRepeatedFileBytes(bytes);
405    globalSourceSource.incrRepeatedFileBytes(bytes);
406  }
407
408  public void incrCompletedWAL() {
409    singleSourceSource.incrCompletedWAL();
410    globalSourceSource.incrCompletedWAL();
411  }
412
413  public void incrCompletedRecoveryQueue() {
414    singleSourceSource.incrCompletedRecoveryQueue();
415    globalSourceSource.incrCompletedRecoveryQueue();
416  }
417
418  public void incrFailedRecoveryQueue() {
419    globalSourceSource.incrFailedRecoveryQueue();
420  }
421
422  /*
423   Sets the age of oldest log file just for source.
424  */
425  public void setOldestWalAge(long age) {
426    singleSourceSource.setOldestWalAge(age);
427  }
428
429  public long getOldestWalAge() {
430    return singleSourceSource.getOldestWalAge();
431  }
432
433  @Override
434  public void init() {
435    singleSourceSource.init();
436    globalSourceSource.init();
437  }
438
439  @Override
440  public void setGauge(String gaugeName, long value) {
441    singleSourceSource.setGauge(gaugeName, value);
442    globalSourceSource.setGauge(gaugeName, value);
443  }
444
445  @Override
446  public void incGauge(String gaugeName, long delta) {
447    singleSourceSource.incGauge(gaugeName, delta);
448    globalSourceSource.incGauge(gaugeName, delta);
449  }
450
451  @Override
452  public void decGauge(String gaugeName, long delta) {
453    singleSourceSource.decGauge(gaugeName, delta);
454    globalSourceSource.decGauge(gaugeName, delta);
455  }
456
457  @Override
458  public void removeMetric(String key) {
459    singleSourceSource.removeMetric(key);
460    globalSourceSource.removeMetric(key);
461  }
462
463  @Override
464  public void incCounters(String counterName, long delta) {
465    singleSourceSource.incCounters(counterName, delta);
466    globalSourceSource.incCounters(counterName, delta);
467  }
468
469  @Override
470  public void updateHistogram(String name, long value) {
471    singleSourceSource.updateHistogram(name, value);
472    globalSourceSource.updateHistogram(name, value);
473  }
474
475  @Override
476  public String getMetricsContext() {
477    return globalSourceSource.getMetricsContext();
478  }
479
480  @Override
481  public String getMetricsDescription() {
482    return globalSourceSource.getMetricsDescription();
483  }
484
485  @Override
486  public String getMetricsJmxContext() {
487    return globalSourceSource.getMetricsJmxContext();
488  }
489
490  @Override
491  public String getMetricsName() {
492    return globalSourceSource.getMetricsName();
493  }
494
495  @InterfaceAudience.Private
496  public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
497    return singleSourceSourceByTable;
498  }
499
500  /**
501   * Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
502   */
503  public void setWALReaderEditsBufferUsage(long usageInBytes) {
504    globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
505  }
506
507  /**
508   * Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
509   * @return
510   */
511  public long getWALReaderEditsBufferUsage() {
512    return globalSourceSource.getWALReaderEditsBufferBytes();
513  }
514}