View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.util.HashMap;
22  import java.util.Map;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
28  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
29  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
30  
31  /**
32   * This class is for maintaining the various replication statistics for a source and publishing them
33   * through the metrics interfaces.
34   */
35  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
36  public class MetricsSource {
37  
38    private static final Log LOG = LogFactory.getLog(MetricsSource.class);
39  
40    // tracks last shipped timestamp for each wal group
41    private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
42    private int lastQueueSize = 0;
43    private long lastHFileRefsQueueSize = 0;
44    private String id;
45  
46    private final MetricsReplicationSourceSource singleSourceSource;
47    private final MetricsReplicationSourceSource globalSourceSource;
48  
49  
50    /**
51     * Constructor used to register the metrics
52     *
53     * @param id Name of the source this class is monitoring
54     */
55    public MetricsSource(String id) {
56      this.id = id;
57      singleSourceSource =
58          CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
59              .getSource(id);
60      globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
61    }
62  
63    /**
64     * Set the age of the last edit that was shipped
65     * @param timestamp write time of the edit
66     * @param walGroup which group we are setting
67     */
68    public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
69      long age = EnvironmentEdgeManager.currentTime() - timestamp;
70      singleSourceSource.setLastShippedAge(age);
71      globalSourceSource.setLastShippedAge(age);
72      this.lastTimeStamps.put(walGroup, timestamp);
73    }
74  
75    /**
76     * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
77     * when replication fails and need to keep that metric accurate.
78     * @param walGroupId id of the group to update
79     */
80    public void refreshAgeOfLastShippedOp(String walGroupId) {
81      Long lastTimestamp = this.lastTimeStamps.get(walGroupId);
82      if (lastTimestamp == null) {
83        this.lastTimeStamps.put(walGroupId, 0L);
84        lastTimestamp = 0L;
85      }
86      if (lastTimestamp > 0) {
87        setAgeOfLastShippedOp(lastTimestamp, walGroupId);
88      }
89    }
90  
91    /**
92     * Increment size of the log queue.
93     */
94    public void incrSizeOfLogQueue() {
95      singleSourceSource.incrSizeOfLogQueue(1);
96      globalSourceSource.incrSizeOfLogQueue(1);
97    }
98  
99    public void decrSizeOfLogQueue() {
100     singleSourceSource.decrSizeOfLogQueue(1);
101     globalSourceSource.decrSizeOfLogQueue(1);
102   }
103 
104   /**
105    * Add on the the number of log edits read
106    *
107    * @param delta the number of log edits read.
108    */
109   private void incrLogEditsRead(long delta) {
110     singleSourceSource.incrLogReadInEdits(delta);
111     globalSourceSource.incrLogReadInEdits(delta);
112   }
113 
114   /** Increment the number of log edits read by one. */
115   public void incrLogEditsRead() {
116     incrLogEditsRead(1);
117   }
118 
119   /**
120    * Add on the number of log edits filtered
121    *
122    * @param delta the number filtered.
123    */
124   public void incrLogEditsFiltered(long delta) {
125     singleSourceSource.incrLogEditsFiltered(delta);
126     globalSourceSource.incrLogEditsFiltered(delta);
127   }
128 
129   /** The number of log edits filtered out. */
130   public void incrLogEditsFiltered() {
131     incrLogEditsFiltered(1);
132   }
133 
134   /**
135    * Convience method to apply changes to metrics do to shipping a batch of logs.
136    *
137    * @param batchSize the size of the batch that was shipped to sinks.
138    */
139   public void shipBatch(long batchSize, int sizeInKB) {
140     singleSourceSource.incrBatchesShipped(1);
141     globalSourceSource.incrBatchesShipped(1);
142 
143     singleSourceSource.incrOpsShipped(batchSize);
144     globalSourceSource.incrOpsShipped(batchSize);
145 
146     singleSourceSource.incrShippedKBs(sizeInKB);
147     globalSourceSource.incrShippedKBs(sizeInKB);
148   }
149 
150   /**
151    * Convience method to apply changes to metrics do to shipping a batch of logs.
152    *
153    * @param batchSize the size of the batch that was shipped to sinks.
154    * @param hfiles total number of hfiles shipped to sinks.
155    */
156   public void shipBatch(long batchSize, int sizeInKB, long hfiles) {
157     shipBatch(batchSize, sizeInKB);
158     singleSourceSource.incrHFilesShipped(hfiles);
159     globalSourceSource.incrHFilesShipped(hfiles);
160   }
161 
162   /** increase the byte number read by source from log file */
163   public void incrLogReadInBytes(long readInBytes) {
164     singleSourceSource.incrLogReadInBytes(readInBytes);
165     globalSourceSource.incrLogReadInBytes(readInBytes);
166   }
167 
168   /** Removes all metrics about this Source. */
169   public void clear() {
170     singleSourceSource.clear();
171     globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
172     globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
173     lastTimeStamps.clear();
174     lastQueueSize = 0;
175     lastHFileRefsQueueSize = 0;
176   }
177 
178   /**
179    * Get AgeOfLastShippedOp
180    * @return AgeOfLastShippedOp
181    */
182   public Long getAgeOfLastShippedOp() {
183     return singleSourceSource.getLastShippedAge();
184   }
185 
186   /**
187    * Get the sizeOfLogQueue
188    * @return sizeOfLogQueue
189    */
190   public int getSizeOfLogQueue() {
191     return singleSourceSource.getSizeOfLogQueue();
192   }
193 
194   /**
195    * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one
196    * @return lastTimestampForAge
197    */
198   public long getTimeStampOfLastShippedOp() {
199     long lastTimestamp = 0L;
200     for (long ts : lastTimeStamps.values()) {
201       if (ts > lastTimestamp) {
202         lastTimestamp = ts;
203       }
204     }
205     return lastTimestamp;
206   }
207 
208   /**
209    * Get the slave peer ID
210    * @return peerID
211    */
212   public String getPeerID() {
213     return id;
214   }
215 
216   public void incrSizeOfHFileRefsQueue(long size) {
217     singleSourceSource.incrSizeOfHFileRefsQueue(size);
218     globalSourceSource.incrSizeOfHFileRefsQueue(size);
219     lastHFileRefsQueueSize = size;
220   }
221 
222   public void decrSizeOfHFileRefsQueue(int size) {
223     singleSourceSource.decrSizeOfHFileRefsQueue(size);
224     globalSourceSource.decrSizeOfHFileRefsQueue(size);
225     lastHFileRefsQueueSize -= size;
226     if (lastHFileRefsQueueSize < 0) {
227       lastHFileRefsQueueSize = 0;
228     }
229   }
230 }