001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.util.HashMap;
022import java.util.Map;
023
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
028import org.apache.hadoop.hbase.HBaseInterfaceAudience;
029import org.apache.hadoop.hbase.metrics.BaseSource;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031
032import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
033
034/**
035 * This class is for maintaining the various replication statistics for a source and publishing them
036 * through the metrics interfaces.
037 */
038@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
039public class MetricsSource implements BaseSource {
040
041  private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class);
042
043  // tracks last shipped timestamp for each wal group
044  private Map<String, Long> lastShippedTimeStamps = new HashMap<String, Long>();
045  private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
046  private long lastHFileRefsQueueSize = 0;
047  private String id;
048  private long timeStampNextToReplicate;
049
050  private final MetricsReplicationSourceSource singleSourceSource;
051  private final MetricsReplicationSourceSource globalSourceSource;
052  private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable;
053
054  /**
055   * Constructor used to register the metrics
056   *
057   * @param id Name of the source this class is monitoring
058   */
059  public MetricsSource(String id) {
060    this.id = id;
061    singleSourceSource =
062        CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
063            .getSource(id);
064    globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
065    singleSourceSourceByTable = new HashMap<>();
066  }
067
068  /**
069   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
070   * @param id Name of the source this class is monitoring
071   * @param singleSourceSource Class to monitor id-scoped metrics
072   * @param globalSourceSource Class to monitor global-scoped metrics
073   */
074  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
075                       MetricsReplicationSourceSource globalSourceSource,
076                       Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) {
077    this.id = id;
078    this.singleSourceSource = singleSourceSource;
079    this.globalSourceSource = globalSourceSource;
080    this.singleSourceSourceByTable = singleSourceSourceByTable;
081  }
082
083  /**
084   * Set the age of the last edit that was shipped
085   * @param timestamp target write time of the edit
086   * @param walGroup which group we are setting
087   */
088  public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
089    long age = EnvironmentEdgeManager.currentTime() - timestamp;
090    singleSourceSource.setLastShippedAge(age);
091    globalSourceSource.setLastShippedAge(age);
092    this.ageOfLastShippedOp.put(walGroup, age);
093    this.lastShippedTimeStamps.put(walGroup, timestamp);
094  }
095
096  /**
097   * Set the age of the last edit that was shipped group by table
098   * @param timestamp write time of the edit
099   * @param tableName String as group and tableName
100   */
101  public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
102    long age = EnvironmentEdgeManager.currentTime() - timestamp;
103    this.getSingleSourceSourceByTable().computeIfAbsent(
104        tableName, t -> CompatibilitySingletonFactory
105            .getInstance(MetricsReplicationSourceFactory.class).getSource(t))
106            .setLastShippedAge(age);
107  }
108
109  /**
110   * get age of last shipped op of given wal group. If the walGroup is null, return 0
111   * @param walGroup which group we are getting
112   * @return age
113   */
114  public long getAgeofLastShippedOp(String walGroup) {
115    return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
116  }
117
118  /**
119   * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
120   * when replication fails and need to keep that metric accurate.
121   * @param walGroupId id of the group to update
122   */
123  public void refreshAgeOfLastShippedOp(String walGroupId) {
124    Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId);
125    if (lastTimestamp == null) {
126      this.lastShippedTimeStamps.put(walGroupId, 0L);
127      lastTimestamp = 0L;
128    }
129    if (lastTimestamp > 0) {
130      setAgeOfLastShippedOp(lastTimestamp, walGroupId);
131    }
132  }
133
134  /**
135   * Increment size of the log queue.
136   */
137  public void incrSizeOfLogQueue() {
138    singleSourceSource.incrSizeOfLogQueue(1);
139    globalSourceSource.incrSizeOfLogQueue(1);
140  }
141
142  public void decrSizeOfLogQueue() {
143    singleSourceSource.decrSizeOfLogQueue(1);
144    globalSourceSource.decrSizeOfLogQueue(1);
145  }
146
147  /**
148   * Add on the the number of log edits read
149   *
150   * @param delta the number of log edits read.
151   */
152  private void incrLogEditsRead(long delta) {
153    singleSourceSource.incrLogReadInEdits(delta);
154    globalSourceSource.incrLogReadInEdits(delta);
155  }
156
157  /** Increment the number of log edits read by one. */
158  public void incrLogEditsRead() {
159    incrLogEditsRead(1);
160  }
161
162  /**
163   * Add on the number of log edits filtered
164   *
165   * @param delta the number filtered.
166   */
167  public void incrLogEditsFiltered(long delta) {
168    singleSourceSource.incrLogEditsFiltered(delta);
169    globalSourceSource.incrLogEditsFiltered(delta);
170  }
171
172  /** The number of log edits filtered out. */
173  public void incrLogEditsFiltered() {
174    incrLogEditsFiltered(1);
175  }
176
177  /**
178   * Convience method to apply changes to metrics do to shipping a batch of logs.
179   *
180   * @param batchSize the size of the batch that was shipped to sinks.
181   */
182  public void shipBatch(long batchSize, int sizeInBytes) {
183    singleSourceSource.incrBatchesShipped(1);
184    globalSourceSource.incrBatchesShipped(1);
185
186    singleSourceSource.incrOpsShipped(batchSize);
187    globalSourceSource.incrOpsShipped(batchSize);
188
189    singleSourceSource.incrShippedBytes(sizeInBytes);
190    globalSourceSource.incrShippedBytes(sizeInBytes);
191  }
192
193  /**
194   * Gets the number of edits not eligible for replication this source queue logs so far.
195   * @return logEditsFiltered non-replicable edits filtered from this queue logs.
196   */
197  public long getEditsFiltered(){
198    return this.singleSourceSource.getEditsFiltered();
199  }
200
201  /**
202   * Gets the number of edits eligible for replication read from this source queue logs so far.
203   * @return replicableEdits total number of replicable edits read from this queue logs.
204   */
205  public long getReplicableEdits(){
206    return this.singleSourceSource.getWALEditsRead() - this.singleSourceSource.getEditsFiltered();
207  }
208
209  /**
210   * Gets the number of OPs shipped by this source queue to target cluster.
211   * @return oPsShipped total number of OPs shipped by this source.
212   */
213  public long getOpsShipped() {
214    return this.singleSourceSource.getShippedOps();
215  }
216
217  /**
218   * Convience method to apply changes to metrics do to shipping a batch of logs.
219   *
220   * @param batchSize the size of the batch that was shipped to sinks.
221   * @param hfiles total number of hfiles shipped to sinks.
222   */
223  public void shipBatch(long batchSize, int sizeInBytes, long hfiles) {
224    shipBatch(batchSize, sizeInBytes);
225    singleSourceSource.incrHFilesShipped(hfiles);
226    globalSourceSource.incrHFilesShipped(hfiles);
227  }
228
229  /** increase the byte number read by source from log file */
230  public void incrLogReadInBytes(long readInBytes) {
231    singleSourceSource.incrLogReadInBytes(readInBytes);
232    globalSourceSource.incrLogReadInBytes(readInBytes);
233  }
234
235  /** Removes all metrics about this Source. */
236  public void clear() {
237    int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
238    globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
239    singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
240    singleSourceSource.clear();
241    globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
242    lastShippedTimeStamps.clear();
243    lastHFileRefsQueueSize = 0;
244    timeStampNextToReplicate = 0;
245  }
246
247  /**
248   * Get AgeOfLastShippedOp
249   * @return AgeOfLastShippedOp
250   */
251  public Long getAgeOfLastShippedOp() {
252    return singleSourceSource.getLastShippedAge();
253  }
254
255  /**
256   * Get the sizeOfLogQueue
257   * @return sizeOfLogQueue
258   */
259  public int getSizeOfLogQueue() {
260    return singleSourceSource.getSizeOfLogQueue();
261  }
262
263  /**
264   * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one
265   * @return lastTimestampForAge
266   * @deprecated Since 2.0.0. Removed in 3.0.0.
267   * @see #getTimestampOfLastShippedOp()
268   */
269  @Deprecated
270  public long getTimeStampOfLastShippedOp() {
271    return getTimestampOfLastShippedOp();
272  }
273
274  /**
275   * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one
276   * @return lastTimestampForAge
277   */
278  public long getTimestampOfLastShippedOp() {
279    long lastTimestamp = 0L;
280    for (long ts : lastShippedTimeStamps.values()) {
281      if (ts > lastTimestamp) {
282        lastTimestamp = ts;
283      }
284    }
285    return lastTimestamp;
286  }
287
288  /**
289   * TimeStamp of next edit to be replicated.
290   * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated.
291   */
292  public long getTimeStampNextToReplicate() {
293    return timeStampNextToReplicate;
294  }
295
296  /**
297   * TimeStamp of next edit targeted for replication. Used for calculating lag,
298   * as if this timestamp is greater than timestamp of last shipped, it means there's
299   * at least one edit pending replication.
300   * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated.
301   */
302  public void setTimeStampNextToReplicate(long timeStampNextToReplicate) {
303    this.timeStampNextToReplicate = timeStampNextToReplicate;
304  }
305
306  public long getReplicationDelay() {
307    if(getTimestampOfLastShippedOp()>=timeStampNextToReplicate){
308      return 0;
309    }else{
310      return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate;
311    }
312  }
313
314  /**
315   * Get the slave peer ID
316   * @return peerID
317   */
318  public String getPeerID() {
319    return id;
320  }
321
322  public void incrSizeOfHFileRefsQueue(long size) {
323    singleSourceSource.incrSizeOfHFileRefsQueue(size);
324    globalSourceSource.incrSizeOfHFileRefsQueue(size);
325    lastHFileRefsQueueSize = size;
326  }
327
328  public void decrSizeOfHFileRefsQueue(int size) {
329    singleSourceSource.decrSizeOfHFileRefsQueue(size);
330    globalSourceSource.decrSizeOfHFileRefsQueue(size);
331    lastHFileRefsQueueSize -= size;
332    if (lastHFileRefsQueueSize < 0) {
333      lastHFileRefsQueueSize = 0;
334    }
335  }
336
337  public void incrUnknownFileLengthForClosedWAL() {
338    singleSourceSource.incrUnknownFileLengthForClosedWAL();
339    globalSourceSource.incrUnknownFileLengthForClosedWAL();
340  }
341
342  public void incrUncleanlyClosedWALs() {
343    singleSourceSource.incrUncleanlyClosedWALs();
344    globalSourceSource.incrUncleanlyClosedWALs();
345  }
346
347  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
348    singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
349    globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
350  }
351
352  public void incrRestartedWALReading() {
353    singleSourceSource.incrRestartedWALReading();
354    globalSourceSource.incrRestartedWALReading();
355  }
356
357  public void incrRepeatedFileBytes(final long bytes) {
358    singleSourceSource.incrRepeatedFileBytes(bytes);
359    globalSourceSource.incrRepeatedFileBytes(bytes);
360  }
361
362  public void incrCompletedWAL() {
363    singleSourceSource.incrCompletedWAL();
364    globalSourceSource.incrCompletedWAL();
365  }
366
367  public void incrCompletedRecoveryQueue() {
368    singleSourceSource.incrCompletedRecoveryQueue();
369    globalSourceSource.incrCompletedRecoveryQueue();
370  }
371
372  public void incrFailedRecoveryQueue() {
373    globalSourceSource.incrFailedRecoveryQueue();
374  }
375
376  @Override
377  public void init() {
378    singleSourceSource.init();
379    globalSourceSource.init();
380  }
381
382  @Override
383  public void setGauge(String gaugeName, long value) {
384    singleSourceSource.setGauge(gaugeName, value);
385    globalSourceSource.setGauge(gaugeName, value);
386  }
387
388  @Override
389  public void incGauge(String gaugeName, long delta) {
390    singleSourceSource.incGauge(gaugeName, delta);
391    globalSourceSource.incGauge(gaugeName, delta);
392  }
393
394  @Override
395  public void decGauge(String gaugeName, long delta) {
396    singleSourceSource.decGauge(gaugeName, delta);
397    globalSourceSource.decGauge(gaugeName, delta);
398  }
399
400  @Override
401  public void removeMetric(String key) {
402    singleSourceSource.removeMetric(key);
403    globalSourceSource.removeMetric(key);
404  }
405
406  @Override
407  public void incCounters(String counterName, long delta) {
408    singleSourceSource.incCounters(counterName, delta);
409    globalSourceSource.incCounters(counterName, delta);
410  }
411
412  @Override
413  public void updateHistogram(String name, long value) {
414    singleSourceSource.updateHistogram(name, value);
415    globalSourceSource.updateHistogram(name, value);
416  }
417
418  @Override
419  public String getMetricsContext() {
420    return globalSourceSource.getMetricsContext();
421  }
422
423  @Override
424  public String getMetricsDescription() {
425    return globalSourceSource.getMetricsDescription();
426  }
427
428  @Override
429  public String getMetricsJmxContext() {
430    return globalSourceSource.getMetricsJmxContext();
431  }
432
433  @Override
434  public String getMetricsName() {
435    return globalSourceSource.getMetricsName();
436  }
437
438  @VisibleForTesting
439  public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() {
440    return singleSourceSourceByTable;
441  }
442}