001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import org.apache.hadoop.metrics2.lib.MutableFastCounter;
022import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
023import org.apache.hadoop.metrics2.lib.MutableHistogram;
024import org.apache.yetus.audience.InterfaceAudience;
025
026@InterfaceAudience.Private
027public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{
028  private static final String KEY_PREFIX = "source.";
029
030  private final MetricsReplicationSourceImpl rms;
031
032  private final MutableHistogram ageOfLastShippedOpHist;
033  private final MutableGaugeLong sizeOfLogQueueGauge;
034  private final MutableFastCounter logReadInEditsCounter;
035  private final MutableFastCounter logEditsFilteredCounter;
036  private final MutableFastCounter shippedBatchesCounter;
037  private final MutableFastCounter shippedOpsCounter;
038  private final MutableFastCounter shippedBytesCounter;
039
040  /**
041   * @deprecated since 1.3.0. Use {@link #shippedBytesCounter} instead.
042   */
043  @Deprecated
044  private final MutableFastCounter shippedKBsCounter;
045  private final MutableFastCounter logReadInBytesCounter;
046  private final MutableFastCounter shippedHFilesCounter;
047  private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
048  private final MutableFastCounter unknownFileLengthForClosedWAL;
049  private final MutableFastCounter uncleanlyClosedWAL;
050  private final MutableFastCounter uncleanlyClosedSkippedBytes;
051  private final MutableFastCounter restartWALReading;
052  private final MutableFastCounter repeatedFileBytes;
053  private final MutableFastCounter completedWAL;
054  private final MutableFastCounter completedRecoveryQueue;
055  private final MutableFastCounter failedRecoveryQueue;
056
057  public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
058    this.rms = rms;
059
060    ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
061
062    sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L);
063
064    shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);
065
066    shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);
067
068    shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L);
069
070    shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
071
072    logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L);
073
074    logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
075
076    logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
077
078    shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L);
079
080    sizeOfHFileRefsQueueGauge =
081        rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L);
082
083    unknownFileLengthForClosedWAL = rms.getMetricsRegistry()
084            .getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L);
085    uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L);
086    uncleanlyClosedSkippedBytes = rms.getMetricsRegistry()
087            .getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L);
088    restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L);
089    repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L);
090    completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
091    completedRecoveryQueue = rms.getMetricsRegistry()
092            .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
093    failedRecoveryQueue = rms.getMetricsRegistry()
094            .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
095  }
096
097  @Override public void setLastShippedAge(long age) {
098    ageOfLastShippedOpHist.add(age);
099  }
100
101  @Override public void incrSizeOfLogQueue(int size) {
102    sizeOfLogQueueGauge.incr(size);
103  }
104
105  @Override public void decrSizeOfLogQueue(int size) {
106    sizeOfLogQueueGauge.decr(size);
107  }
108
109  @Override public void incrLogReadInEdits(long size) {
110    logReadInEditsCounter.incr(size);
111  }
112
113  @Override public void incrLogEditsFiltered(long size) {
114    logEditsFilteredCounter.incr(size);
115  }
116
117  @Override public void incrBatchesShipped(int batches) {
118    shippedBatchesCounter.incr(batches);
119  }
120
121  @Override public void incrOpsShipped(long ops) {
122    shippedOpsCounter.incr(ops);
123  }
124
125  @Override public void incrShippedBytes(long size) {
126    shippedBytesCounter.incr(size);
127    // obtained value maybe smaller than 1024. We should make sure that KB count
128    // eventually picks up even from multiple smaller updates.
129    incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
130  }
131
132  static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCounter kbsCounter) {
133    // Following code should be thread-safe.
134    long delta = 0;
135    while(true) {
136      long bytes = bytesCounter.value();
137      delta = (bytes / 1024) - kbsCounter.value();
138      if (delta > 0) {
139        kbsCounter.incr(delta);
140      } else {
141        break;
142      }
143    }
144  }
145
146  @Override public void incrLogReadInBytes(long size) {
147    logReadInBytesCounter.incr(size);
148  }
149
150  @Override public void clear() {
151  }
152
153  @Override
154  public long getLastShippedAge() {
155    return ageOfLastShippedOpHist.getMax();
156  }
157
158  @Override public void incrHFilesShipped(long hfiles) {
159    shippedHFilesCounter.incr(hfiles);
160  }
161
162  @Override
163  public void incrSizeOfHFileRefsQueue(long size) {
164    sizeOfHFileRefsQueueGauge.incr(size);
165  }
166
167  @Override
168  public void decrSizeOfHFileRefsQueue(long size) {
169    sizeOfHFileRefsQueueGauge.decr(size);
170  }
171
172  @Override
173  public int getSizeOfLogQueue() {
174    return (int)sizeOfLogQueueGauge.value();
175  }
176
177  @Override
178  public void incrUnknownFileLengthForClosedWAL() {
179    unknownFileLengthForClosedWAL.incr(1L);
180  }
181  @Override
182  public void incrUncleanlyClosedWALs() {
183    uncleanlyClosedWAL.incr(1L);
184  }
185  @Override
186  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
187    uncleanlyClosedSkippedBytes.incr(bytes);
188  }
189  @Override
190  public void incrRestartedWALReading() {
191    restartWALReading.incr(1L);
192  }
193  @Override
194  public void incrRepeatedFileBytes(final long bytes) {
195    repeatedFileBytes.incr(bytes);
196  }
197  @Override
198  public void incrCompletedWAL() {
199    completedWAL.incr(1L);
200  }
201  @Override
202  public void incrCompletedRecoveryQueue() {
203    completedRecoveryQueue.incr(1L);
204  }
205  @Override
206  public void incrFailedRecoveryQueue() {
207    failedRecoveryQueue.incr(1L);
208  }
209  @Override
210  public void init() {
211    rms.init();
212  }
213
214  @Override
215  public void setGauge(String gaugeName, long value) {
216    rms.setGauge(KEY_PREFIX + gaugeName, value);
217  }
218
219  @Override
220  public void incGauge(String gaugeName, long delta) {
221    rms.incGauge(KEY_PREFIX + gaugeName, delta);
222  }
223
224  @Override
225  public void decGauge(String gaugeName, long delta) {
226    rms.decGauge(KEY_PREFIX + gaugeName, delta);
227  }
228
229  @Override
230  public void removeMetric(String key) {
231    rms.removeMetric(KEY_PREFIX + key);
232  }
233
234  @Override
235  public void incCounters(String counterName, long delta) {
236    rms.incCounters(KEY_PREFIX + counterName, delta);
237  }
238
239  @Override
240  public void updateHistogram(String name, long value) {
241    rms.updateHistogram(KEY_PREFIX + name, value);
242  }
243
244  @Override
245  public String getMetricsContext() {
246    return rms.getMetricsContext();
247  }
248
249  @Override
250  public String getMetricsDescription() {
251    return rms.getMetricsDescription();
252  }
253
254  @Override
255  public String getMetricsJmxContext() {
256    return rms.getMetricsJmxContext();
257  }
258
259  @Override
260  public String getMetricsName() {
261    return rms.getMetricsName();
262  }
263}