/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import org.apache.hadoop.metrics2.lib.MutableFastCounter;
021import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
022import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
023import org.apache.hadoop.metrics2.lib.MutableHistogram;
024import org.apache.yetus.audience.InterfaceAudience;
025
026@InterfaceAudience.Private
027public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSourceSource {
028
029  private final MetricsReplicationSourceImpl rms;
030  private final String id;
031  private final String sizeOfLogQueueKey;
032  private final String ageOfLastShippedOpKey;
033  private final String logReadInEditsKey;
034  private final String logEditsFilteredKey;
035  private final String shippedBatchesKey;
036  private final String shippedOpsKey;
037  private final String failedBatchesKey;
038  private String keyPrefix;
039
040  /**
041   * @deprecated since 1.3.0. Use {@link #shippedBytesKey} instead.
042   */
043  @Deprecated
044  private final String shippedKBsKey;
045  private final String shippedBytesKey;
046  private final String logReadInBytesKey;
047  private final String shippedHFilesKey;
048  private final String sizeOfHFileRefsQueueKey;
049  private final String oldestWalAgeKey;
050  private final String sourceInitializingKey;
051
052  private final MutableHistogram ageOfLastShippedOpHist;
053  private final MutableGaugeLong sizeOfLogQueueGauge;
054  private final MutableFastCounter logReadInEditsCounter;
055  private final MutableFastCounter walEditsFilteredCounter;
056  private final MutableFastCounter shippedBatchesCounter;
057  private final MutableFastCounter failedBatchesCounter;
058  private final MutableFastCounter shippedOpsCounter;
059  private final MutableFastCounter shippedKBsCounter;
060  private final MutableFastCounter shippedBytesCounter;
061  private final MutableFastCounter logReadInBytesCounter;
062  private final MutableFastCounter shippedHFilesCounter;
063  private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
064
065  private final String unknownFileLengthKey;
066  private final String uncleanlyClosedKey;
067  private final String uncleanlySkippedBytesKey;
068  private final String restartedKey;
069  private final String repeatedBytesKey;
070  private final String completedLogsKey;
071  private final String completedRecoveryKey;
072  private final MutableFastCounter unknownFileLengthForClosedWAL;
073  private final MutableFastCounter uncleanlyClosedWAL;
074  private final MutableFastCounter uncleanlyClosedSkippedBytes;
075  private final MutableFastCounter restartWALReading;
076  private final MutableFastCounter repeatedFileBytes;
077  private final MutableFastCounter completedWAL;
078  private final MutableFastCounter completedRecoveryQueue;
079  private final MutableGaugeLong oldestWalAge;
080  private final MutableGaugeInt sourceInitializing;
081
082  public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
083    this.rms = rms;
084    this.id = id;
085    this.keyPrefix = "source." + this.id + ".";
086
087    ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
088    ageOfLastShippedOpHist = rms.getMetricsRegistry().newTimeHistogram(ageOfLastShippedOpKey);
089
090    sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue";
091    sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfLogQueueKey, 0L);
092
093    shippedBatchesKey = this.keyPrefix + "shippedBatches";
094    shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
095
096    failedBatchesKey = this.keyPrefix + "failedBatches";
097    failedBatchesCounter = rms.getMetricsRegistry().getCounter(failedBatchesKey, 0L);
098
099    shippedOpsKey = this.keyPrefix + "shippedOps";
100    shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
101
102    shippedKBsKey = this.keyPrefix + "shippedKBs";
103    shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L);
104
105    shippedBytesKey = this.keyPrefix + "shippedBytes";
106    shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
107
108    logReadInBytesKey = this.keyPrefix + "logReadInBytes";
109    logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L);
110
111    logReadInEditsKey = this.keyPrefix + "logEditsRead";
112    logReadInEditsCounter = rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L);
113
114    logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
115    walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
116
117    shippedHFilesKey = this.keyPrefix + "shippedHFiles";
118    shippedHFilesCounter = rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L);
119
120    sizeOfHFileRefsQueueKey = this.keyPrefix + "sizeOfHFileRefsQueue";
121    sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfHFileRefsQueueKey, 0L);
122
123    unknownFileLengthKey = this.keyPrefix + "closedLogsWithUnknownFileLength";
124    unknownFileLengthForClosedWAL = rms.getMetricsRegistry().getCounter(unknownFileLengthKey, 0L);
125
126    uncleanlyClosedKey = this.keyPrefix + "uncleanlyClosedLogs";
127    uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(uncleanlyClosedKey, 0L);
128
129    uncleanlySkippedBytesKey = this.keyPrefix + "ignoredUncleanlyClosedLogContentsInBytes";
130    uncleanlyClosedSkippedBytes = rms.getMetricsRegistry().getCounter(uncleanlySkippedBytesKey, 0L);
131
132    restartedKey = this.keyPrefix + "restartedLogReading";
133    restartWALReading = rms.getMetricsRegistry().getCounter(restartedKey, 0L);
134
135    repeatedBytesKey = this.keyPrefix + "repeatedLogFileBytes";
136    repeatedFileBytes = rms.getMetricsRegistry().getCounter(repeatedBytesKey, 0L);
137
138    completedLogsKey = this.keyPrefix + "completedLogs";
139    completedWAL = rms.getMetricsRegistry().getCounter(completedLogsKey, 0L);
140
141    completedRecoveryKey = this.keyPrefix + "completedRecoverQueues";
142    completedRecoveryQueue = rms.getMetricsRegistry().getCounter(completedRecoveryKey, 0L);
143
144    oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
145    oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
146
147    sourceInitializingKey = this.keyPrefix + "isInitializing";
148    sourceInitializing = rms.getMetricsRegistry().getGaugeInt(sourceInitializingKey, 0);
149  }
150
151  @Override
152  public void setLastShippedAge(long age) {
153    ageOfLastShippedOpHist.add(age);
154  }
155
156  @Override
157  public void incrSizeOfLogQueue(int size) {
158    sizeOfLogQueueGauge.incr(size);
159  }
160
161  @Override
162  public void decrSizeOfLogQueue(int size) {
163    sizeOfLogQueueGauge.decr(size);
164  }
165
166  @Override
167  public void incrLogReadInEdits(long size) {
168    logReadInEditsCounter.incr(size);
169  }
170
171  @Override
172  public void incrLogEditsFiltered(long size) {
173    walEditsFilteredCounter.incr(size);
174  }
175
176  @Override
177  public void incrBatchesShipped(int batches) {
178    shippedBatchesCounter.incr(batches);
179  }
180
181  @Override
182  public void incrFailedBatches() {
183    failedBatchesCounter.incr();
184  }
185
186  @Override
187  public void incrOpsShipped(long ops) {
188    shippedOpsCounter.incr(ops);
189  }
190
191  @Override
192  public void incrShippedBytes(long size) {
193    shippedBytesCounter.incr(size);
194    MetricsReplicationGlobalSourceSourceImpl.incrementKBsCounter(shippedBytesCounter,
195      shippedKBsCounter);
196  }
197
198  @Override
199  public void incrLogReadInBytes(long size) {
200    logReadInBytesCounter.incr(size);
201  }
202
203  @Override
204  public void clear() {
205    rms.removeMetric(ageOfLastShippedOpKey);
206
207    rms.removeMetric(sizeOfLogQueueKey);
208
209    rms.removeMetric(shippedBatchesKey);
210    rms.removeMetric(failedBatchesKey);
211    rms.removeMetric(shippedOpsKey);
212    rms.removeMetric(shippedKBsKey);
213    rms.removeMetric(shippedBytesKey);
214
215    rms.removeMetric(logReadInBytesKey);
216    rms.removeMetric(logReadInEditsKey);
217
218    rms.removeMetric(logEditsFilteredKey);
219
220    rms.removeMetric(shippedHFilesKey);
221    rms.removeMetric(sizeOfHFileRefsQueueKey);
222
223    rms.removeMetric(unknownFileLengthKey);
224    rms.removeMetric(uncleanlyClosedKey);
225    rms.removeMetric(uncleanlySkippedBytesKey);
226    rms.removeMetric(restartedKey);
227    rms.removeMetric(repeatedBytesKey);
228    rms.removeMetric(completedLogsKey);
229    rms.removeMetric(completedRecoveryKey);
230    rms.removeMetric(oldestWalAgeKey);
231    rms.removeMetric(sourceInitializingKey);
232  }
233
234  @Override
235  public long getLastShippedAge() {
236    return ageOfLastShippedOpHist.getMax();
237  }
238
239  @Override
240  public void incrHFilesShipped(long hfiles) {
241    shippedHFilesCounter.incr(hfiles);
242  }
243
244  @Override
245  public void incrSizeOfHFileRefsQueue(long size) {
246    sizeOfHFileRefsQueueGauge.incr(size);
247  }
248
249  @Override
250  public void decrSizeOfHFileRefsQueue(long size) {
251    sizeOfHFileRefsQueueGauge.decr(size);
252  }
253
254  @Override
255  public int getSizeOfLogQueue() {
256    return (int) sizeOfLogQueueGauge.value();
257  }
258
259  @Override
260  public void incrUnknownFileLengthForClosedWAL() {
261    unknownFileLengthForClosedWAL.incr(1L);
262  }
263
264  @Override
265  public void incrUncleanlyClosedWALs() {
266    uncleanlyClosedWAL.incr(1L);
267  }
268
269  @Override
270  public long getUncleanlyClosedWALs() {
271    return uncleanlyClosedWAL.value();
272  }
273
274  @Override
275  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
276    uncleanlyClosedSkippedBytes.incr(bytes);
277  }
278
279  @Override
280  public void incrRestartedWALReading() {
281    restartWALReading.incr(1L);
282  }
283
284  @Override
285  public void incrRepeatedFileBytes(final long bytes) {
286    repeatedFileBytes.incr(bytes);
287  }
288
289  @Override
290  public void incrCompletedWAL() {
291    completedWAL.incr(1L);
292  }
293
294  @Override
295  public void incrCompletedRecoveryQueue() {
296    completedRecoveryQueue.incr(1L);
297  }
298
299  @Override
300  public void incrFailedRecoveryQueue() {
301    /* no op */}
302
303  @Override
304  public void setOldestWalAge(long age) {
305    oldestWalAge.set(age);
306  }
307
308  @Override
309  public long getOldestWalAge() {
310    return oldestWalAge.value();
311  }
312
313  @Override
314  public void incrSourceInitializing() {
315    sourceInitializing.incr(1);
316  }
317
318  @Override
319  public int getSourceInitializing() {
320    return sourceInitializing.value();
321  }
322
323  @Override
324  public void decrSourceInitializing() {
325    sourceInitializing.decr(1);
326  }
327
328  @Override
329  public void init() {
330    rms.init();
331  }
332
333  @Override
334  public void setGauge(String gaugeName, long value) {
335    rms.setGauge(this.keyPrefix + gaugeName, value);
336  }
337
338  @Override
339  public void incGauge(String gaugeName, long delta) {
340    rms.incGauge(this.keyPrefix + gaugeName, delta);
341  }
342
343  @Override
344  public void decGauge(String gaugeName, long delta) {
345    rms.decGauge(this.keyPrefix + gaugeName, delta);
346  }
347
348  @Override
349  public void removeMetric(String key) {
350    rms.removeMetric(this.keyPrefix + key);
351  }
352
353  @Override
354  public void incCounters(String counterName, long delta) {
355    rms.incCounters(this.keyPrefix + counterName, delta);
356  }
357
358  @Override
359  public void updateHistogram(String name, long value) {
360    rms.updateHistogram(this.keyPrefix + name, value);
361  }
362
363  @Override
364  public String getMetricsContext() {
365    return rms.getMetricsContext();
366  }
367
368  @Override
369  public String getMetricsDescription() {
370    return rms.getMetricsDescription();
371  }
372
373  @Override
374  public String getMetricsJmxContext() {
375    return rms.getMetricsJmxContext();
376  }
377
378  @Override
379  public String getMetricsName() {
380    return rms.getMetricsName();
381  }
382
383  @Override
384  public long getWALEditsRead() {
385    return this.logReadInEditsCounter.value();
386  }
387
388  @Override
389  public long getShippedOps() {
390    return this.shippedOpsCounter.value();
391  }
392
393  @Override
394  public long getEditsFiltered() {
395    return this.walEditsFilteredCounter.value();
396  }
397}