001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import org.apache.hadoop.metrics2.lib.MutableFastCounter;
021import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
022import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
023import org.apache.hadoop.metrics2.lib.MutableHistogram;
024import org.apache.yetus.audience.InterfaceAudience;
025
026@InterfaceAudience.Private
027public class MetricsReplicationGlobalSourceSourceImpl
028  implements MetricsReplicationGlobalSourceSource {
029  private static final String KEY_PREFIX = "source.";
030
031  private final MetricsReplicationSourceImpl rms;
032
033  private final MutableHistogram ageOfLastShippedOpHist;
034  private final MutableGaugeLong sizeOfLogQueueGauge;
035  private final MutableFastCounter logReadInEditsCounter;
036  private final MutableFastCounter walEditsFilteredCounter;
037  private final MutableFastCounter shippedBatchesCounter;
038  private final MutableFastCounter failedBatchesCounter;
039  private final MutableFastCounter shippedOpsCounter;
040  private final MutableFastCounter shippedBytesCounter;
041
042  /**
043   * @deprecated since 1.3.0. Use {@link #shippedBytesCounter} instead.
044   */
045  @Deprecated
046  private final MutableFastCounter shippedKBsCounter;
047  private final MutableFastCounter logReadInBytesCounter;
048  private final MutableFastCounter shippedHFilesCounter;
049  private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
050  private final MutableFastCounter unknownFileLengthForClosedWAL;
051  private final MutableFastCounter uncleanlyClosedWAL;
052  private final MutableFastCounter uncleanlyClosedSkippedBytes;
053  private final MutableFastCounter restartWALReading;
054  private final MutableFastCounter repeatedFileBytes;
055  private final MutableFastCounter completedWAL;
056  private final MutableFastCounter completedRecoveryQueue;
057  private final MutableFastCounter failedRecoveryQueue;
058  private final MutableGaugeLong walReaderBufferUsageBytes;
059  private final MutableGaugeInt sourceInitializing;
060
061  public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
062    this.rms = rms;
063
064    ageOfLastShippedOpHist =
065      rms.getMetricsRegistry().newTimeHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
066
067    sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L);
068
069    shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);
070
071    failedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_BATCHES, 0L);
072
073    shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);
074
075    shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L);
076
077    shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
078
079    logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L);
080
081    logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
082
083    walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
084
085    shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L);
086
087    sizeOfHFileRefsQueueGauge =
088      rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L);
089
090    unknownFileLengthForClosedWAL =
091      rms.getMetricsRegistry().getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L);
092    uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L);
093    uncleanlyClosedSkippedBytes =
094      rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L);
095    restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L);
096    repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L);
097    completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
098    completedRecoveryQueue =
099      rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
100    failedRecoveryQueue = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
101
102    walReaderBufferUsageBytes =
103      rms.getMetricsRegistry().getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
104    sourceInitializing = rms.getMetricsRegistry().getGaugeInt(SOURCE_INITIALIZING, 0);
105  }
106
107  @Override
108  public void setLastShippedAge(long age) {
109    ageOfLastShippedOpHist.add(age);
110  }
111
112  @Override
113  public void incrSizeOfLogQueue(int size) {
114    sizeOfLogQueueGauge.incr(size);
115  }
116
117  @Override
118  public void decrSizeOfLogQueue(int size) {
119    sizeOfLogQueueGauge.decr(size);
120  }
121
122  @Override
123  public void incrLogReadInEdits(long size) {
124    logReadInEditsCounter.incr(size);
125  }
126
127  @Override
128  public void incrLogEditsFiltered(long size) {
129    walEditsFilteredCounter.incr(size);
130  }
131
132  @Override
133  public void incrBatchesShipped(int batches) {
134    shippedBatchesCounter.incr(batches);
135  }
136
137  @Override
138  public void incrFailedBatches() {
139    failedBatchesCounter.incr();
140  }
141
142  @Override
143  public void incrOpsShipped(long ops) {
144    shippedOpsCounter.incr(ops);
145  }
146
147  @Override
148  public void incrShippedBytes(long size) {
149    shippedBytesCounter.incr(size);
150    // obtained value maybe smaller than 1024. We should make sure that KB count
151    // eventually picks up even from multiple smaller updates.
152    incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
153  }
154
155  static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCounter kbsCounter) {
156    // Following code should be thread-safe.
157    long delta = 0;
158    while (true) {
159      long bytes = bytesCounter.value();
160      delta = (bytes / 1024) - kbsCounter.value();
161      if (delta > 0) {
162        kbsCounter.incr(delta);
163      } else {
164        break;
165      }
166    }
167  }
168
169  @Override
170  public void incrLogReadInBytes(long size) {
171    logReadInBytesCounter.incr(size);
172  }
173
174  @Override
175  public void clear() {
176  }
177
178  @Override
179  public long getLastShippedAge() {
180    return ageOfLastShippedOpHist.getMax();
181  }
182
183  @Override
184  public void incrHFilesShipped(long hfiles) {
185    shippedHFilesCounter.incr(hfiles);
186  }
187
188  @Override
189  public void incrSizeOfHFileRefsQueue(long size) {
190    sizeOfHFileRefsQueueGauge.incr(size);
191  }
192
193  @Override
194  public void decrSizeOfHFileRefsQueue(long size) {
195    sizeOfHFileRefsQueueGauge.decr(size);
196  }
197
198  @Override
199  public int getSizeOfLogQueue() {
200    return (int) sizeOfLogQueueGauge.value();
201  }
202
203  @Override
204  public void incrUnknownFileLengthForClosedWAL() {
205    unknownFileLengthForClosedWAL.incr(1L);
206  }
207
208  @Override
209  public void incrUncleanlyClosedWALs() {
210    uncleanlyClosedWAL.incr(1L);
211  }
212
213  @Override
214  public long getUncleanlyClosedWALs() {
215    return uncleanlyClosedWAL.value();
216  }
217
218  @Override
219  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
220    uncleanlyClosedSkippedBytes.incr(bytes);
221  }
222
223  @Override
224  public void incrRestartedWALReading() {
225    restartWALReading.incr(1L);
226  }
227
228  @Override
229  public void incrRepeatedFileBytes(final long bytes) {
230    repeatedFileBytes.incr(bytes);
231  }
232
233  @Override
234  public void incrCompletedWAL() {
235    completedWAL.incr(1L);
236  }
237
238  @Override
239  public void incrCompletedRecoveryQueue() {
240    completedRecoveryQueue.incr(1L);
241  }
242
243  @Override
244  public void incrFailedRecoveryQueue() {
245    failedRecoveryQueue.incr(1L);
246  }
247
248  @Override
249  public void setOldestWalAge(long age) {
250    // Not implemented
251  }
252
253  @Override
254  public long getOldestWalAge() {
255    // Not implemented
256    return 0;
257  }
258
259  @Override
260  public void incrSourceInitializing() {
261    sourceInitializing.incr(1);
262  }
263
264  @Override
265  public void decrSourceInitializing() {
266    sourceInitializing.decr(1);
267  }
268
269  @Override
270  public int getSourceInitializing() {
271    return sourceInitializing.value();
272  }
273
274  @Override
275  public void init() {
276    rms.init();
277  }
278
279  @Override
280  public void setGauge(String gaugeName, long value) {
281    rms.setGauge(KEY_PREFIX + gaugeName, value);
282  }
283
284  @Override
285  public void incGauge(String gaugeName, long delta) {
286    rms.incGauge(KEY_PREFIX + gaugeName, delta);
287  }
288
289  @Override
290  public void decGauge(String gaugeName, long delta) {
291    rms.decGauge(KEY_PREFIX + gaugeName, delta);
292  }
293
294  @Override
295  public void removeMetric(String key) {
296    rms.removeMetric(KEY_PREFIX + key);
297  }
298
299  @Override
300  public void incCounters(String counterName, long delta) {
301    rms.incCounters(KEY_PREFIX + counterName, delta);
302  }
303
304  @Override
305  public void updateHistogram(String name, long value) {
306    rms.updateHistogram(KEY_PREFIX + name, value);
307  }
308
309  @Override
310  public String getMetricsContext() {
311    return rms.getMetricsContext();
312  }
313
314  @Override
315  public String getMetricsDescription() {
316    return rms.getMetricsDescription();
317  }
318
319  @Override
320  public String getMetricsJmxContext() {
321    return rms.getMetricsJmxContext();
322  }
323
324  @Override
325  public String getMetricsName() {
326    return rms.getMetricsName();
327  }
328
329  @Override
330  public long getWALEditsRead() {
331    return this.logReadInEditsCounter.value();
332  }
333
334  @Override
335  public long getShippedOps() {
336    return this.shippedOpsCounter.value();
337  }
338
339  @Override
340  public long getEditsFiltered() {
341    return this.walEditsFilteredCounter.value();
342  }
343
344  @Override
345  public void setWALReaderEditsBufferBytes(long usage) {
346    this.walReaderBufferUsageBytes.set(usage);
347  }
348
349  @Override
350  public long getWALReaderEditsBufferBytes() {
351    return this.walReaderBufferUsageBytes.value();
352  }
353}