001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.util.HashMap;
022import java.util.Map;
023
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
028import org.apache.hadoop.hbase.HBaseInterfaceAudience;
029import org.apache.hadoop.hbase.metrics.BaseSource;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031
032import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
033
034/**
035 * This class is for maintaining the various replication statistics for a source and publishing them
036 * through the metrics interfaces.
037 */
038@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
039public class MetricsSource implements BaseSource {
040
041  private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class);
042
043  // tracks last shipped timestamp for each wal group
044  private Map<String, Long> lastTimestamps = new HashMap<>();
045  private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
046  private long lastHFileRefsQueueSize = 0;
047  private String id;
048
049  private final MetricsReplicationSourceSource singleSourceSource;
050  private final MetricsReplicationSourceSource globalSourceSource;
051  private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable;
052
053  /**
054   * Constructor used to register the metrics
055   *
056   * @param id Name of the source this class is monitoring
057   */
058  public MetricsSource(String id) {
059    this.id = id;
060    singleSourceSource =
061        CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
062            .getSource(id);
063    globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
064    singleSourceSourceByTable = new HashMap<>();
065  }
066
067  /**
068   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
069   * @param id Name of the source this class is monitoring
070   * @param singleSourceSource Class to monitor id-scoped metrics
071   * @param globalSourceSource Class to monitor global-scoped metrics
072   */
073  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
074                       MetricsReplicationSourceSource globalSourceSource,
075                       Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) {
076    this.id = id;
077    this.singleSourceSource = singleSourceSource;
078    this.globalSourceSource = globalSourceSource;
079    this.singleSourceSourceByTable = singleSourceSourceByTable;
080  }
081
082  /**
083   * Set the age of the last edit that was shipped
084   * @param timestamp write time of the edit
085   * @param walGroup which group we are setting
086   */
087  public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
088    long age = EnvironmentEdgeManager.currentTime() - timestamp;
089    singleSourceSource.setLastShippedAge(age);
090    globalSourceSource.setLastShippedAge(age);
091    this.ageOfLastShippedOp.put(walGroup, age);
092    this.lastTimestamps.put(walGroup, timestamp);
093  }
094
095  /**
096   * Set the age of the last edit that was shipped group by table
097   * @param timestamp write time of the edit
098   * @param tableName String as group and tableName
099   */
100  public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
101    long age = EnvironmentEdgeManager.currentTime() - timestamp;
102    this.getSingleSourceSourceByTable().computeIfAbsent(
103        tableName, t -> CompatibilitySingletonFactory
104            .getInstance(MetricsReplicationSourceFactory.class).getSource(t))
105            .setLastShippedAge(age);
106  }
107
108  /**
109   * get the last timestamp of given wal group. If the walGroup is null, return 0.
110   * @param walGroup which group we are getting
111   * @return timeStamp
112   */
113  public long getLastTimeStampOfWalGroup(String walGroup) {
114    return this.lastTimestamps.get(walGroup) == null ? 0 : lastTimestamps.get(walGroup);
115  }
116
117  /**
118   * get age of last shipped op of given wal group. If the walGroup is null, return 0
119   * @param walGroup which group we are getting
120   * @return age
121   */
122  public long getAgeofLastShippedOp(String walGroup) {
123    return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
124  }
125
126  /**
127   * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
128   * when replication fails and need to keep that metric accurate.
129   * @param walGroupId id of the group to update
130   */
131  public void refreshAgeOfLastShippedOp(String walGroupId) {
132    Long lastTimestamp = this.lastTimestamps.get(walGroupId);
133    if (lastTimestamp == null) {
134      this.lastTimestamps.put(walGroupId, 0L);
135      lastTimestamp = 0L;
136    }
137    if (lastTimestamp > 0) {
138      setAgeOfLastShippedOp(lastTimestamp, walGroupId);
139    }
140  }
141
142  /**
143   * Increment size of the log queue.
144   */
145  public void incrSizeOfLogQueue() {
146    singleSourceSource.incrSizeOfLogQueue(1);
147    globalSourceSource.incrSizeOfLogQueue(1);
148  }
149
150  public void decrSizeOfLogQueue() {
151    singleSourceSource.decrSizeOfLogQueue(1);
152    globalSourceSource.decrSizeOfLogQueue(1);
153  }
154
155  /**
156   * Add on the the number of log edits read
157   *
158   * @param delta the number of log edits read.
159   */
160  private void incrLogEditsRead(long delta) {
161    singleSourceSource.incrLogReadInEdits(delta);
162    globalSourceSource.incrLogReadInEdits(delta);
163  }
164
165  /** Increment the number of log edits read by one. */
166  public void incrLogEditsRead() {
167    incrLogEditsRead(1);
168  }
169
170  /**
171   * Add on the number of log edits filtered
172   *
173   * @param delta the number filtered.
174   */
175  public void incrLogEditsFiltered(long delta) {
176    singleSourceSource.incrLogEditsFiltered(delta);
177    globalSourceSource.incrLogEditsFiltered(delta);
178  }
179
180  /** The number of log edits filtered out. */
181  public void incrLogEditsFiltered() {
182    incrLogEditsFiltered(1);
183  }
184
185  /**
186   * Convience method to apply changes to metrics do to shipping a batch of logs.
187   *
188   * @param batchSize the size of the batch that was shipped to sinks.
189   */
190  public void shipBatch(long batchSize, int sizeInBytes) {
191    singleSourceSource.incrBatchesShipped(1);
192    globalSourceSource.incrBatchesShipped(1);
193
194    singleSourceSource.incrOpsShipped(batchSize);
195    globalSourceSource.incrOpsShipped(batchSize);
196
197    singleSourceSource.incrShippedBytes(sizeInBytes);
198    globalSourceSource.incrShippedBytes(sizeInBytes);
199  }
200
201  /**
202   * Convience method to apply changes to metrics do to shipping a batch of logs.
203   *
204   * @param batchSize the size of the batch that was shipped to sinks.
205   * @param hfiles total number of hfiles shipped to sinks.
206   */
207  public void shipBatch(long batchSize, int sizeInBytes, long hfiles) {
208    shipBatch(batchSize, sizeInBytes);
209    singleSourceSource.incrHFilesShipped(hfiles);
210    globalSourceSource.incrHFilesShipped(hfiles);
211  }
212
213  /** increase the byte number read by source from log file */
214  public void incrLogReadInBytes(long readInBytes) {
215    singleSourceSource.incrLogReadInBytes(readInBytes);
216    globalSourceSource.incrLogReadInBytes(readInBytes);
217  }
218
219  /** Removes all metrics about this Source. */
220  public void clear() {
221    int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
222    globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
223    singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
224    singleSourceSource.clear();
225    globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
226    lastTimestamps.clear();
227    lastHFileRefsQueueSize = 0;
228  }
229
230  /**
231   * Get AgeOfLastShippedOp
232   * @return AgeOfLastShippedOp
233   */
234  public Long getAgeOfLastShippedOp() {
235    return singleSourceSource.getLastShippedAge();
236  }
237
238  /**
239   * Get the sizeOfLogQueue
240   * @return sizeOfLogQueue
241   */
242  public int getSizeOfLogQueue() {
243    return singleSourceSource.getSizeOfLogQueue();
244  }
245
246  /**
247   * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one
248   * @return lastTimestampForAge
249   * @deprecated Since 2.0.0. Removed in 3.0.0.
250   * @see #getTimestampOfLastShippedOp()
251   */
252  @Deprecated
253  public long getTimeStampOfLastShippedOp() {
254    return getTimestampOfLastShippedOp();
255  }
256
257  /**
258   * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one
259   * @return lastTimestampForAge
260   */
261  public long getTimestampOfLastShippedOp() {
262    long lastTimestamp = 0L;
263    for (long ts : lastTimestamps.values()) {
264      if (ts > lastTimestamp) {
265        lastTimestamp = ts;
266      }
267    }
268    return lastTimestamp;
269  }
270
271  /**
272   * Get the slave peer ID
273   * @return peerID
274   */
275  public String getPeerID() {
276    return id;
277  }
278
279  public void incrSizeOfHFileRefsQueue(long size) {
280    singleSourceSource.incrSizeOfHFileRefsQueue(size);
281    globalSourceSource.incrSizeOfHFileRefsQueue(size);
282    lastHFileRefsQueueSize = size;
283  }
284
285  public void decrSizeOfHFileRefsQueue(int size) {
286    singleSourceSource.decrSizeOfHFileRefsQueue(size);
287    globalSourceSource.decrSizeOfHFileRefsQueue(size);
288    lastHFileRefsQueueSize -= size;
289    if (lastHFileRefsQueueSize < 0) {
290      lastHFileRefsQueueSize = 0;
291    }
292  }
293
294  public void incrUnknownFileLengthForClosedWAL() {
295    singleSourceSource.incrUnknownFileLengthForClosedWAL();
296    globalSourceSource.incrUnknownFileLengthForClosedWAL();
297  }
298
299  public void incrUncleanlyClosedWALs() {
300    singleSourceSource.incrUncleanlyClosedWALs();
301    globalSourceSource.incrUncleanlyClosedWALs();
302  }
303
304  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
305    singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
306    globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
307  }
308
309  public void incrRestartedWALReading() {
310    singleSourceSource.incrRestartedWALReading();
311    globalSourceSource.incrRestartedWALReading();
312  }
313
314  public void incrRepeatedFileBytes(final long bytes) {
315    singleSourceSource.incrRepeatedFileBytes(bytes);
316    globalSourceSource.incrRepeatedFileBytes(bytes);
317  }
318
319  public void incrCompletedWAL() {
320    singleSourceSource.incrCompletedWAL();
321    globalSourceSource.incrCompletedWAL();
322  }
323
324  public void incrCompletedRecoveryQueue() {
325    singleSourceSource.incrCompletedRecoveryQueue();
326    globalSourceSource.incrCompletedRecoveryQueue();
327  }
328
329  public void incrFailedRecoveryQueue() {
330    globalSourceSource.incrFailedRecoveryQueue();
331  }
332
333  @Override
334  public void init() {
335    singleSourceSource.init();
336    globalSourceSource.init();
337  }
338
339  @Override
340  public void setGauge(String gaugeName, long value) {
341    singleSourceSource.setGauge(gaugeName, value);
342    globalSourceSource.setGauge(gaugeName, value);
343  }
344
345  @Override
346  public void incGauge(String gaugeName, long delta) {
347    singleSourceSource.incGauge(gaugeName, delta);
348    globalSourceSource.incGauge(gaugeName, delta);
349  }
350
351  @Override
352  public void decGauge(String gaugeName, long delta) {
353    singleSourceSource.decGauge(gaugeName, delta);
354    globalSourceSource.decGauge(gaugeName, delta);
355  }
356
357  @Override
358  public void removeMetric(String key) {
359    singleSourceSource.removeMetric(key);
360    globalSourceSource.removeMetric(key);
361  }
362
363  @Override
364  public void incCounters(String counterName, long delta) {
365    singleSourceSource.incCounters(counterName, delta);
366    globalSourceSource.incCounters(counterName, delta);
367  }
368
369  @Override
370  public void updateHistogram(String name, long value) {
371    singleSourceSource.updateHistogram(name, value);
372    globalSourceSource.updateHistogram(name, value);
373  }
374
375  @Override
376  public String getMetricsContext() {
377    return globalSourceSource.getMetricsContext();
378  }
379
380  @Override
381  public String getMetricsDescription() {
382    return globalSourceSource.getMetricsDescription();
383  }
384
385  @Override
386  public String getMetricsJmxContext() {
387    return globalSourceSource.getMetricsJmxContext();
388  }
389
390  @Override
391  public String getMetricsName() {
392    return globalSourceSource.getMetricsName();
393  }
394
395  @VisibleForTesting
396  public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() {
397    return singleSourceSourceByTable;
398  }
399}