001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024
025import org.apache.hadoop.hbase.util.Pair;
026import org.apache.hadoop.hbase.wal.WAL.Entry;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
031import org.apache.hadoop.hbase.HBaseInterfaceAudience;
032import org.apache.hadoop.hbase.metrics.BaseSource;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034
035import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
036
037/**
038 * This class is for maintaining the various replication statistics for a source and publishing them
039 * through the metrics interfaces.
040 */
041@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
042public class MetricsSource implements BaseSource {
043
044  private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class);
045
046  // tracks last shipped timestamp for each wal group
047  private Map<String, Long> lastShippedTimeStamps = new HashMap<String, Long>();
048  private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
049  private long lastHFileRefsQueueSize = 0;
050  private String id;
051  private long timeStampNextToReplicate;
052
053  private final MetricsReplicationSourceSource singleSourceSource;
054  private final MetricsReplicationGlobalSourceSource globalSourceSource;
055  private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
056
057  /**
058   * Constructor used to register the metrics
059   *
060   * @param id Name of the source this class is monitoring
061   */
062  public MetricsSource(String id) {
063    this.id = id;
064    singleSourceSource =
065        CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
066            .getSource(id);
067    globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
068    singleSourceSourceByTable = new HashMap<>();
069  }
070
071  /**
072   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
073   * @param id Name of the source this class is monitoring
074   * @param singleSourceSource Class to monitor id-scoped metrics
075   * @param globalSourceSource Class to monitor global-scoped metrics
076   */
077  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
078                       MetricsReplicationGlobalSourceSource globalSourceSource,
079                       Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
080    this.id = id;
081    this.singleSourceSource = singleSourceSource;
082    this.globalSourceSource = globalSourceSource;
083    this.singleSourceSourceByTable = singleSourceSourceByTable;
084  }
085
086  /**
087   * Set the age of the last edit that was shipped
088   * @param timestamp target write time of the edit
089   * @param walGroup which group we are setting
090   */
091  public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
092    long age = EnvironmentEdgeManager.currentTime() - timestamp;
093    singleSourceSource.setLastShippedAge(age);
094    globalSourceSource.setLastShippedAge(age);
095    this.ageOfLastShippedOp.put(walGroup, age);
096    this.lastShippedTimeStamps.put(walGroup, timestamp);
097  }
098
099  /**
100   * Update the table level replication metrics per table
101   *
102   * @param walEntries List of pairs of WAL entry and it's size
103   */
104  public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
105    for (Pair<Entry, Long> walEntryWithSize : walEntries) {
106      Entry entry = walEntryWithSize.getFirst();
107      long entrySize = walEntryWithSize.getSecond();
108      String tableName = entry.getKey().getTableName().getNameAsString();
109      long writeTime = entry.getKey().getWriteTime();
110      long age = EnvironmentEdgeManager.currentTime() - writeTime;
111
112      // get the replication metrics source for table at the run time
113      MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable()
114        .computeIfAbsent(tableName,
115          t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
116            .getTableSource(t));
117      tableSource.setLastShippedAge(age);
118      tableSource.incrShippedBytes(entrySize);
119    }
120  }
121
122  /**
123   * Set the age of the last edit that was shipped group by table
124   * @param timestamp write time of the edit
125   * @param tableName String as group and tableName
126   */
127  public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
128    long age = EnvironmentEdgeManager.currentTime() - timestamp;
129    this.getSingleSourceSourceByTable().computeIfAbsent(
130        tableName, t -> CompatibilitySingletonFactory
131            .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t))
132            .setLastShippedAge(age);
133  }
134
135  /**
136   * get age of last shipped op of given wal group. If the walGroup is null, return 0
137   * @param walGroup which group we are getting
138   * @return age
139   */
140  public long getAgeOfLastShippedOp(String walGroup) {
141    return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
142  }
143
144  /**
145   * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
146   * when replication fails and need to keep that metric accurate.
147   * @param walGroupId id of the group to update
148   */
149  public void refreshAgeOfLastShippedOp(String walGroupId) {
150    Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId);
151    if (lastTimestamp == null) {
152      this.lastShippedTimeStamps.put(walGroupId, 0L);
153      lastTimestamp = 0L;
154    }
155    if (lastTimestamp > 0) {
156      setAgeOfLastShippedOp(lastTimestamp, walGroupId);
157    }
158  }
159
160  /**
161   * Increment size of the log queue.
162   */
163  public void incrSizeOfLogQueue() {
164    singleSourceSource.incrSizeOfLogQueue(1);
165    globalSourceSource.incrSizeOfLogQueue(1);
166  }
167
168  public void decrSizeOfLogQueue() {
169    singleSourceSource.decrSizeOfLogQueue(1);
170    globalSourceSource.decrSizeOfLogQueue(1);
171  }
172
173  /**
174   * Add on the the number of log edits read
175   *
176   * @param delta the number of log edits read.
177   */
178  private void incrLogEditsRead(long delta) {
179    singleSourceSource.incrLogReadInEdits(delta);
180    globalSourceSource.incrLogReadInEdits(delta);
181  }
182
183  /** Increment the number of log edits read by one. */
184  public void incrLogEditsRead() {
185    incrLogEditsRead(1);
186  }
187
188  /**
189   * Add on the number of log edits filtered
190   *
191   * @param delta the number filtered.
192   */
193  public void incrLogEditsFiltered(long delta) {
194    singleSourceSource.incrLogEditsFiltered(delta);
195    globalSourceSource.incrLogEditsFiltered(delta);
196  }
197
198  /** The number of log edits filtered out. */
199  public void incrLogEditsFiltered() {
200    incrLogEditsFiltered(1);
201  }
202
203  /**
204   * Convience method to apply changes to metrics do to shipping a batch of logs.
205   *
206   * @param batchSize the size of the batch that was shipped to sinks.
207   */
208  public void shipBatch(long batchSize, int sizeInBytes) {
209    singleSourceSource.incrBatchesShipped(1);
210    globalSourceSource.incrBatchesShipped(1);
211
212    singleSourceSource.incrOpsShipped(batchSize);
213    globalSourceSource.incrOpsShipped(batchSize);
214
215    singleSourceSource.incrShippedBytes(sizeInBytes);
216    globalSourceSource.incrShippedBytes(sizeInBytes);
217  }
218
219  /**
220   * Gets the number of edits not eligible for replication this source queue logs so far.
221   * @return logEditsFiltered non-replicable edits filtered from this queue logs.
222   */
223  public long getEditsFiltered(){
224    return this.singleSourceSource.getEditsFiltered();
225  }
226
227  /**
228   * Gets the number of edits eligible for replication read from this source queue logs so far.
229   * @return replicableEdits total number of replicable edits read from this queue logs.
230   */
231  public long getReplicableEdits(){
232    return this.singleSourceSource.getWALEditsRead() - this.singleSourceSource.getEditsFiltered();
233  }
234
235  /**
236   * Gets the number of OPs shipped by this source queue to target cluster.
237   * @return oPsShipped total number of OPs shipped by this source.
238   */
239  public long getOpsShipped() {
240    return this.singleSourceSource.getShippedOps();
241  }
242
243  /**
244   * Convience method to apply changes to metrics do to shipping a batch of logs.
245   *
246   * @param batchSize the size of the batch that was shipped to sinks.
247   * @param hfiles total number of hfiles shipped to sinks.
248   */
249  public void shipBatch(long batchSize, int sizeInBytes, long hfiles) {
250    shipBatch(batchSize, sizeInBytes);
251    singleSourceSource.incrHFilesShipped(hfiles);
252    globalSourceSource.incrHFilesShipped(hfiles);
253  }
254
255  /** increase the byte number read by source from log file */
256  public void incrLogReadInBytes(long readInBytes) {
257    singleSourceSource.incrLogReadInBytes(readInBytes);
258    globalSourceSource.incrLogReadInBytes(readInBytes);
259  }
260
261  /** Removes all metrics about this Source. */
262  public void clear() {
263    int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
264    globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
265    singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
266    singleSourceSource.clear();
267    globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
268    lastShippedTimeStamps.clear();
269    lastHFileRefsQueueSize = 0;
270    timeStampNextToReplicate = 0;
271  }
272
273  /**
274   * Get AgeOfLastShippedOp
275   * @return AgeOfLastShippedOp
276   */
277  public Long getAgeOfLastShippedOp() {
278    return singleSourceSource.getLastShippedAge();
279  }
280
281  /**
282   * Get the sizeOfLogQueue
283   * @return sizeOfLogQueue
284   */
285  public int getSizeOfLogQueue() {
286    return singleSourceSource.getSizeOfLogQueue();
287  }
288
289  /**
290   * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one
291   * @return lastTimestampForAge
292   */
293  public long getTimestampOfLastShippedOp() {
294    long lastTimestamp = 0L;
295    for (long ts : lastShippedTimeStamps.values()) {
296      if (ts > lastTimestamp) {
297        lastTimestamp = ts;
298      }
299    }
300    return lastTimestamp;
301  }
302
303  /**
304   * TimeStamp of next edit to be replicated.
305   * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated.
306   */
307  public long getTimeStampNextToReplicate() {
308    return timeStampNextToReplicate;
309  }
310
311  /**
312   * TimeStamp of next edit targeted for replication. Used for calculating lag,
313   * as if this timestamp is greater than timestamp of last shipped, it means there's
314   * at least one edit pending replication.
315   * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated.
316   */
317  public void setTimeStampNextToReplicate(long timeStampNextToReplicate) {
318    this.timeStampNextToReplicate = timeStampNextToReplicate;
319  }
320
321  public long getReplicationDelay() {
322    if(getTimestampOfLastShippedOp()>=timeStampNextToReplicate){
323      return 0;
324    }else{
325      return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate;
326    }
327  }
328
329  /**
330   * Get the slave peer ID
331   * @return peerID
332   */
333  public String getPeerID() {
334    return id;
335  }
336
337  public void incrSizeOfHFileRefsQueue(long size) {
338    singleSourceSource.incrSizeOfHFileRefsQueue(size);
339    globalSourceSource.incrSizeOfHFileRefsQueue(size);
340    lastHFileRefsQueueSize = size;
341  }
342
343  public void decrSizeOfHFileRefsQueue(int size) {
344    singleSourceSource.decrSizeOfHFileRefsQueue(size);
345    globalSourceSource.decrSizeOfHFileRefsQueue(size);
346    lastHFileRefsQueueSize -= size;
347    if (lastHFileRefsQueueSize < 0) {
348      lastHFileRefsQueueSize = 0;
349    }
350  }
351
352  public void incrUnknownFileLengthForClosedWAL() {
353    singleSourceSource.incrUnknownFileLengthForClosedWAL();
354    globalSourceSource.incrUnknownFileLengthForClosedWAL();
355  }
356
357  public void incrUncleanlyClosedWALs() {
358    singleSourceSource.incrUncleanlyClosedWALs();
359    globalSourceSource.incrUncleanlyClosedWALs();
360  }
361
362  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
363    singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
364    globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
365  }
366
367  public void incrRestartedWALReading() {
368    singleSourceSource.incrRestartedWALReading();
369    globalSourceSource.incrRestartedWALReading();
370  }
371
372  public void incrRepeatedFileBytes(final long bytes) {
373    singleSourceSource.incrRepeatedFileBytes(bytes);
374    globalSourceSource.incrRepeatedFileBytes(bytes);
375  }
376
377  public void incrCompletedWAL() {
378    singleSourceSource.incrCompletedWAL();
379    globalSourceSource.incrCompletedWAL();
380  }
381
382  public void incrCompletedRecoveryQueue() {
383    singleSourceSource.incrCompletedRecoveryQueue();
384    globalSourceSource.incrCompletedRecoveryQueue();
385  }
386
387  public void incrFailedRecoveryQueue() {
388    globalSourceSource.incrFailedRecoveryQueue();
389  }
390
391  @Override
392  public void init() {
393    singleSourceSource.init();
394    globalSourceSource.init();
395  }
396
397  @Override
398  public void setGauge(String gaugeName, long value) {
399    singleSourceSource.setGauge(gaugeName, value);
400    globalSourceSource.setGauge(gaugeName, value);
401  }
402
403  @Override
404  public void incGauge(String gaugeName, long delta) {
405    singleSourceSource.incGauge(gaugeName, delta);
406    globalSourceSource.incGauge(gaugeName, delta);
407  }
408
409  @Override
410  public void decGauge(String gaugeName, long delta) {
411    singleSourceSource.decGauge(gaugeName, delta);
412    globalSourceSource.decGauge(gaugeName, delta);
413  }
414
415  @Override
416  public void removeMetric(String key) {
417    singleSourceSource.removeMetric(key);
418    globalSourceSource.removeMetric(key);
419  }
420
421  @Override
422  public void incCounters(String counterName, long delta) {
423    singleSourceSource.incCounters(counterName, delta);
424    globalSourceSource.incCounters(counterName, delta);
425  }
426
427  @Override
428  public void updateHistogram(String name, long value) {
429    singleSourceSource.updateHistogram(name, value);
430    globalSourceSource.updateHistogram(name, value);
431  }
432
433  @Override
434  public String getMetricsContext() {
435    return globalSourceSource.getMetricsContext();
436  }
437
438  @Override
439  public String getMetricsDescription() {
440    return globalSourceSource.getMetricsDescription();
441  }
442
443  @Override
444  public String getMetricsJmxContext() {
445    return globalSourceSource.getMetricsJmxContext();
446  }
447
448  @Override
449  public String getMetricsName() {
450    return globalSourceSource.getMetricsName();
451  }
452
453  @VisibleForTesting
454  public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
455    return singleSourceSourceByTable;
456  }
457
458  /**
459   * Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
460   */
461  public void setWALReaderEditsBufferUsage(long usageInBytes) {
462    globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
463  }
464
465  /**
466   * Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
467   * @return
468   */
469  public long getWALReaderEditsBufferUsage() {
470    return globalSourceSource.getWALReaderEditsBufferBytes();
471  }
472}