001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import org.apache.hadoop.metrics2.lib.MutableFastCounter;
022import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
023import org.apache.hadoop.metrics2.lib.MutableHistogram;
024import org.apache.yetus.audience.InterfaceAudience;
025
026@InterfaceAudience.Private
027public class MetricsReplicationGlobalSourceSourceImpl
028    implements MetricsReplicationGlobalSourceSource {
029  private static final String KEY_PREFIX = "source.";
030
031  private final MetricsReplicationSourceImpl rms;
032
033  private final MutableHistogram ageOfLastShippedOpHist;
034  private final MutableGaugeLong sizeOfLogQueueGauge;
035  private final MutableFastCounter logReadInEditsCounter;
036  private final MutableFastCounter walEditsFilteredCounter;
037  private final MutableFastCounter shippedBatchesCounter;
038  private final MutableFastCounter shippedOpsCounter;
039  private final MutableFastCounter shippedBytesCounter;
040
041  /**
042   * @deprecated since 1.3.0. Use {@link #shippedBytesCounter} instead.
043   */
044  @Deprecated
045  private final MutableFastCounter shippedKBsCounter;
046  private final MutableFastCounter logReadInBytesCounter;
047  private final MutableFastCounter shippedHFilesCounter;
048  private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
049  private final MutableFastCounter unknownFileLengthForClosedWAL;
050  private final MutableFastCounter uncleanlyClosedWAL;
051  private final MutableFastCounter uncleanlyClosedSkippedBytes;
052  private final MutableFastCounter restartWALReading;
053  private final MutableFastCounter repeatedFileBytes;
054  private final MutableFastCounter completedWAL;
055  private final MutableFastCounter completedRecoveryQueue;
056  private final MutableFastCounter failedRecoveryQueue;
057  private final MutableGaugeLong walReaderBufferUsageBytes;
058
059  public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
060    this.rms = rms;
061
062    ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
063
064    sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L);
065
066    shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);
067
068    shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);
069
070    shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L);
071
072    shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
073
074    logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L);
075
076    logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
077
078    walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
079
080    shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L);
081
082    sizeOfHFileRefsQueueGauge =
083        rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L);
084
085    unknownFileLengthForClosedWAL = rms.getMetricsRegistry()
086            .getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L);
087    uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L);
088    uncleanlyClosedSkippedBytes = rms.getMetricsRegistry()
089            .getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L);
090    restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L);
091    repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L);
092    completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
093    completedRecoveryQueue = rms.getMetricsRegistry()
094            .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
095    failedRecoveryQueue = rms.getMetricsRegistry()
096            .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
097
098    walReaderBufferUsageBytes = rms.getMetricsRegistry()
099        .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
100  }
101
102  @Override public void setLastShippedAge(long age) {
103    ageOfLastShippedOpHist.add(age);
104  }
105
106  @Override public void incrSizeOfLogQueue(int size) {
107    sizeOfLogQueueGauge.incr(size);
108  }
109
110  @Override public void decrSizeOfLogQueue(int size) {
111    sizeOfLogQueueGauge.decr(size);
112  }
113
114  @Override public void incrLogReadInEdits(long size) {
115    logReadInEditsCounter.incr(size);
116  }
117
118  @Override public void incrLogEditsFiltered(long size) {
119    walEditsFilteredCounter.incr(size);
120  }
121
122  @Override public void incrBatchesShipped(int batches) {
123    shippedBatchesCounter.incr(batches);
124  }
125
126  @Override public void incrOpsShipped(long ops) {
127    shippedOpsCounter.incr(ops);
128  }
129
130  @Override public void incrShippedBytes(long size) {
131    shippedBytesCounter.incr(size);
132    // obtained value maybe smaller than 1024. We should make sure that KB count
133    // eventually picks up even from multiple smaller updates.
134    incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
135  }
136
137  static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCounter kbsCounter) {
138    // Following code should be thread-safe.
139    long delta = 0;
140    while(true) {
141      long bytes = bytesCounter.value();
142      delta = (bytes / 1024) - kbsCounter.value();
143      if (delta > 0) {
144        kbsCounter.incr(delta);
145      } else {
146        break;
147      }
148    }
149  }
150  @Override public void incrLogReadInBytes(long size) {
151    logReadInBytesCounter.incr(size);
152  }
153
154  @Override public void clear() {
155  }
156
157  @Override
158  public long getLastShippedAge() {
159    return ageOfLastShippedOpHist.getMax();
160  }
161
162  @Override public void incrHFilesShipped(long hfiles) {
163    shippedHFilesCounter.incr(hfiles);
164  }
165
166  @Override
167  public void incrSizeOfHFileRefsQueue(long size) {
168    sizeOfHFileRefsQueueGauge.incr(size);
169  }
170
171  @Override
172  public void decrSizeOfHFileRefsQueue(long size) {
173    sizeOfHFileRefsQueueGauge.decr(size);
174  }
175
176  @Override
177  public int getSizeOfLogQueue() {
178    return (int)sizeOfLogQueueGauge.value();
179  }
180
181  @Override
182  public void incrUnknownFileLengthForClosedWAL() {
183    unknownFileLengthForClosedWAL.incr(1L);
184  }
185  @Override
186  public void incrUncleanlyClosedWALs() {
187    uncleanlyClosedWAL.incr(1L);
188  }
189  @Override
190  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
191    uncleanlyClosedSkippedBytes.incr(bytes);
192  }
193  @Override
194  public void incrRestartedWALReading() {
195    restartWALReading.incr(1L);
196  }
197  @Override
198  public void incrRepeatedFileBytes(final long bytes) {
199    repeatedFileBytes.incr(bytes);
200  }
201  @Override
202  public void incrCompletedWAL() {
203    completedWAL.incr(1L);
204  }
205  @Override
206  public void incrCompletedRecoveryQueue() {
207    completedRecoveryQueue.incr(1L);
208  }
209  @Override
210  public void incrFailedRecoveryQueue() {
211    failedRecoveryQueue.incr(1L);
212  }
213  @Override
214  public void init() {
215    rms.init();
216  }
217
218  @Override
219  public void setGauge(String gaugeName, long value) {
220    rms.setGauge(KEY_PREFIX + gaugeName, value);
221  }
222
223  @Override
224  public void incGauge(String gaugeName, long delta) {
225    rms.incGauge(KEY_PREFIX + gaugeName, delta);
226  }
227
228  @Override
229  public void decGauge(String gaugeName, long delta) {
230    rms.decGauge(KEY_PREFIX + gaugeName, delta);
231  }
232
233  @Override
234  public void removeMetric(String key) {
235    rms.removeMetric(KEY_PREFIX + key);
236  }
237
238  @Override
239  public void incCounters(String counterName, long delta) {
240    rms.incCounters(KEY_PREFIX + counterName, delta);
241  }
242
243  @Override
244  public void updateHistogram(String name, long value) {
245    rms.updateHistogram(KEY_PREFIX + name, value);
246  }
247
248  @Override
249  public String getMetricsContext() {
250    return rms.getMetricsContext();
251  }
252
253  @Override
254  public String getMetricsDescription() {
255    return rms.getMetricsDescription();
256  }
257
258  @Override
259  public String getMetricsJmxContext() {
260    return rms.getMetricsJmxContext();
261  }
262
263  @Override
264  public String getMetricsName() {
265    return rms.getMetricsName();
266  }
267
268  @Override
269  public long getWALEditsRead() {
270    return this.logReadInEditsCounter.value();
271  }
272
273  @Override
274  public long getShippedOps() {
275    return this.shippedOpsCounter.value();
276  }
277
278  @Override
279  public long getEditsFiltered() {
280    return this.walEditsFilteredCounter.value();
281  }
282
283  @Override
284  public void setWALReaderEditsBufferBytes(long usage) {
285    this.walReaderBufferUsageBytes.set(usage);
286  }
287
288  @Override
289  public long getWALReaderEditsBufferBytes() {
290    return this.walReaderBufferUsageBytes.value();
291  }
292}