001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import org.apache.hadoop.metrics2.lib.MutableFastCounter;
022import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
023import org.apache.hadoop.metrics2.lib.MutableHistogram;
024import org.apache.yetus.audience.InterfaceAudience;
025
026@InterfaceAudience.Private
027public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{
028  private static final String KEY_PREFIX = "source.";
029
030  private final MetricsReplicationSourceImpl rms;
031
032  private final MutableHistogram ageOfLastShippedOpHist;
033  private final MutableGaugeLong sizeOfLogQueueGauge;
034  private final MutableFastCounter logReadInEditsCounter;
035  private final MutableFastCounter logEditsFilteredCounter;
036  private final MutableFastCounter shippedBatchesCounter;
037  private final MutableFastCounter shippedOpsCounter;
038  private final MutableFastCounter shippedBytesCounter;
039
040  /**
041   * @deprecated since 1.3.0. Use {@link #shippedBytesCounter} instead.
042   */
043  @Deprecated
044  private final MutableFastCounter shippedKBsCounter;
045  private final MutableFastCounter logReadInBytesCounter;
046  private final MutableFastCounter shippedHFilesCounter;
047  private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
048  private final MutableFastCounter unknownFileLengthForClosedWAL;
049  private final MutableFastCounter uncleanlyClosedWAL;
050  private final MutableFastCounter uncleanlyClosedSkippedBytes;
051  private final MutableFastCounter restartWALReading;
052  private final MutableFastCounter repeatedFileBytes;
053  private final MutableFastCounter completedWAL;
054  private final MutableFastCounter completedRecoveryQueue;
055
056  public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
057    this.rms = rms;
058
059    ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
060
061    sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L);
062
063    shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);
064
065    shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);
066
067    shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L);
068
069    shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
070
071    logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L);
072
073    logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
074
075    logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
076
077    shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L);
078
079    sizeOfHFileRefsQueueGauge =
080        rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L);
081
082    unknownFileLengthForClosedWAL = rms.getMetricsRegistry()
083            .getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L);
084    uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L);
085    uncleanlyClosedSkippedBytes = rms.getMetricsRegistry()
086            .getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L);
087    restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L);
088    repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L);
089    completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
090    completedRecoveryQueue = rms.getMetricsRegistry()
091            .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
092  }
093
094  @Override public void setLastShippedAge(long age) {
095    ageOfLastShippedOpHist.add(age);
096  }
097
098  @Override public void incrSizeOfLogQueue(int size) {
099    sizeOfLogQueueGauge.incr(size);
100  }
101
102  @Override public void decrSizeOfLogQueue(int size) {
103    sizeOfLogQueueGauge.decr(size);
104  }
105
106  @Override public void incrLogReadInEdits(long size) {
107    logReadInEditsCounter.incr(size);
108  }
109
110  @Override public void incrLogEditsFiltered(long size) {
111    logEditsFilteredCounter.incr(size);
112  }
113
114  @Override public void incrBatchesShipped(int batches) {
115    shippedBatchesCounter.incr(batches);
116  }
117
118  @Override public void incrOpsShipped(long ops) {
119    shippedOpsCounter.incr(ops);
120  }
121
122  @Override public void incrShippedBytes(long size) {
123    shippedBytesCounter.incr(size);
124    // obtained value maybe smaller than 1024. We should make sure that KB count
125    // eventually picks up even from multiple smaller updates.
126    incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
127  }
128
129  static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCounter kbsCounter) {
130    // Following code should be thread-safe.
131    long delta = 0;
132    while(true) {
133      long bytes = bytesCounter.value();
134      delta = (bytes / 1024) - kbsCounter.value();
135      if (delta > 0) {
136        kbsCounter.incr(delta);
137      } else {
138        break;
139      }
140    }
141  }
142
143  @Override public void incrLogReadInBytes(long size) {
144    logReadInBytesCounter.incr(size);
145  }
146
147  @Override public void clear() {
148  }
149
150  @Override
151  public long getLastShippedAge() {
152    return ageOfLastShippedOpHist.getMax();
153  }
154
155  @Override public void incrHFilesShipped(long hfiles) {
156    shippedHFilesCounter.incr(hfiles);
157  }
158
159  @Override
160  public void incrSizeOfHFileRefsQueue(long size) {
161    sizeOfHFileRefsQueueGauge.incr(size);
162  }
163
164  @Override
165  public void decrSizeOfHFileRefsQueue(long size) {
166    sizeOfHFileRefsQueueGauge.decr(size);
167  }
168
169  @Override
170  public int getSizeOfLogQueue() {
171    return (int)sizeOfLogQueueGauge.value();
172  }
173
174  @Override
175  public void incrUnknownFileLengthForClosedWAL() {
176    unknownFileLengthForClosedWAL.incr(1L);
177  }
178  @Override
179  public void incrUncleanlyClosedWALs() {
180    uncleanlyClosedWAL.incr(1L);
181  }
182  @Override
183  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
184    uncleanlyClosedSkippedBytes.incr(bytes);
185  }
186  @Override
187  public void incrRestartedWALReading() {
188    restartWALReading.incr(1L);
189  }
190  @Override
191  public void incrRepeatedFileBytes(final long bytes) {
192    repeatedFileBytes.incr(bytes);
193  }
194  @Override
195  public void incrCompletedWAL() {
196    completedWAL.incr(1L);
197  }
198  @Override
199  public void incrCompletedRecoveryQueue() {
200    completedRecoveryQueue.incr(1L);
201  }
202
203  @Override
204  public void init() {
205    rms.init();
206  }
207
208  @Override
209  public void setGauge(String gaugeName, long value) {
210    rms.setGauge(KEY_PREFIX + gaugeName, value);
211  }
212
213  @Override
214  public void incGauge(String gaugeName, long delta) {
215    rms.incGauge(KEY_PREFIX + gaugeName, delta);
216  }
217
218  @Override
219  public void decGauge(String gaugeName, long delta) {
220    rms.decGauge(KEY_PREFIX + gaugeName, delta);
221  }
222
223  @Override
224  public void removeMetric(String key) {
225    rms.removeMetric(KEY_PREFIX + key);
226  }
227
228  @Override
229  public void incCounters(String counterName, long delta) {
230    rms.incCounters(KEY_PREFIX + counterName, delta);
231  }
232
233  @Override
234  public void updateHistogram(String name, long value) {
235    rms.updateHistogram(KEY_PREFIX + name, value);
236  }
237
238  @Override
239  public String getMetricsContext() {
240    return rms.getMetricsContext();
241  }
242
243  @Override
244  public String getMetricsDescription() {
245    return rms.getMetricsDescription();
246  }
247
248  @Override
249  public String getMetricsJmxContext() {
250    return rms.getMetricsJmxContext();
251  }
252
253  @Override
254  public String getMetricsName() {
255    return rms.getMetricsName();
256  }
257}