View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.util.HashMap;
22  import java.util.Map;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
28  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
29  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
30  
31  /**
32   * This class is for maintaining the various replication statistics for a source and publishing them
33   * through the metrics interfaces.
34   */
35  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
36  public class MetricsSource {
37  
38    private static final Log LOG = LogFactory.getLog(MetricsSource.class);
39  
40    // tracks last shipped timestamp for each wal group
41    private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
42    private int lastQueueSize = 0;
43    private long lastHFileRefsQueueSize = 0;
44    private String id;
45  
46    private final MetricsReplicationSourceSource singleSourceSource;
47    private final MetricsReplicationSourceSource globalSourceSource;
48  
49  
50    /**
51     * Constructor used to register the metrics
52     *
53     * @param id Name of the source this class is monitoring
54     */
55    public MetricsSource(String id) {
56      this.id = id;
57      singleSourceSource =
58          CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
59              .getSource(id);
60      globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
61    }
62  
63    /**
64     * Set the age of the last edit that was shipped
65     * @param timestamp write time of the edit
66     * @param walGroup which group we are setting
67     */
68    public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
69      long age = EnvironmentEdgeManager.currentTime() - timestamp;
70      singleSourceSource.setLastShippedAge(age);
71      globalSourceSource.setLastShippedAge(age);
72      this.lastTimeStamps.put(walGroup, timestamp);
73    }
74  
75    /**
76     * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
77     * when replication fails and need to keep that metric accurate.
78     * @param walGroupId id of the group to update
79     */
80    public void refreshAgeOfLastShippedOp(String walGroupId) {
81      Long lastTimestamp = this.lastTimeStamps.get(walGroupId);
82      if (lastTimestamp == null) {
83        this.lastTimeStamps.put(walGroupId, 0L);
84        lastTimestamp = 0L;
85      }
86      if (lastTimestamp > 0) {
87        setAgeOfLastShippedOp(lastTimestamp, walGroupId);
88      }
89    }
90  
91    /**
92     * Set the size of the log queue
93     *
94     * @param size the size.
95     */
96    public void setSizeOfLogQueue(int size) {
97      singleSourceSource.setSizeOfLogQueue(size);
98      globalSourceSource.incrSizeOfLogQueue(size - lastQueueSize);
99      lastQueueSize = size;
100   }
101 
102   /**
103    * Add on the the number of log edits read
104    *
105    * @param delta the number of log edits read.
106    */
107   private void incrLogEditsRead(long delta) {
108     singleSourceSource.incrLogReadInEdits(delta);
109     globalSourceSource.incrLogReadInEdits(delta);
110   }
111 
112   /** Increment the number of log edits read by one. */
113   public void incrLogEditsRead() {
114     incrLogEditsRead(1);
115   }
116 
117   /**
118    * Add on the number of log edits filtered
119    *
120    * @param delta the number filtered.
121    */
122   public void incrLogEditsFiltered(long delta) {
123     singleSourceSource.incrLogEditsFiltered(delta);
124     globalSourceSource.incrLogEditsFiltered(delta);
125   }
126 
127   /** The number of log edits filtered out. */
128   public void incrLogEditsFiltered() {
129     incrLogEditsFiltered(1);
130   }
131 
132   /**
133    * Convience method to apply changes to metrics do to shipping a batch of logs.
134    *
135    * @param batchSize the size of the batch that was shipped to sinks.
136    */
137   public void shipBatch(long batchSize, int sizeInKB) {
138     singleSourceSource.incrBatchesShipped(1);
139     globalSourceSource.incrBatchesShipped(1);
140 
141     singleSourceSource.incrOpsShipped(batchSize);
142     globalSourceSource.incrOpsShipped(batchSize);
143 
144     singleSourceSource.incrShippedKBs(sizeInKB);
145     globalSourceSource.incrShippedKBs(sizeInKB);
146   }
147 
148   /**
149    * Convience method to apply changes to metrics do to shipping a batch of logs.
150    *
151    * @param batchSize the size of the batch that was shipped to sinks.
152    * @param hfiles total number of hfiles shipped to sinks.
153    */
154   public void shipBatch(long batchSize, int sizeInKB, long hfiles) {
155     shipBatch(batchSize, sizeInKB);
156     singleSourceSource.incrHFilesShipped(hfiles);
157     globalSourceSource.incrHFilesShipped(hfiles);
158   }
159 
160   /** increase the byte number read by source from log file */
161   public void incrLogReadInBytes(long readInBytes) {
162     singleSourceSource.incrLogReadInBytes(readInBytes);
163     globalSourceSource.incrLogReadInBytes(readInBytes);
164   }
165 
166   /** Removes all metrics about this Source. */
167   public void clear() {
168     singleSourceSource.clear();
169     globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
170     globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
171     lastTimeStamps.clear();
172     lastQueueSize = 0;
173     lastHFileRefsQueueSize = 0;
174   }
175 
176   /**
177    * Get AgeOfLastShippedOp
178    * @return AgeOfLastShippedOp
179    */
180   public Long getAgeOfLastShippedOp() {
181     return singleSourceSource.getLastShippedAge();
182   }
183 
184   /**
185    * Get the sizeOfLogQueue
186    * @return sizeOfLogQueue
187    */
188   public int getSizeOfLogQueue() {
189     return this.lastQueueSize;
190   }
191 
192   /**
193    * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one
194    * @return lastTimestampForAge
195    */
196   public long getTimeStampOfLastShippedOp() {
197     long lastTimestamp = 0L;
198     for (long ts : lastTimeStamps.values()) {
199       if (ts > lastTimestamp) {
200         lastTimestamp = ts;
201       }
202     }
203     return lastTimestamp;
204   }
205 
206   /**
207    * Get the slave peer ID
208    * @return peerID
209    */
210   public String getPeerID() {
211     return id;
212   }
213 
214   public void incrSizeOfHFileRefsQueue(long size) {
215     singleSourceSource.incrSizeOfHFileRefsQueue(size);
216     globalSourceSource.incrSizeOfHFileRefsQueue(size);
217     lastHFileRefsQueueSize = size;
218   }
219 
220   public void decrSizeOfHFileRefsQueue(int size) {
221     singleSourceSource.decrSizeOfHFileRefsQueue(size);
222     globalSourceSource.decrSizeOfHFileRefsQueue(size);
223     lastHFileRefsQueueSize -= size;
224     if (lastHFileRefsQueueSize < 0) {
225       lastHFileRefsQueueSize = 0;
226     }
227   }
228 }