View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
25  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
26  
27  /**
28   * This class is for maintaining the various replication statistics for a source and publishing them
29   * through the metrics interfaces.
30   */
31  @InterfaceAudience.Private
32  public class MetricsSource {
33  
34    public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
35    public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
36    public static final String SOURCE_LOG_EDITS_READ = "source.logEditsRead";
37    public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered";
38    public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
39    public static final String SOURCE_SHIPPED_KBS = "source.shippedKBs";
40    public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
41    public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes";
42  
43    public static final Log LOG = LogFactory.getLog(MetricsSource.class);
44    private String id;
45  
46    private long lastTimestamp = 0;
47    private int lastQueueSize = 0;
48  
49    private String sizeOfLogQueKey;
50    private String ageOfLastShippedOpKey;
51    private String logEditsReadKey;
52    private String logEditsFilteredKey;
53    private final String shippedBatchesKey;
54    private final String shippedOpsKey;
55    private final String shippedKBsKey;
56    private final String logReadInBytesKey;
57  
58    private MetricsReplicationSource rms;
59  
60    /**
61     * Constructor used to register the metrics
62     *
63     * @param id Name of the source this class is monitoring
64     */
65    public MetricsSource(String id) {
66      this.id = id;
67  
68      sizeOfLogQueKey = "source." + id + ".sizeOfLogQueue";
69      ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
70      logEditsReadKey = "source." + id + ".logEditsRead";
71      logEditsFilteredKey = "source." + id + ".logEditsFiltered";
72      shippedBatchesKey = "source." + this.id + ".shippedBatches";
73      shippedOpsKey = "source." + this.id + ".shippedOps";
74      shippedKBsKey = "source." + this.id + ".shippedKBs";
75      logReadInBytesKey = "source." + this.id + ".logReadInBytes";
76      rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class);
77    }
78  
79    /**
80     * Set the age of the last edit that was shipped
81     *
82     * @param timestamp write time of the edit
83     */
84    public void setAgeOfLastShippedOp(long timestamp) {
85      long age = EnvironmentEdgeManager.currentTimeMillis() - timestamp;
86      rms.setGauge(ageOfLastShippedOpKey, age);
87      rms.setGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, age);
88      this.lastTimestamp = timestamp;
89    }
90  
91    /**
92     * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
93     * when replication fails and need to keep that metric accurate.
94     */
95    public void refreshAgeOfLastShippedOp() {
96      if (this.lastTimestamp > 0) {
97        setAgeOfLastShippedOp(this.lastTimestamp);
98      }
99    }
100 
101   /**
102    * Set the size of the log queue
103    *
104    * @param size the size.
105    */
106   public void setSizeOfLogQueue(int size) {
107     rms.setGauge(sizeOfLogQueKey, size);
108     rms.incGauge(SOURCE_SIZE_OF_LOG_QUEUE, size - lastQueueSize);
109     lastQueueSize = size;
110   }
111 
112   /**
113    * Add on the the number of log edits read
114    *
115    * @param delta the number of log edits read.
116    */
117   private void incrLogEditsRead(long delta) {
118     rms.incCounters(logEditsReadKey, delta);
119     rms.incCounters(SOURCE_LOG_EDITS_READ, delta);
120   }
121 
122   /** Increment the number of log edits read by one. */
123   public void incrLogEditsRead() {
124     incrLogEditsRead(1);
125   }
126 
127   /**
128    * Add on the number of log edits filtered
129    *
130    * @param delta the number filtered.
131    */
132   private void incrLogEditsFiltered(long delta) {
133     rms.incCounters(logEditsFilteredKey, delta);
134     rms.incCounters(SOURCE_LOG_EDITS_FILTERED, delta);
135   }
136 
137   /** The number of log edits filtered out. */
138   public void incrLogEditsFiltered() {
139     incrLogEditsFiltered(1);
140   }
141 
142   /**
143    * Convience method to apply changes to metrics do to shipping a batch of logs.
144    *
145    * @param batchSize the size of the batch that was shipped to sinks.
146    */
147   public void shipBatch(long batchSize, int sizeInKB) {
148     rms.incCounters(shippedBatchesKey, 1);
149     rms.incCounters(SOURCE_SHIPPED_BATCHES, 1);
150     rms.incCounters(shippedOpsKey, batchSize);
151     rms.incCounters(SOURCE_SHIPPED_OPS, batchSize);
152     rms.incCounters(shippedKBsKey, sizeInKB);
153     rms.incCounters(SOURCE_SHIPPED_KBS, sizeInKB);
154   }
155   
156   /** increase the byte number read by source from log file */
157   public void incrLogReadInBytes(long readInBytes) {
158     rms.incCounters(logReadInBytesKey, readInBytes);
159     rms.incCounters(SOURCE_LOG_READ_IN_BYTES, readInBytes);
160   }
161 
162   /** Removes all metrics about this Source. */
163   public void clear() {
164     rms.removeMetric(sizeOfLogQueKey);
165     rms.decGauge(SOURCE_SIZE_OF_LOG_QUEUE, lastQueueSize);
166     lastQueueSize = 0;
167     rms.removeMetric(ageOfLastShippedOpKey);
168 
169     rms.removeMetric(logEditsFilteredKey);
170     rms.removeMetric(logEditsReadKey);
171 
172   }
173 }