View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
25  import org.apache.hadoop.hbase.util.HasThread;
26  
27  import java.util.ConcurrentModificationException;
28  import java.util.Iterator;
29  import java.util.Map;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Delayed;
32  import java.util.concurrent.TimeUnit;
33  
34  import java.io.IOException;
35  
36  /**
37   * Leases
38   *
39   * There are several server classes in HBase that need to track external
40   * clients that occasionally send heartbeats.
41   *
42   * <p>These external clients hold resources in the server class.
43   * Those resources need to be released if the external client fails to send a
44   * heartbeat after some interval of time passes.
45   *
46   * <p>The Leases class is a general reusable class for this kind of pattern.
47   * An instance of the Leases class will create a thread to do its dirty work.
48   * You should close() the instance if you want to clean up the thread properly.
49   *
50   * <p>
51   * NOTE: This class extends Thread rather than Chore because the sleep time
52   * can be interrupted when there is something to do, rather than the Chore
53   * sleep time which is invariant.
54   */
55  @InterfaceAudience.Private
56  public class Leases extends HasThread {
57    private static final Log LOG = LogFactory.getLog(Leases.class.getName());
58    public static final int MIN_WAIT_TIME = 100;
59    private final Map<String, Lease> leases = new ConcurrentHashMap<String, Lease>();
60  
61    protected final int leaseCheckFrequency;
62    protected volatile boolean stopRequested = false;
63  
64    /**
65     * Creates a lease monitor
66     * 
67     * @param leaseCheckFrequency - how often the lease should be checked
68     *          (milliseconds)
69     */
70    public Leases(final int leaseCheckFrequency) {
71      super("RegionServerLeases");  // thread name
72      this.leaseCheckFrequency = leaseCheckFrequency;
73      setDaemon(true);
74    }
75  
76    /**
77     * @see Thread#run()
78     */
79    @Override
80    public void run() {
81      long toWait = leaseCheckFrequency;
82      Lease nextLease = null;
83      long nextLeaseDelay = Long.MAX_VALUE;
84  
85      while (!stopRequested || (stopRequested && !leases.isEmpty()) ) {
86  
87        try {
88          if (nextLease != null) {
89            toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
90          }
91  
92          toWait = Math.min(leaseCheckFrequency, toWait);
93          toWait = Math.max(MIN_WAIT_TIME, toWait);
94  
95          Thread.sleep(toWait);
96        } catch (InterruptedException e) {
97          continue;
98        } catch (ConcurrentModificationException e) {
99          continue;
100       } catch (Throwable e) {
101         LOG.fatal("Unexpected exception killed leases thread", e);
102         break;
103       }
104 
105       nextLease = null;
106       nextLeaseDelay = Long.MAX_VALUE;
107       for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
108         Map.Entry<String, Lease> entry = it.next();
109         Lease lease = entry.getValue();
110         long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
111         if ( thisLeaseDelay > 0) {
112           if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
113             nextLease = lease;
114             nextLeaseDelay = thisLeaseDelay;
115           }
116         } else {
117           // A lease expired.  Run the expired code before removing from map
118           // since its presence in map is used to see if lease exists still.
119           if (lease.getListener() == null) {
120             LOG.error("lease listener is null for lease " + lease.getLeaseName());
121           } else {
122             lease.getListener().leaseExpired();
123           }
124           it.remove();
125         }
126       }
127     }
128     close();
129   }
130 
131   /**
132    * Shuts down this lease instance when all outstanding leases expire.
133    * Like {@link #close()} but rather than violently end all leases, waits
134    * first on extant leases to finish.  Use this method if the lease holders
135    * could lose data, leak locks, etc.  Presumes client has shutdown
136    * allocation of new leases.
137    */
138   public void closeAfterLeasesExpire() {
139     this.stopRequested = true;
140   }
141 
142   /**
143    * Shut down this Leases instance.  All pending leases will be destroyed,
144    * without any cancellation calls.
145    */
146   public void close() {
147     LOG.info(Thread.currentThread().getName() + " closing leases");
148     this.stopRequested = true;
149     leases.clear();
150     LOG.info(Thread.currentThread().getName() + " closed leases");
151   }
152 
153   /**
154    * Create a lease and insert it to the map of leases.
155    *
156    * @param leaseName name of the lease
157    * @param leaseTimeoutPeriod length of the lease in milliseconds
158    * @param listener listener that will process lease expirations
159    * @throws LeaseStillHeldException
160    */
161   public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
162       throws LeaseStillHeldException {
163     addLease(new Lease(leaseName, leaseTimeoutPeriod, listener));
164   }
165 
166   /**
167    * Inserts lease.  Resets expiration before insertion.
168    * @param lease
169    * @throws LeaseStillHeldException
170    */
171   public void addLease(final Lease lease) throws LeaseStillHeldException {
172     if (this.stopRequested) {
173       return;
174     }
175     if (leases.containsKey(lease.getLeaseName())) {
176       throw new LeaseStillHeldException(lease.getLeaseName());
177     }
178     lease.resetExpirationTime();
179     leases.put(lease.getLeaseName(), lease);
180   }
181 
182   /**
183    * Renew a lease
184    *
185    * @param leaseName name of lease
186    * @throws LeaseException
187    */
188   public void renewLease(final String leaseName) throws LeaseException {
189     if (this.stopRequested) {
190       return;
191     }
192     Lease lease = leases.get(leaseName);
193 
194     if (lease == null ) {
195       throw new LeaseException("lease '" + leaseName +
196           "' does not exist or has already expired");
197     }
198     lease.resetExpirationTime();
199   }
200 
201   /**
202    * Client explicitly cancels a lease.
203    * @param leaseName name of lease
204    * @throws org.apache.hadoop.hbase.regionserver.LeaseException
205    */
206   public void cancelLease(final String leaseName) throws LeaseException {
207     removeLease(leaseName);
208   }
209 
210   /**
211    * Remove named lease.
212    * Lease is removed from the map of leases.
213    * Lease can be reinserted using {@link #addLease(Lease)}
214    *
215    * @param leaseName name of lease
216    * @throws org.apache.hadoop.hbase.regionserver.LeaseException
217    * @return Removed lease
218    */
219   Lease removeLease(final String leaseName) throws LeaseException {
220     Lease lease = leases.remove(leaseName);
221     if (lease == null) {
222       throw new LeaseException("lease '" + leaseName + "' does not exist");
223     }
224     return lease;
225   }
226 
227   /**
228    * Thrown if we are asked to create a lease but lease on passed name already
229    * exists.
230    */
231   @SuppressWarnings("serial")
232   public static class LeaseStillHeldException extends IOException {
233     private final String leaseName;
234 
235     /**
236      * @param name
237      */
238     public LeaseStillHeldException(final String name) {
239       this.leaseName = name;
240     }
241 
242     /** @return name of lease */
243     public String getName() {
244       return this.leaseName;
245     }
246   }
247 
248   /** This class tracks a single Lease. */
249   static class Lease implements Delayed {
250     private final String leaseName;
251     private final LeaseListener listener;
252     private int leaseTimeoutPeriod;
253     private long expirationTime;
254 
255     Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) {
256       this.leaseName = leaseName;
257       this.listener = listener;
258       this.leaseTimeoutPeriod = leaseTimeoutPeriod;
259       this.expirationTime = 0;
260     }
261 
262     /** @return the lease name */
263     public String getLeaseName() {
264       return leaseName;
265     }
266 
267     /** @return listener */
268     public LeaseListener getListener() {
269       return this.listener;
270     }
271 
272     @Override
273     public boolean equals(Object obj) {
274       if (this == obj) {
275         return true;
276       }
277       if (obj == null) {
278         return false;
279       }
280       if (getClass() != obj.getClass()) {
281         return false;
282       }
283       return this.hashCode() == obj.hashCode();
284     }
285 
286     @Override
287     public int hashCode() {
288       return this.leaseName.hashCode();
289     }
290 
291     public long getDelay(TimeUnit unit) {
292       return unit.convert(this.expirationTime - EnvironmentEdgeManager.currentTime(),
293           TimeUnit.MILLISECONDS);
294     }
295 
296     public int compareTo(Delayed o) {
297       long delta = this.getDelay(TimeUnit.MILLISECONDS) -
298         o.getDelay(TimeUnit.MILLISECONDS);
299 
300       return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
301     }
302 
303     /**
304      * Resets the expiration time of the lease.
305      */
306     public void resetExpirationTime() {
307       this.expirationTime = EnvironmentEdgeManager.currentTime() + this.leaseTimeoutPeriod;
308     }
309   }
310 }