View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.util.HashMap;
22  import java.util.Map;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
28  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
29  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
30  
31  /**
32   * This class is for maintaining the various replication statistics for a source and publishing them
33   * through the metrics interfaces.
34   */
35  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
36  public class MetricsSource {
37  
38    private static final Log LOG = LogFactory.getLog(MetricsSource.class);
39  
40    // tracks last shipped timestamp for each wal group
41    private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
42    private int lastQueueSize = 0;
43    private String id;
44  
45    private final MetricsReplicationSourceSource singleSourceSource;
46    private final MetricsReplicationSourceSource globalSourceSource;
47  
48    /**
49     * Constructor used to register the metrics
50     *
51     * @param id Name of the source this class is monitoring
52     */
53    public MetricsSource(String id) {
54      this.id = id;
55      singleSourceSource =
56          CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
57              .getSource(id);
58      globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
59    }
60  
61    /**
62     * Set the age of the last edit that was shipped
63     * @param timestamp write time of the edit
64     * @param walGroup which group we are setting
65     */
66    public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
67      long age = EnvironmentEdgeManager.currentTime() - timestamp;
68      singleSourceSource.setLastShippedAge(age);
69      globalSourceSource.setLastShippedAge(Math.max(age, globalSourceSource.getLastShippedAge()));
70      this.lastTimeStamps.put(walGroup, timestamp);
71    }
72  
73    /**
74     * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
75     * when replication fails and need to keep that metric accurate.
76     * @param walGroupId id of the group to update
77     */
78    public void refreshAgeOfLastShippedOp(String walGroupId) {
79      Long lastTimestamp = this.lastTimeStamps.get(walGroupId);
80      if (lastTimestamp == null) {
81        this.lastTimeStamps.put(walGroupId, 0L);
82        lastTimestamp = 0L;
83      }
84      if (lastTimestamp > 0) {
85        setAgeOfLastShippedOp(lastTimestamp, walGroupId);
86      }
87    }
88  
89    /**
90     * Set the size of the log queue
91     *
92     * @param size the size.
93     */
94    public void setSizeOfLogQueue(int size) {
95      singleSourceSource.setSizeOfLogQueue(size);
96      globalSourceSource.incrSizeOfLogQueue(size - lastQueueSize);
97      lastQueueSize = size;
98    }
99  
100   /**
101    * Add on the the number of log edits read
102    *
103    * @param delta the number of log edits read.
104    */
105   private void incrLogEditsRead(long delta) {
106     singleSourceSource.incrLogReadInEdits(delta);
107     globalSourceSource.incrLogReadInEdits(delta);
108   }
109 
110   /** Increment the number of log edits read by one. */
111   public void incrLogEditsRead() {
112     incrLogEditsRead(1);
113   }
114 
115   /**
116    * Add on the number of log edits filtered
117    *
118    * @param delta the number filtered.
119    */
120   public void incrLogEditsFiltered(long delta) {
121     singleSourceSource.incrLogEditsFiltered(delta);
122     globalSourceSource.incrLogEditsFiltered(delta);
123   }
124 
125   /** The number of log edits filtered out. */
126   public void incrLogEditsFiltered() {
127     incrLogEditsFiltered(1);
128   }
129 
130   /**
131    * Convience method to apply changes to metrics do to shipping a batch of logs.
132    *
133    * @param batchSize the size of the batch that was shipped to sinks.
134    */
135   public void shipBatch(long batchSize, int sizeInKB) {
136     singleSourceSource.incrBatchesShipped(1);
137     globalSourceSource.incrBatchesShipped(1);
138 
139     singleSourceSource.incrOpsShipped(batchSize);
140     globalSourceSource.incrOpsShipped(batchSize);
141 
142     singleSourceSource.incrShippedKBs(sizeInKB);
143     globalSourceSource.incrShippedKBs(sizeInKB);
144   }
145 
146   /** increase the byte number read by source from log file */
147   public void incrLogReadInBytes(long readInBytes) {
148     singleSourceSource.incrLogReadInBytes(readInBytes);
149     globalSourceSource.incrLogReadInBytes(readInBytes);
150   }
151 
152   /** Removes all metrics about this Source. */
153   public void clear() {
154     singleSourceSource.clear();
155     globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
156     lastTimeStamps.clear();
157     lastQueueSize = 0;
158   }
159 
160   /**
161    * Get AgeOfLastShippedOp
162    * @return AgeOfLastShippedOp
163    */
164   public Long getAgeOfLastShippedOp() {
165     return singleSourceSource.getLastShippedAge();
166   }
167 
168   /**
169    * Get the sizeOfLogQueue
170    * @return sizeOfLogQueue
171    */
172   public int getSizeOfLogQueue() {
173     return this.lastQueueSize;
174   }
175 
176   /**
177    * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one
178    * @return lastTimestampForAge
179    */
180   public long getTimeStampOfLastShippedOp() {
181     long lastTimestamp = 0L;
182     for (long ts : lastTimeStamps.values()) {
183       if (ts > lastTimestamp) {
184         lastTimestamp = ts;
185       }
186     }
187     return lastTimestamp;
188   }
189 
190   /**
191    * Get the slave peer ID
192    * @return peerID
193    */
194   public String getPeerID() {
195     return id;
196   }
197 
198   public void incrUnknownFileLengthForClosedWAL() {
199     singleSourceSource.incrUnknownFileLengthForClosedWAL();
200     globalSourceSource.incrUnknownFileLengthForClosedWAL();
201   }
202 
203   public void incrUncleanlyClosedWALs() {
204     singleSourceSource.incrUncleanlyClosedWALs();
205     globalSourceSource.incrUncleanlyClosedWALs();
206   }
207 
208   public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
209     singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
210     globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
211   }
212 
213   public void incrRestartedWALReading() {
214     singleSourceSource.incrRestartedWALReading();
215     globalSourceSource.incrRestartedWALReading();
216   }
217 
218   public void incrRepeatedFileBytes(final long bytes) {
219     singleSourceSource.incrRepeatedFileBytes(bytes);
220     globalSourceSource.incrRepeatedFileBytes(bytes);
221   }
222 
223   public void incrCompletedWAL() {
224     singleSourceSource.incrCompletedWAL();
225     globalSourceSource.incrCompletedWAL();
226   }
227 
228   public void incrCompletedRecoveryQueue() {
229     singleSourceSource.incrCompletedRecoveryQueue();
230     globalSourceSource.incrCompletedRecoveryQueue();
231   }
232 }