View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
25  import org.apache.hadoop.hbase.util.HasThread;
26  
27  import java.util.ConcurrentModificationException;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.Map;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.Delayed;
33  import java.util.concurrent.DelayQueue;
34  import java.util.concurrent.TimeUnit;
35  
36  import java.io.IOException;
37  
38  /**
39   * Leases
40   *
41   * There are several server classes in HBase that need to track external
42   * clients that occasionally send heartbeats.
43   *
44   * <p>These external clients hold resources in the server class.
45   * Those resources need to be released if the external client fails to send a
46   * heartbeat after some interval of time passes.
47   *
48   * <p>The Leases class is a general reusable class for this kind of pattern.
49   * An instance of the Leases class will create a thread to do its dirty work.
50   * You should close() the instance if you want to clean up the thread properly.
51   *
52   * <p>
53   * NOTE: This class extends Thread rather than Chore because the sleep time
54   * can be interrupted when there is something to do, rather than the Chore
55   * sleep time which is invariant.
56   */
57  @InterfaceAudience.Private
58  public class Leases extends HasThread {
59    private static final Log LOG = LogFactory.getLog(Leases.class.getName());
60    public static final int MIN_WAIT_TIME = 100;
61    private final Map<String, Lease> leases = new ConcurrentHashMap<String, Lease>();
62  
63    protected final int leaseCheckFrequency;
64    protected volatile boolean stopRequested = false;
65  
66    /**
67     * Creates a lease monitor
68     * 
69     * @param leaseCheckFrequency - how often the lease should be checked
70     *          (milliseconds)
71     */
72    public Leases(final int leaseCheckFrequency) {
73      this.leaseCheckFrequency = leaseCheckFrequency;
74      setDaemon(true);
75    }
76  
77    /**
78     * @see Thread#run()
79     */
80    @Override
81    public void run() {
82      long toWait = leaseCheckFrequency;
83      Lease nextLease = null;
84      long nextLeaseDelay = Long.MAX_VALUE;
85  
86      while (!stopRequested || (stopRequested && !leases.isEmpty()) ) {
87  
88        try {
89          if (nextLease != null) {
90            toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
91          }
92  
93          toWait = Math.min(leaseCheckFrequency, toWait);
94          toWait = Math.max(MIN_WAIT_TIME, toWait);
95  
96          Thread.sleep(toWait);
97        } catch (InterruptedException e) {
98          continue;
99        } catch (ConcurrentModificationException e) {
100         continue;
101       } catch (Throwable e) {
102         LOG.fatal("Unexpected exception killed leases thread", e);
103         break;
104       }
105 
106       nextLease = null;
107       nextLeaseDelay = Long.MAX_VALUE;
108       for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
109         Map.Entry<String, Lease> entry = it.next();
110         Lease lease = entry.getValue();
111         long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
112         if ( thisLeaseDelay > 0) {
113           if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
114             nextLease = lease;
115             nextLeaseDelay = thisLeaseDelay;
116           }
117         } else {
118           // A lease expired.  Run the expired code before removing from map
119           // since its presence in map is used to see if lease exists still.
120           if (lease.getListener() == null) {
121             LOG.error("lease listener is null for lease " + lease.getLeaseName());
122           } else {
123             lease.getListener().leaseExpired();
124           }
125           it.remove();
126         }
127       }
128     }
129     close();
130   }
131 
132   /**
133    * Shuts down this lease instance when all outstanding leases expire.
134    * Like {@link #close()} but rather than violently end all leases, waits
135    * first on extant leases to finish.  Use this method if the lease holders
136    * could lose data, leak locks, etc.  Presumes client has shutdown
137    * allocation of new leases.
138    */
139   public void closeAfterLeasesExpire() {
140     this.stopRequested = true;
141   }
142 
143   /**
144    * Shut down this Leases instance.  All pending leases will be destroyed,
145    * without any cancellation calls.
146    */
147   public void close() {
148     LOG.info(Thread.currentThread().getName() + " closing leases");
149     this.stopRequested = true;
150     leases.clear();
151     LOG.info(Thread.currentThread().getName() + " closed leases");
152   }
153 
154   /**
155    * Create a lease and insert it to the map of leases.
156    *
157    * @param leaseName name of the lease
158    * @param leaseTimeoutPeriod length of the lease in milliseconds
159    * @param listener listener that will process lease expirations
160    * @throws LeaseStillHeldException
161    */
162   public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
163       throws LeaseStillHeldException {
164     addLease(new Lease(leaseName, leaseTimeoutPeriod, listener));
165   }
166 
167   /**
168    * Inserts lease.  Resets expiration before insertion.
169    * @param lease
170    * @throws LeaseStillHeldException
171    */
172   public void addLease(final Lease lease) throws LeaseStillHeldException {
173     if (this.stopRequested) {
174       return;
175     }
176     if (leases.containsKey(lease.getLeaseName())) {
177       throw new LeaseStillHeldException(lease.getLeaseName());
178     }
179     lease.resetExpirationTime();
180     leases.put(lease.getLeaseName(), lease);
181   }
182 
183   /**
184    * Renew a lease
185    *
186    * @param leaseName name of lease
187    * @throws LeaseException
188    */
189   public void renewLease(final String leaseName) throws LeaseException {
190     if (this.stopRequested) {
191       return;
192     }
193     Lease lease = leases.get(leaseName);
194 
195     if (lease == null ) {
196       throw new LeaseException("lease '" + leaseName +
197           "' does not exist or has already expired");
198     }
199     lease.resetExpirationTime();
200   }
201 
202   /**
203    * Client explicitly cancels a lease.
204    * @param leaseName name of lease
205    * @throws org.apache.hadoop.hbase.regionserver.LeaseException
206    */
207   public void cancelLease(final String leaseName) throws LeaseException {
208     removeLease(leaseName);
209   }
210 
211   /**
212    * Remove named lease.
213    * Lease is removed from the map of leases.
214    * Lease can be reinserted using {@link #addLease(Lease)}
215    *
216    * @param leaseName name of lease
217    * @throws org.apache.hadoop.hbase.regionserver.LeaseException
218    * @return Removed lease
219    */
220   Lease removeLease(final String leaseName) throws LeaseException {
221     Lease lease = leases.remove(leaseName);
222     if (lease == null) {
223       throw new LeaseException("lease '" + leaseName + "' does not exist");
224     }
225     return lease;
226   }
227 
228   /**
229    * Thrown if we are asked to create a lease but lease on passed name already
230    * exists.
231    */
232   @SuppressWarnings("serial")
233   public static class LeaseStillHeldException extends IOException {
234     private final String leaseName;
235 
236     /**
237      * @param name
238      */
239     public LeaseStillHeldException(final String name) {
240       this.leaseName = name;
241     }
242 
243     /** @return name of lease */
244     public String getName() {
245       return this.leaseName;
246     }
247   }
248 
249   /** This class tracks a single Lease. */
250   static class Lease implements Delayed {
251     private final String leaseName;
252     private final LeaseListener listener;
253     private int leaseTimeoutPeriod;
254     private long expirationTime;
255 
256     Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) {
257       this.leaseName = leaseName;
258       this.listener = listener;
259       this.leaseTimeoutPeriod = leaseTimeoutPeriod;
260       this.expirationTime = 0;
261     }
262 
263     /** @return the lease name */
264     public String getLeaseName() {
265       return leaseName;
266     }
267 
268     /** @return listener */
269     public LeaseListener getListener() {
270       return this.listener;
271     }
272 
273     @Override
274     public boolean equals(Object obj) {
275       if (this == obj) {
276         return true;
277       }
278       if (obj == null) {
279         return false;
280       }
281       if (getClass() != obj.getClass()) {
282         return false;
283       }
284       return this.hashCode() == obj.hashCode();
285     }
286 
287     @Override
288     public int hashCode() {
289       return this.leaseName.hashCode();
290     }
291 
292     public long getDelay(TimeUnit unit) {
293       return unit.convert(this.expirationTime - EnvironmentEdgeManager.currentTimeMillis(),
294           TimeUnit.MILLISECONDS);
295     }
296 
297     public int compareTo(Delayed o) {
298       long delta = this.getDelay(TimeUnit.MILLISECONDS) -
299         o.getDelay(TimeUnit.MILLISECONDS);
300 
301       return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
302     }
303 
304     /**
305      * Resets the expiration time of the lease.
306      */
307     public void resetExpirationTime() {
308       this.expirationTime = EnvironmentEdgeManager.currentTimeMillis() + this.leaseTimeoutPeriod;
309     }
310   }
311 }