001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024
025import org.apache.hadoop.hbase.util.Pair;
026import org.apache.hadoop.hbase.wal.WAL.Entry;
027import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
028import org.apache.hadoop.hbase.HBaseInterfaceAudience;
029import org.apache.hadoop.hbase.metrics.BaseSource;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * This class is for maintaining the various replication statistics for a source and publishing them
037 * through the metrics interfaces.
038 */
039@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
040public class MetricsSource implements BaseSource {
041
042  private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class);
043
044  // tracks last shipped timestamp for each wal group
045  private Map<String, Long> lastShippedTimeStamps = new HashMap<String, Long>();
046  private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
047  private long lastHFileRefsQueueSize = 0;
048  private String id;
049  private long timeStampNextToReplicate;
050
051  private final MetricsReplicationSourceSource singleSourceSource;
052  private final MetricsReplicationGlobalSourceSource globalSourceSource;
053  private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
054
055  /**
056   * Constructor used to register the metrics
057   *
058   * @param id Name of the source this class is monitoring
059   */
060  public MetricsSource(String id) {
061    this.id = id;
062    singleSourceSource =
063        CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
064            .getSource(id);
065    globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
066    singleSourceSourceByTable = new HashMap<>();
067  }
068
069  /**
070   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
071   * @param id Name of the source this class is monitoring
072   * @param singleSourceSource Class to monitor id-scoped metrics
073   * @param globalSourceSource Class to monitor global-scoped metrics
074   */
075  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
076                       MetricsReplicationGlobalSourceSource globalSourceSource,
077                       Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
078    this.id = id;
079    this.singleSourceSource = singleSourceSource;
080    this.globalSourceSource = globalSourceSource;
081    this.singleSourceSourceByTable = singleSourceSourceByTable;
082  }
083
084  /**
085   * Set the age of the last edit that was shipped
086   * @param timestamp target write time of the edit
087   * @param walGroup which group we are setting
088   */
089  public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
090    long age = EnvironmentEdgeManager.currentTime() - timestamp;
091    singleSourceSource.setLastShippedAge(age);
092    globalSourceSource.setLastShippedAge(age);
093    this.ageOfLastShippedOp.put(walGroup, age);
094    this.lastShippedTimeStamps.put(walGroup, timestamp);
095  }
096
097  /**
098   * Update the table level replication metrics per table
099   *
100   * @param walEntries List of pairs of WAL entry and it's size
101   */
102  public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
103    for (Pair<Entry, Long> walEntryWithSize : walEntries) {
104      Entry entry = walEntryWithSize.getFirst();
105      long entrySize = walEntryWithSize.getSecond();
106      String tableName = entry.getKey().getTableName().getNameAsString();
107      long writeTime = entry.getKey().getWriteTime();
108      long age = EnvironmentEdgeManager.currentTime() - writeTime;
109
110      // get the replication metrics source for table at the run time
111      MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable()
112        .computeIfAbsent(tableName,
113          t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
114            .getTableSource(t));
115      tableSource.setLastShippedAge(age);
116      tableSource.incrShippedBytes(entrySize);
117    }
118  }
119
120  /**
121   * Set the age of the last edit that was shipped group by table
122   * @param timestamp write time of the edit
123   * @param tableName String as group and tableName
124   */
125  public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
126    long age = EnvironmentEdgeManager.currentTime() - timestamp;
127    this.getSingleSourceSourceByTable().computeIfAbsent(
128        tableName, t -> CompatibilitySingletonFactory
129            .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t))
130            .setLastShippedAge(age);
131  }
132
133  /**
134   * get age of last shipped op of given wal group. If the walGroup is null, return 0
135   * @param walGroup which group we are getting
136   * @return age
137   */
138  public long getAgeOfLastShippedOp(String walGroup) {
139    return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
140  }
141
142  /**
143   * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
144   * when replication fails and need to keep that metric accurate.
145   * @param walGroupId id of the group to update
146   */
147  public void refreshAgeOfLastShippedOp(String walGroupId) {
148    Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId);
149    if (lastTimestamp == null) {
150      this.lastShippedTimeStamps.put(walGroupId, 0L);
151      lastTimestamp = 0L;
152    }
153    if (lastTimestamp > 0) {
154      setAgeOfLastShippedOp(lastTimestamp, walGroupId);
155    }
156  }
157
158  /**
159   * Increment size of the log queue.
160   */
161  public void incrSizeOfLogQueue() {
162    singleSourceSource.incrSizeOfLogQueue(1);
163    globalSourceSource.incrSizeOfLogQueue(1);
164  }
165
166  public void decrSizeOfLogQueue() {
167    singleSourceSource.decrSizeOfLogQueue(1);
168    globalSourceSource.decrSizeOfLogQueue(1);
169  }
170
171  /**
172   * Add on the the number of log edits read
173   *
174   * @param delta the number of log edits read.
175   */
176  private void incrLogEditsRead(long delta) {
177    singleSourceSource.incrLogReadInEdits(delta);
178    globalSourceSource.incrLogReadInEdits(delta);
179  }
180
181  /** Increment the number of log edits read by one. */
182  public void incrLogEditsRead() {
183    incrLogEditsRead(1);
184  }
185
186  /**
187   * Add on the number of log edits filtered
188   *
189   * @param delta the number filtered.
190   */
191  public void incrLogEditsFiltered(long delta) {
192    singleSourceSource.incrLogEditsFiltered(delta);
193    globalSourceSource.incrLogEditsFiltered(delta);
194  }
195
196  /** The number of log edits filtered out. */
197  public void incrLogEditsFiltered() {
198    incrLogEditsFiltered(1);
199  }
200
201  /**
202   * Convience method to apply changes to metrics do to shipping a batch of logs.
203   *
204   * @param batchSize the size of the batch that was shipped to sinks.
205   */
206  public void shipBatch(long batchSize, int sizeInBytes) {
207    singleSourceSource.incrBatchesShipped(1);
208    globalSourceSource.incrBatchesShipped(1);
209
210    singleSourceSource.incrOpsShipped(batchSize);
211    globalSourceSource.incrOpsShipped(batchSize);
212
213    singleSourceSource.incrShippedBytes(sizeInBytes);
214    globalSourceSource.incrShippedBytes(sizeInBytes);
215  }
216
217  /**
218   * Gets the number of edits not eligible for replication this source queue logs so far.
219   * @return logEditsFiltered non-replicable edits filtered from this queue logs.
220   */
221  public long getEditsFiltered(){
222    return this.singleSourceSource.getEditsFiltered();
223  }
224
225  /**
226   * Gets the number of edits eligible for replication read from this source queue logs so far.
227   * @return replicableEdits total number of replicable edits read from this queue logs.
228   */
229  public long getReplicableEdits(){
230    return this.singleSourceSource.getWALEditsRead() - this.singleSourceSource.getEditsFiltered();
231  }
232
233  /**
234   * Gets the number of OPs shipped by this source queue to target cluster.
235   * @return oPsShipped total number of OPs shipped by this source.
236   */
237  public long getOpsShipped() {
238    return this.singleSourceSource.getShippedOps();
239  }
240
241  /**
242   * Convience method to apply changes to metrics do to shipping a batch of logs.
243   *
244   * @param batchSize the size of the batch that was shipped to sinks.
245   * @param hfiles total number of hfiles shipped to sinks.
246   */
247  public void shipBatch(long batchSize, int sizeInBytes, long hfiles) {
248    shipBatch(batchSize, sizeInBytes);
249    singleSourceSource.incrHFilesShipped(hfiles);
250    globalSourceSource.incrHFilesShipped(hfiles);
251  }
252
253  /** increase the byte number read by source from log file */
254  public void incrLogReadInBytes(long readInBytes) {
255    singleSourceSource.incrLogReadInBytes(readInBytes);
256    globalSourceSource.incrLogReadInBytes(readInBytes);
257  }
258
259  /** Removes all metrics about this Source. */
260  public void clear() {
261    int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
262    globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
263    singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
264    singleSourceSource.clear();
265    globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
266    lastShippedTimeStamps.clear();
267    lastHFileRefsQueueSize = 0;
268    timeStampNextToReplicate = 0;
269  }
270
271  /**
272   * Get AgeOfLastShippedOp
273   * @return AgeOfLastShippedOp
274   */
275  public Long getAgeOfLastShippedOp() {
276    return singleSourceSource.getLastShippedAge();
277  }
278
279  /**
280   * Get the sizeOfLogQueue
281   * @return sizeOfLogQueue
282   */
283  public int getSizeOfLogQueue() {
284    return singleSourceSource.getSizeOfLogQueue();
285  }
286
287  /**
288   * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one
289   * @return lastTimestampForAge
290   * @deprecated Since 2.0.0. Removed in 3.0.0.
291   * @see #getTimestampOfLastShippedOp()
292   */
293  @Deprecated
294  public long getTimeStampOfLastShippedOp() {
295    return getTimestampOfLastShippedOp();
296  }
297
298  /**
299   * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one
300   * @return lastTimestampForAge
301   */
302  public long getTimestampOfLastShippedOp() {
303    long lastTimestamp = 0L;
304    for (long ts : lastShippedTimeStamps.values()) {
305      if (ts > lastTimestamp) {
306        lastTimestamp = ts;
307      }
308    }
309    return lastTimestamp;
310  }
311
312  /**
313   * TimeStamp of next edit to be replicated.
314   * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated.
315   */
316  public long getTimeStampNextToReplicate() {
317    return timeStampNextToReplicate;
318  }
319
320  /**
321   * TimeStamp of next edit targeted for replication. Used for calculating lag,
322   * as if this timestamp is greater than timestamp of last shipped, it means there's
323   * at least one edit pending replication.
324   * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated.
325   */
326  public void setTimeStampNextToReplicate(long timeStampNextToReplicate) {
327    this.timeStampNextToReplicate = timeStampNextToReplicate;
328  }
329
330  public long getReplicationDelay() {
331    if(getTimestampOfLastShippedOp()>=timeStampNextToReplicate){
332      return 0;
333    }else{
334      return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate;
335    }
336  }
337
338  /**
339   * Get the slave peer ID
340   * @return peerID
341   */
342  public String getPeerID() {
343    return id;
344  }
345
346  public void incrSizeOfHFileRefsQueue(long size) {
347    singleSourceSource.incrSizeOfHFileRefsQueue(size);
348    globalSourceSource.incrSizeOfHFileRefsQueue(size);
349    lastHFileRefsQueueSize = size;
350  }
351
352  public void decrSizeOfHFileRefsQueue(int size) {
353    singleSourceSource.decrSizeOfHFileRefsQueue(size);
354    globalSourceSource.decrSizeOfHFileRefsQueue(size);
355    lastHFileRefsQueueSize -= size;
356    if (lastHFileRefsQueueSize < 0) {
357      lastHFileRefsQueueSize = 0;
358    }
359  }
360
361  public void incrUnknownFileLengthForClosedWAL() {
362    singleSourceSource.incrUnknownFileLengthForClosedWAL();
363    globalSourceSource.incrUnknownFileLengthForClosedWAL();
364  }
365
366  public void incrUncleanlyClosedWALs() {
367    singleSourceSource.incrUncleanlyClosedWALs();
368    globalSourceSource.incrUncleanlyClosedWALs();
369  }
370
371  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
372    singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
373    globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
374  }
375
376  public void incrRestartedWALReading() {
377    singleSourceSource.incrRestartedWALReading();
378    globalSourceSource.incrRestartedWALReading();
379  }
380
381  public void incrRepeatedFileBytes(final long bytes) {
382    singleSourceSource.incrRepeatedFileBytes(bytes);
383    globalSourceSource.incrRepeatedFileBytes(bytes);
384  }
385
386  public void incrCompletedWAL() {
387    singleSourceSource.incrCompletedWAL();
388    globalSourceSource.incrCompletedWAL();
389  }
390
391  public void incrCompletedRecoveryQueue() {
392    singleSourceSource.incrCompletedRecoveryQueue();
393    globalSourceSource.incrCompletedRecoveryQueue();
394  }
395
396  public void incrFailedRecoveryQueue() {
397    globalSourceSource.incrFailedRecoveryQueue();
398  }
399
400  @Override
401  public void init() {
402    singleSourceSource.init();
403    globalSourceSource.init();
404  }
405
406  @Override
407  public void setGauge(String gaugeName, long value) {
408    singleSourceSource.setGauge(gaugeName, value);
409    globalSourceSource.setGauge(gaugeName, value);
410  }
411
412  @Override
413  public void incGauge(String gaugeName, long delta) {
414    singleSourceSource.incGauge(gaugeName, delta);
415    globalSourceSource.incGauge(gaugeName, delta);
416  }
417
418  @Override
419  public void decGauge(String gaugeName, long delta) {
420    singleSourceSource.decGauge(gaugeName, delta);
421    globalSourceSource.decGauge(gaugeName, delta);
422  }
423
424  @Override
425  public void removeMetric(String key) {
426    singleSourceSource.removeMetric(key);
427    globalSourceSource.removeMetric(key);
428  }
429
430  @Override
431  public void incCounters(String counterName, long delta) {
432    singleSourceSource.incCounters(counterName, delta);
433    globalSourceSource.incCounters(counterName, delta);
434  }
435
436  @Override
437  public void updateHistogram(String name, long value) {
438    singleSourceSource.updateHistogram(name, value);
439    globalSourceSource.updateHistogram(name, value);
440  }
441
442  @Override
443  public String getMetricsContext() {
444    return globalSourceSource.getMetricsContext();
445  }
446
447  @Override
448  public String getMetricsDescription() {
449    return globalSourceSource.getMetricsDescription();
450  }
451
452  @Override
453  public String getMetricsJmxContext() {
454    return globalSourceSource.getMetricsJmxContext();
455  }
456
457  @Override
458  public String getMetricsName() {
459    return globalSourceSource.getMetricsName();
460  }
461
462  @InterfaceAudience.Private
463  public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
464    return singleSourceSourceByTable;
465  }
466
467  /**
468   * Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
469   */
470  public void setWALReaderEditsBufferUsage(long usageInBytes) {
471    globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
472  }
473
474  /**
475   * Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
476   * @return
477   */
478  public long getWALReaderEditsBufferUsage() {
479    return globalSourceSource.getWALReaderEditsBufferBytes();
480  }
481}