View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
25  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
26  
27  /**
28   * This class is for maintaining the various replication statistics for a source and publishing them
29   * through the metrics interfaces.
30   */
31  @InterfaceAudience.Private
32  public class MetricsSource {
33  
34    public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
35    public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
36    public static final String SOURCE_LOG_EDITS_READ = "source.logEditsRead";
37    public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered";
38    public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
39    public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
40    public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes";
41  
42    public static final Log LOG = LogFactory.getLog(MetricsSource.class);
43    private String id;
44  
45    private long lastTimestamp = 0;
46    private int lastQueueSize = 0;
47  
48    private String sizeOfLogQueKey;
49    private String ageOfLastShippedOpKey;
50    private String logEditsReadKey;
51    private String logEditsFilteredKey;
52    private final String shippedBatchesKey;
53    private final String shippedOpsKey;
54    private final String logReadInBytesKey;
55  
56    private MetricsReplicationSource rms;
57  
58    /**
59     * Constructor used to register the metrics
60     *
61     * @param id Name of the source this class is monitoring
62     */
63    public MetricsSource(String id) {
64      this.id = id;
65  
66      sizeOfLogQueKey = "source." + id + ".sizeOfLogQueue";
67      ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
68      logEditsReadKey = "source." + id + ".logEditsRead";
69      logEditsFilteredKey = "source." + id + ".logEditsFiltered";
70      shippedBatchesKey = "source." + this.id + ".shippedBatches";
71      shippedOpsKey = "source." + this.id + ".shippedOps";
72      logReadInBytesKey = "source." + this.id + ".logReadInBytes";
73      rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class);
74    }
75  
76    /**
77     * Set the age of the last edit that was shipped
78     *
79     * @param timestamp write time of the edit
80     */
81    public void setAgeOfLastShippedOp(long timestamp) {
82      long age = EnvironmentEdgeManager.currentTimeMillis() - timestamp;
83      rms.setGauge(ageOfLastShippedOpKey, age);
84      rms.setGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, age);
85      this.lastTimestamp = timestamp;
86    }
87  
88    /**
89     * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
90     * when replication fails and need to keep that metric accurate.
91     */
92    public void refreshAgeOfLastShippedOp() {
93      if (this.lastTimestamp > 0) {
94        setAgeOfLastShippedOp(this.lastTimestamp);
95      }
96    }
97  
98    /**
99     * Set the size of the log queue
100    *
101    * @param size the size.
102    */
103   public void setSizeOfLogQueue(int size) {
104     rms.setGauge(sizeOfLogQueKey, size);
105     rms.incGauge(SOURCE_SIZE_OF_LOG_QUEUE, size - lastQueueSize);
106     lastQueueSize = size;
107   }
108 
109   /**
110    * Add on the the number of log edits read
111    *
112    * @param delta the number of log edits read.
113    */
114   private void incrLogEditsRead(long delta) {
115     rms.incCounters(logEditsReadKey, delta);
116     rms.incCounters(SOURCE_LOG_EDITS_READ, delta);
117   }
118 
119   /** Increment the number of log edits read by one. */
120   public void incrLogEditsRead() {
121     incrLogEditsRead(1);
122   }
123 
124   /**
125    * Add on the number of log edits filtered
126    *
127    * @param delta the number filtered.
128    */
129   private void incrLogEditsFiltered(long delta) {
130     rms.incCounters(logEditsFilteredKey, delta);
131     rms.incCounters(SOURCE_LOG_EDITS_FILTERED, delta);
132   }
133 
134   /** The number of log edits filtered out. */
135   public void incrLogEditsFiltered() {
136     incrLogEditsFiltered(1);
137   }
138 
139   /**
140    * Convience method to apply changes to metrics do to shipping a batch of logs.
141    *
142    * @param batchSize the size of the batch that was shipped to sinks.
143    */
144   public void shipBatch(long batchSize) {
145     rms.incCounters(shippedBatchesKey, 1);
146     rms.incCounters(SOURCE_SHIPPED_BATCHES, 1);
147     rms.incCounters(shippedOpsKey, batchSize);
148     rms.incCounters(SOURCE_SHIPPED_OPS, batchSize);
149   }
150   
151   /** increase the byte number read by source from log file */
152   public void incrLogReadInBytes(long readInBytes) {
153     rms.incCounters(logReadInBytesKey, readInBytes);
154     rms.incCounters(SOURCE_LOG_READ_IN_BYTES, readInBytes);
155   }
156 
157   /** Removes all metrics about this Source. */
158   public void clear() {
159     rms.removeMetric(sizeOfLogQueKey);
160     rms.decGauge(SOURCE_SIZE_OF_LOG_QUEUE, lastQueueSize);
161     lastQueueSize = 0;
162     rms.removeMetric(ageOfLastShippedOpKey);
163 
164     rms.removeMetric(logEditsFilteredKey);
165     rms.removeMetric(logEditsReadKey);
166 
167   }
168 }