View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
25  import org.apache.hadoop.hbase.util.HasThread;
26  
27  import java.util.ConcurrentModificationException;
28  import java.util.Iterator;
29  import java.util.Map;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Delayed;
32  import java.util.concurrent.TimeUnit;
33  
34  import java.io.IOException;
35  
36  /**
37   * Leases
38   *
39   * There are several server classes in HBase that need to track external
40   * clients that occasionally send heartbeats.
41   *
42   * <p>These external clients hold resources in the server class.
43   * Those resources need to be released if the external client fails to send a
44   * heartbeat after some interval of time passes.
45   *
46   * <p>The Leases class is a general reusable class for this kind of pattern.
47   * An instance of the Leases class will create a thread to do its dirty work.
48   * You should close() the instance if you want to clean up the thread properly.
49   *
50   * <p>
51   * NOTE: This class extends Thread rather than Chore because the sleep time
52   * can be interrupted when there is something to do, rather than the Chore
53   * sleep time which is invariant.
54   */
55  @InterfaceAudience.Private
56  public class Leases extends HasThread {
57    private static final Log LOG = LogFactory.getLog(Leases.class.getName());
58    public static final int MIN_WAIT_TIME = 100;
59    private final Map<String, Lease> leases = new ConcurrentHashMap<String, Lease>();
60  
61    protected final int leaseCheckFrequency;
62    protected volatile boolean stopRequested = false;
63  
64    /**
65     * Creates a lease monitor
66     * 
67     * @param leaseCheckFrequency - how often the lease should be checked
68     *          (milliseconds)
69     */
70    public Leases(final int leaseCheckFrequency) {
71      this.leaseCheckFrequency = leaseCheckFrequency;
72      setDaemon(true);
73    }
74  
75    /**
76     * @see Thread#run()
77     */
78    @Override
79    public void run() {
80      long toWait = leaseCheckFrequency;
81      Lease nextLease = null;
82      long nextLeaseDelay = Long.MAX_VALUE;
83  
84      while (!stopRequested || (stopRequested && !leases.isEmpty()) ) {
85  
86        try {
87          if (nextLease != null) {
88            toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
89          }
90  
91          toWait = Math.min(leaseCheckFrequency, toWait);
92          toWait = Math.max(MIN_WAIT_TIME, toWait);
93  
94          Thread.sleep(toWait);
95        } catch (InterruptedException e) {
96          continue;
97        } catch (ConcurrentModificationException e) {
98          continue;
99        } catch (Throwable e) {
100         LOG.fatal("Unexpected exception killed leases thread", e);
101         break;
102       }
103 
104       nextLease = null;
105       nextLeaseDelay = Long.MAX_VALUE;
106       for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
107         Map.Entry<String, Lease> entry = it.next();
108         Lease lease = entry.getValue();
109         long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
110         if ( thisLeaseDelay > 0) {
111           if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
112             nextLease = lease;
113             nextLeaseDelay = thisLeaseDelay;
114           }
115         } else {
116           // A lease expired.  Run the expired code before removing from map
117           // since its presence in map is used to see if lease exists still.
118           if (lease.getListener() == null) {
119             LOG.error("lease listener is null for lease " + lease.getLeaseName());
120           } else {
121             lease.getListener().leaseExpired();
122           }
123           it.remove();
124         }
125       }
126     }
127     close();
128   }
129 
130   /**
131    * Shuts down this lease instance when all outstanding leases expire.
132    * Like {@link #close()} but rather than violently end all leases, waits
133    * first on extant leases to finish.  Use this method if the lease holders
134    * could lose data, leak locks, etc.  Presumes client has shutdown
135    * allocation of new leases.
136    */
137   public void closeAfterLeasesExpire() {
138     this.stopRequested = true;
139   }
140 
141   /**
142    * Shut down this Leases instance.  All pending leases will be destroyed,
143    * without any cancellation calls.
144    */
145   public void close() {
146     LOG.info(Thread.currentThread().getName() + " closing leases");
147     this.stopRequested = true;
148     leases.clear();
149     LOG.info(Thread.currentThread().getName() + " closed leases");
150   }
151 
152   /**
153    * Create a lease and insert it to the map of leases.
154    *
155    * @param leaseName name of the lease
156    * @param leaseTimeoutPeriod length of the lease in milliseconds
157    * @param listener listener that will process lease expirations
158    * @return The lease created.
159    * @throws LeaseStillHeldException
160    */
161   public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
162       throws LeaseStillHeldException {
163     Lease lease = new Lease(leaseName, leaseTimeoutPeriod, listener);
164     addLease(lease);
165     return lease;
166   }
167 
168   /**
169    * Inserts lease.  Resets expiration before insertion.
170    * @param lease
171    * @throws LeaseStillHeldException
172    */
173   public void addLease(final Lease lease) throws LeaseStillHeldException {
174     if (this.stopRequested) {
175       return;
176     }
177     if (leases.containsKey(lease.getLeaseName())) {
178       throw new LeaseStillHeldException(lease.getLeaseName());
179     }
180     lease.resetExpirationTime();
181     leases.put(lease.getLeaseName(), lease);
182   }
183 
184   /**
185    * Renew a lease
186    *
187    * @param leaseName name of lease
188    * @throws LeaseException
189    */
190   public void renewLease(final String leaseName) throws LeaseException {
191     if (this.stopRequested) {
192       return;
193     }
194     Lease lease = leases.get(leaseName);
195 
196     if (lease == null ) {
197       throw new LeaseException("lease '" + leaseName +
198           "' does not exist or has already expired");
199     }
200     lease.resetExpirationTime();
201   }
202 
203   /**
204    * Client explicitly cancels a lease.
205    * @param leaseName name of lease
206    * @throws org.apache.hadoop.hbase.regionserver.LeaseException
207    */
208   public void cancelLease(final String leaseName) throws LeaseException {
209     removeLease(leaseName);
210   }
211 
212   /**
213    * Remove named lease.
214    * Lease is removed from the map of leases.
215    * Lease can be reinserted using {@link #addLease(Lease)}
216    *
217    * @param leaseName name of lease
218    * @throws org.apache.hadoop.hbase.regionserver.LeaseException
219    * @return Removed lease
220    */
221   Lease removeLease(final String leaseName) throws LeaseException {
222     Lease lease = leases.remove(leaseName);
223     if (lease == null) {
224       throw new LeaseException("lease '" + leaseName + "' does not exist");
225     }
226     return lease;
227   }
228 
229   /**
230    * Thrown if we are asked to create a lease but lease on passed name already
231    * exists.
232    */
233   @SuppressWarnings("serial")
234   public static class LeaseStillHeldException extends IOException {
235     private final String leaseName;
236 
237     /**
238      * @param name
239      */
240     public LeaseStillHeldException(final String name) {
241       this.leaseName = name;
242     }
243 
244     /** @return name of lease */
245     public String getName() {
246       return this.leaseName;
247     }
248   }
249 
250   /** This class tracks a single Lease. */
251   static class Lease implements Delayed {
252     private final String leaseName;
253     private final LeaseListener listener;
254     private int leaseTimeoutPeriod;
255     private long expirationTime;
256 
257     Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) {
258       this.leaseName = leaseName;
259       this.listener = listener;
260       this.leaseTimeoutPeriod = leaseTimeoutPeriod;
261       this.expirationTime = 0;
262     }
263 
264     /** @return the lease name */
265     public String getLeaseName() {
266       return leaseName;
267     }
268 
269     /** @return listener */
270     public LeaseListener getListener() {
271       return this.listener;
272     }
273 
274     @Override
275     public boolean equals(Object obj) {
276       if (this == obj) {
277         return true;
278       }
279       if (obj == null) {
280         return false;
281       }
282       if (getClass() != obj.getClass()) {
283         return false;
284       }
285       return this.hashCode() == obj.hashCode();
286     }
287 
288     @Override
289     public int hashCode() {
290       return this.leaseName.hashCode();
291     }
292 
293     public long getDelay(TimeUnit unit) {
294       return unit.convert(this.expirationTime - EnvironmentEdgeManager.currentTime(),
295           TimeUnit.MILLISECONDS);
296     }
297 
298     public int compareTo(Delayed o) {
299       long delta = this.getDelay(TimeUnit.MILLISECONDS) -
300         o.getDelay(TimeUnit.MILLISECONDS);
301 
302       return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
303     }
304 
305     /**
306      * Resets the expiration time of the lease.
307      */
308     public void resetExpirationTime() {
309       this.expirationTime = EnvironmentEdgeManager.currentTime() + this.leaseTimeoutPeriod;
310     }
311   }
312 }