View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
25  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
26  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
27  
28  /**
29   * This class is for maintaining the various replication statistics for a source and publishing them
30   * through the metrics interfaces.
31   */
32  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
33  public class MetricsSource {
34  
35    public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
36    public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
37    public static final String SOURCE_LOG_EDITS_READ = "source.logEditsRead";
38    public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered";
39    public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
40    public static final String SOURCE_SHIPPED_KBS = "source.shippedKBs";
41    public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
42    public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes";
43  
44    public static final Log LOG = LogFactory.getLog(MetricsSource.class);
45    private String id;
46  
47    private long lastTimestamp = 0;
48    private int lastQueueSize = 0;
49  
50    private String sizeOfLogQueKey;
51    private String ageOfLastShippedOpKey;
52    private String logEditsReadKey;
53    private String logEditsFilteredKey;
54    private final String shippedBatchesKey;
55    private final String shippedOpsKey;
56    private final String shippedKBsKey;
57    private final String logReadInBytesKey;
58  
59    private MetricsReplicationSource rms;
60  
61    /**
62     * Constructor used to register the metrics
63     *
64     * @param id Name of the source this class is monitoring
65     */
66    public MetricsSource(String id) {
67      this.id = id;
68  
69      sizeOfLogQueKey = "source." + id + ".sizeOfLogQueue";
70      ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
71      logEditsReadKey = "source." + id + ".logEditsRead";
72      logEditsFilteredKey = "source." + id + ".logEditsFiltered";
73      shippedBatchesKey = "source." + this.id + ".shippedBatches";
74      shippedOpsKey = "source." + this.id + ".shippedOps";
75      shippedKBsKey = "source." + this.id + ".shippedKBs";
76      logReadInBytesKey = "source." + this.id + ".logReadInBytes";
77      rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class);
78    }
79  
80    /**
81     * Set the age of the last edit that was shipped
82     *
83     * @param timestamp write time of the edit
84     */
85    public void setAgeOfLastShippedOp(long timestamp) {
86      long age = EnvironmentEdgeManager.currentTime() - timestamp;
87      rms.setGauge(ageOfLastShippedOpKey, age);
88      rms.setGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, age);
89      this.lastTimestamp = timestamp;
90    }
91  
92    /**
93     * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
94     * when replication fails and need to keep that metric accurate.
95     */
96    public void refreshAgeOfLastShippedOp() {
97      if (this.lastTimestamp > 0) {
98        setAgeOfLastShippedOp(this.lastTimestamp);
99      }
100   }
101 
102   /**
103    * Set the size of the log queue
104    *
105    * @param size the size.
106    */
107   public void setSizeOfLogQueue(int size) {
108     rms.setGauge(sizeOfLogQueKey, size);
109     rms.incGauge(SOURCE_SIZE_OF_LOG_QUEUE, size - lastQueueSize);
110     lastQueueSize = size;
111   }
112 
113   /**
114    * Add on the the number of log edits read
115    *
116    * @param delta the number of log edits read.
117    */
118   private void incrLogEditsRead(long delta) {
119     rms.incCounters(logEditsReadKey, delta);
120     rms.incCounters(SOURCE_LOG_EDITS_READ, delta);
121   }
122 
123   /** Increment the number of log edits read by one. */
124   public void incrLogEditsRead() {
125     incrLogEditsRead(1);
126   }
127 
128   /**
129    * Add on the number of log edits filtered
130    *
131    * @param delta the number filtered.
132    */
133   private void incrLogEditsFiltered(long delta) {
134     rms.incCounters(logEditsFilteredKey, delta);
135     rms.incCounters(SOURCE_LOG_EDITS_FILTERED, delta);
136   }
137 
138   /** The number of log edits filtered out. */
139   public void incrLogEditsFiltered() {
140     incrLogEditsFiltered(1);
141   }
142 
143   /**
144    * Convience method to apply changes to metrics do to shipping a batch of logs.
145    *
146    * @param batchSize the size of the batch that was shipped to sinks.
147    */
148   public void shipBatch(long batchSize, int sizeInKB) {
149     rms.incCounters(shippedBatchesKey, 1);
150     rms.incCounters(SOURCE_SHIPPED_BATCHES, 1);
151     rms.incCounters(shippedOpsKey, batchSize);
152     rms.incCounters(SOURCE_SHIPPED_OPS, batchSize);
153     rms.incCounters(shippedKBsKey, sizeInKB);
154     rms.incCounters(SOURCE_SHIPPED_KBS, sizeInKB);
155   }
156 
157   /** increase the byte number read by source from log file */
158   public void incrLogReadInBytes(long readInBytes) {
159     rms.incCounters(logReadInBytesKey, readInBytes);
160     rms.incCounters(SOURCE_LOG_READ_IN_BYTES, readInBytes);
161   }
162 
163   /** Removes all metrics about this Source. */
164   public void clear() {
165     rms.removeMetric(sizeOfLogQueKey);
166     rms.decGauge(SOURCE_SIZE_OF_LOG_QUEUE, lastQueueSize);
167     lastQueueSize = 0;
168     rms.removeMetric(ageOfLastShippedOpKey);
169 
170     rms.removeMetric(logEditsFilteredKey);
171     rms.removeMetric(logEditsReadKey);
172 
173   }
174 }