001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.util.HashMap;
022import java.util.Map;
023
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
028import org.apache.hadoop.hbase.HBaseInterfaceAudience;
029import org.apache.hadoop.hbase.metrics.BaseSource;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031
032/**
033 * This class is for maintaining the various replication statistics for a source and publishing them
034 * through the metrics interfaces.
035 */
036@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
037public class MetricsSource implements BaseSource {
038
039  private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class);
040
041  // tracks last shipped timestamp for each wal group
042  private Map<String, Long> lastTimestamps = new HashMap<>();
043  private long lastHFileRefsQueueSize = 0;
044  private String id;
045
046  private final MetricsReplicationSourceSource singleSourceSource;
047  private final MetricsReplicationSourceSource globalSourceSource;
048
049
050  /**
051   * Constructor used to register the metrics
052   *
053   * @param id Name of the source this class is monitoring
054   */
055  public MetricsSource(String id) {
056    this.id = id;
057    singleSourceSource =
058        CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
059            .getSource(id);
060    globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
061  }
062
063  /**
064   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
065   * @param id Name of the source this class is monitoring
066   * @param singleSourceSource Class to monitor id-scoped metrics
067   * @param globalSourceSource Class to monitor global-scoped metrics
068   */
069  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
070                       MetricsReplicationSourceSource globalSourceSource) {
071    this.id = id;
072    this.singleSourceSource = singleSourceSource;
073    this.globalSourceSource = globalSourceSource;
074  }
075
076  /**
077   * Set the age of the last edit that was shipped
078   * @param timestamp write time of the edit
079   * @param walGroup which group we are setting
080   */
081  public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
082    long age = EnvironmentEdgeManager.currentTime() - timestamp;
083    singleSourceSource.setLastShippedAge(age);
084    globalSourceSource.setLastShippedAge(age);
085    this.lastTimestamps.put(walGroup, timestamp);
086  }
087
088  /**
089   * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
090   * when replication fails and need to keep that metric accurate.
091   * @param walGroupId id of the group to update
092   */
093  public void refreshAgeOfLastShippedOp(String walGroupId) {
094    Long lastTimestamp = this.lastTimestamps.get(walGroupId);
095    if (lastTimestamp == null) {
096      this.lastTimestamps.put(walGroupId, 0L);
097      lastTimestamp = 0L;
098    }
099    if (lastTimestamp > 0) {
100      setAgeOfLastShippedOp(lastTimestamp, walGroupId);
101    }
102  }
103
104  /**
105   * Increment size of the log queue.
106   */
107  public void incrSizeOfLogQueue() {
108    singleSourceSource.incrSizeOfLogQueue(1);
109    globalSourceSource.incrSizeOfLogQueue(1);
110  }
111
112  public void decrSizeOfLogQueue() {
113    singleSourceSource.decrSizeOfLogQueue(1);
114    globalSourceSource.decrSizeOfLogQueue(1);
115  }
116
117  /**
118   * Add on the the number of log edits read
119   *
120   * @param delta the number of log edits read.
121   */
122  private void incrLogEditsRead(long delta) {
123    singleSourceSource.incrLogReadInEdits(delta);
124    globalSourceSource.incrLogReadInEdits(delta);
125  }
126
127  /** Increment the number of log edits read by one. */
128  public void incrLogEditsRead() {
129    incrLogEditsRead(1);
130  }
131
132  /**
133   * Add on the number of log edits filtered
134   *
135   * @param delta the number filtered.
136   */
137  public void incrLogEditsFiltered(long delta) {
138    singleSourceSource.incrLogEditsFiltered(delta);
139    globalSourceSource.incrLogEditsFiltered(delta);
140  }
141
142  /** The number of log edits filtered out. */
143  public void incrLogEditsFiltered() {
144    incrLogEditsFiltered(1);
145  }
146
147  /**
148   * Convience method to apply changes to metrics do to shipping a batch of logs.
149   *
150   * @param batchSize the size of the batch that was shipped to sinks.
151   */
152  public void shipBatch(long batchSize, int sizeInBytes) {
153    singleSourceSource.incrBatchesShipped(1);
154    globalSourceSource.incrBatchesShipped(1);
155
156    singleSourceSource.incrOpsShipped(batchSize);
157    globalSourceSource.incrOpsShipped(batchSize);
158
159    singleSourceSource.incrShippedBytes(sizeInBytes);
160    globalSourceSource.incrShippedBytes(sizeInBytes);
161  }
162
163  /**
164   * Convience method to apply changes to metrics do to shipping a batch of logs.
165   *
166   * @param batchSize the size of the batch that was shipped to sinks.
167   * @param hfiles total number of hfiles shipped to sinks.
168   */
169  public void shipBatch(long batchSize, int sizeInBytes, long hfiles) {
170    shipBatch(batchSize, sizeInBytes);
171    singleSourceSource.incrHFilesShipped(hfiles);
172    globalSourceSource.incrHFilesShipped(hfiles);
173  }
174
175  /** increase the byte number read by source from log file */
176  public void incrLogReadInBytes(long readInBytes) {
177    singleSourceSource.incrLogReadInBytes(readInBytes);
178    globalSourceSource.incrLogReadInBytes(readInBytes);
179  }
180
181  /** Removes all metrics about this Source. */
182  public void clear() {
183    int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
184    globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
185    singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
186    singleSourceSource.clear();
187    globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
188    lastTimestamps.clear();
189    lastHFileRefsQueueSize = 0;
190  }
191
192  /**
193   * Get AgeOfLastShippedOp
194   * @return AgeOfLastShippedOp
195   */
196  public Long getAgeOfLastShippedOp() {
197    return singleSourceSource.getLastShippedAge();
198  }
199
200  /**
201   * Get the sizeOfLogQueue
202   * @return sizeOfLogQueue
203   */
204  public int getSizeOfLogQueue() {
205    return singleSourceSource.getSizeOfLogQueue();
206  }
207
208  /**
209   * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one
210   * @return lastTimestampForAge
211   * @deprecated Since 2.0.0. Removed in 3.0.0.
212   * @see #getTimestampOfLastShippedOp()
213   */
214  @Deprecated
215  public long getTimeStampOfLastShippedOp() {
216    return getTimestampOfLastShippedOp();
217  }
218
219  /**
220   * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one
221   * @return lastTimestampForAge
222   */
223  public long getTimestampOfLastShippedOp() {
224    long lastTimestamp = 0L;
225    for (long ts : lastTimestamps.values()) {
226      if (ts > lastTimestamp) {
227        lastTimestamp = ts;
228      }
229    }
230    return lastTimestamp;
231  }
232
233  /**
234   * Get the slave peer ID
235   * @return peerID
236   */
237  public String getPeerID() {
238    return id;
239  }
240
241  public void incrSizeOfHFileRefsQueue(long size) {
242    singleSourceSource.incrSizeOfHFileRefsQueue(size);
243    globalSourceSource.incrSizeOfHFileRefsQueue(size);
244    lastHFileRefsQueueSize = size;
245  }
246
247  public void decrSizeOfHFileRefsQueue(int size) {
248    singleSourceSource.decrSizeOfHFileRefsQueue(size);
249    globalSourceSource.decrSizeOfHFileRefsQueue(size);
250    lastHFileRefsQueueSize -= size;
251    if (lastHFileRefsQueueSize < 0) {
252      lastHFileRefsQueueSize = 0;
253    }
254  }
255
256  public void incrUnknownFileLengthForClosedWAL() {
257    singleSourceSource.incrUnknownFileLengthForClosedWAL();
258    globalSourceSource.incrUnknownFileLengthForClosedWAL();
259  }
260
261  public void incrUncleanlyClosedWALs() {
262    singleSourceSource.incrUncleanlyClosedWALs();
263    globalSourceSource.incrUncleanlyClosedWALs();
264  }
265
266  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
267    singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
268    globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
269  }
270
271  public void incrRestartedWALReading() {
272    singleSourceSource.incrRestartedWALReading();
273    globalSourceSource.incrRestartedWALReading();
274  }
275
276  public void incrRepeatedFileBytes(final long bytes) {
277    singleSourceSource.incrRepeatedFileBytes(bytes);
278    globalSourceSource.incrRepeatedFileBytes(bytes);
279  }
280
281  public void incrCompletedWAL() {
282    singleSourceSource.incrCompletedWAL();
283    globalSourceSource.incrCompletedWAL();
284  }
285
286  public void incrCompletedRecoveryQueue() {
287    singleSourceSource.incrCompletedRecoveryQueue();
288    globalSourceSource.incrCompletedRecoveryQueue();
289  }
290
291  @Override
292  public void init() {
293    singleSourceSource.init();
294    globalSourceSource.init();
295  }
296
297  @Override
298  public void setGauge(String gaugeName, long value) {
299    singleSourceSource.setGauge(gaugeName, value);
300    globalSourceSource.setGauge(gaugeName, value);
301  }
302
303  @Override
304  public void incGauge(String gaugeName, long delta) {
305    singleSourceSource.incGauge(gaugeName, delta);
306    globalSourceSource.incGauge(gaugeName, delta);
307  }
308
309  @Override
310  public void decGauge(String gaugeName, long delta) {
311    singleSourceSource.decGauge(gaugeName, delta);
312    globalSourceSource.decGauge(gaugeName, delta);
313  }
314
315  @Override
316  public void removeMetric(String key) {
317    singleSourceSource.removeMetric(key);
318    globalSourceSource.removeMetric(key);
319  }
320
321  @Override
322  public void incCounters(String counterName, long delta) {
323    singleSourceSource.incCounters(counterName, delta);
324    globalSourceSource.incCounters(counterName, delta);
325  }
326
327  @Override
328  public void updateHistogram(String name, long value) {
329    singleSourceSource.updateHistogram(name, value);
330    globalSourceSource.updateHistogram(name, value);
331  }
332
333  @Override
334  public String getMetricsContext() {
335    return globalSourceSource.getMetricsContext();
336  }
337
338  @Override
339  public String getMetricsDescription() {
340    return globalSourceSource.getMetricsDescription();
341  }
342
343  @Override
344  public String getMetricsJmxContext() {
345    return globalSourceSource.getMetricsJmxContext();
346  }
347
348  @Override
349  public String getMetricsName() {
350    return globalSourceSource.getMetricsName();
351  }
352}