001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import org.apache.hadoop.metrics2.lib.MutableFastCounter;
021import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
022import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
023import org.apache.hadoop.metrics2.lib.MutableHistogram;
024import org.apache.yetus.audience.InterfaceAudience;
025
026@InterfaceAudience.Private
027public class MetricsReplicationGlobalSourceSourceImpl
028  implements MetricsReplicationGlobalSourceSource {
029  private static final String KEY_PREFIX = "source.";
030
031  private final MetricsReplicationSourceImpl rms;
032
033  private final MutableHistogram ageOfLastShippedOpHist;
034  private final MutableGaugeLong sizeOfLogQueueGauge;
035  private final MutableFastCounter logReadInEditsCounter;
036  private final MutableFastCounter walEditsFilteredCounter;
037  private final MutableFastCounter shippedBatchesCounter;
038  private final MutableFastCounter failedBatchesCounter;
039  private final MutableFastCounter shippedOpsCounter;
040  private final MutableFastCounter shippedBytesCounter;
041  private final MutableFastCounter logReadInBytesCounter;
042  private final MutableFastCounter shippedHFilesCounter;
043  private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
044  private final MutableFastCounter unknownFileLengthForClosedWAL;
045  private final MutableFastCounter uncleanlyClosedWAL;
046  private final MutableFastCounter uncleanlyClosedSkippedBytes;
047  private final MutableFastCounter restartWALReading;
048  private final MutableFastCounter repeatedFileBytes;
049  private final MutableFastCounter completedWAL;
050  private final MutableFastCounter completedRecoveryQueue;
051  private final MutableFastCounter failedRecoveryQueue;
052  private final MutableGaugeLong walReaderBufferUsageBytes;
053  private final MutableGaugeInt sourceInitializing;
054
055  public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
056    this.rms = rms;
057
058    ageOfLastShippedOpHist =
059      rms.getMetricsRegistry().newTimeHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
060
061    sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L);
062
063    shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);
064
065    failedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_BATCHES, 0L);
066
067    shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);
068
069    shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
070
071    logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L);
072
073    logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
074
075    walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
076
077    shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L);
078
079    sizeOfHFileRefsQueueGauge =
080      rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L);
081
082    unknownFileLengthForClosedWAL =
083      rms.getMetricsRegistry().getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L);
084    uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L);
085    uncleanlyClosedSkippedBytes =
086      rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L);
087    restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L);
088    repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L);
089    completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
090    completedRecoveryQueue =
091      rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
092    failedRecoveryQueue = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
093
094    walReaderBufferUsageBytes =
095      rms.getMetricsRegistry().getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
096    sourceInitializing = rms.getMetricsRegistry().getGaugeInt(SOURCE_INITIALIZING, 0);
097  }
098
099  @Override
100  public void setLastShippedAge(long age) {
101    ageOfLastShippedOpHist.add(age);
102  }
103
104  @Override
105  public void incrSizeOfLogQueue(int size) {
106    sizeOfLogQueueGauge.incr(size);
107  }
108
109  @Override
110  public void decrSizeOfLogQueue(int size) {
111    sizeOfLogQueueGauge.decr(size);
112  }
113
114  @Override
115  public void incrLogReadInEdits(long size) {
116    logReadInEditsCounter.incr(size);
117  }
118
119  @Override
120  public void incrLogEditsFiltered(long size) {
121    walEditsFilteredCounter.incr(size);
122  }
123
124  @Override
125  public void incrBatchesShipped(int batches) {
126    shippedBatchesCounter.incr(batches);
127  }
128
129  @Override
130  public void incrFailedBatches() {
131    failedBatchesCounter.incr();
132  }
133
134  @Override
135  public void incrOpsShipped(long ops) {
136    shippedOpsCounter.incr(ops);
137  }
138
139  @Override
140  public void incrShippedBytes(long size) {
141    shippedBytesCounter.incr(size);
142  }
143
144  @Override
145  public void incrLogReadInBytes(long size) {
146    logReadInBytesCounter.incr(size);
147  }
148
149  @Override
150  public void clear() {
151  }
152
153  @Override
154  public long getLastShippedAge() {
155    return ageOfLastShippedOpHist.getMax();
156  }
157
158  @Override
159  public void incrHFilesShipped(long hfiles) {
160    shippedHFilesCounter.incr(hfiles);
161  }
162
163  @Override
164  public void incrSizeOfHFileRefsQueue(long size) {
165    sizeOfHFileRefsQueueGauge.incr(size);
166  }
167
168  @Override
169  public void decrSizeOfHFileRefsQueue(long size) {
170    sizeOfHFileRefsQueueGauge.decr(size);
171  }
172
173  @Override
174  public int getSizeOfLogQueue() {
175    return (int) sizeOfLogQueueGauge.value();
176  }
177
178  @Override
179  public void incrUnknownFileLengthForClosedWAL() {
180    unknownFileLengthForClosedWAL.incr(1L);
181  }
182
183  @Override
184  public void incrUncleanlyClosedWALs() {
185    uncleanlyClosedWAL.incr(1L);
186  }
187
188  @Override
189  public long getUncleanlyClosedWALs() {
190    return uncleanlyClosedWAL.value();
191  }
192
193  @Override
194  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
195    uncleanlyClosedSkippedBytes.incr(bytes);
196  }
197
198  @Override
199  public void incrRestartedWALReading() {
200    restartWALReading.incr(1L);
201  }
202
203  @Override
204  public void incrRepeatedFileBytes(final long bytes) {
205    repeatedFileBytes.incr(bytes);
206  }
207
208  @Override
209  public void incrCompletedWAL() {
210    completedWAL.incr(1L);
211  }
212
213  @Override
214  public void incrCompletedRecoveryQueue() {
215    completedRecoveryQueue.incr(1L);
216  }
217
218  @Override
219  public void incrFailedRecoveryQueue() {
220    failedRecoveryQueue.incr(1L);
221  }
222
223  @Override
224  public void setOldestWalAge(long age) {
225    // Not implemented
226  }
227
228  @Override
229  public long getOldestWalAge() {
230    // Not implemented
231    return 0;
232  }
233
234  @Override
235  public void incrSourceInitializing() {
236    sourceInitializing.incr(1);
237  }
238
239  @Override
240  public void decrSourceInitializing() {
241    sourceInitializing.decr(1);
242  }
243
244  @Override
245  public int getSourceInitializing() {
246    return sourceInitializing.value();
247  }
248
249  @Override
250  public void init() {
251    rms.init();
252  }
253
254  @Override
255  public void setGauge(String gaugeName, long value) {
256    rms.setGauge(KEY_PREFIX + gaugeName, value);
257  }
258
259  @Override
260  public void incGauge(String gaugeName, long delta) {
261    rms.incGauge(KEY_PREFIX + gaugeName, delta);
262  }
263
264  @Override
265  public void decGauge(String gaugeName, long delta) {
266    rms.decGauge(KEY_PREFIX + gaugeName, delta);
267  }
268
269  @Override
270  public void removeMetric(String key) {
271    rms.removeMetric(KEY_PREFIX + key);
272  }
273
274  @Override
275  public void incCounters(String counterName, long delta) {
276    rms.incCounters(KEY_PREFIX + counterName, delta);
277  }
278
279  @Override
280  public void updateHistogram(String name, long value) {
281    rms.updateHistogram(KEY_PREFIX + name, value);
282  }
283
284  @Override
285  public String getMetricsContext() {
286    return rms.getMetricsContext();
287  }
288
289  @Override
290  public String getMetricsDescription() {
291    return rms.getMetricsDescription();
292  }
293
294  @Override
295  public String getMetricsJmxContext() {
296    return rms.getMetricsJmxContext();
297  }
298
299  @Override
300  public String getMetricsName() {
301    return rms.getMetricsName();
302  }
303
304  @Override
305  public long getWALEditsRead() {
306    return this.logReadInEditsCounter.value();
307  }
308
309  @Override
310  public long getShippedOps() {
311    return this.shippedOpsCounter.value();
312  }
313
314  @Override
315  public long getEditsFiltered() {
316    return this.walEditsFilteredCounter.value();
317  }
318
319  @Override
320  public void setWALReaderEditsBufferBytes(long usage) {
321    this.walReaderBufferUsageBytes.set(usage);
322  }
323
324  @Override
325  public long getWALReaderEditsBufferBytes() {
326    return this.walReaderBufferUsageBytes.value();
327  }
328}