001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.util.ConcurrentModificationException;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.Delayed;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.hadoop.hbase.log.HBaseMarkers;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * Leases
037 *
038 * There are several server classes in HBase that need to track external
039 * clients that occasionally send heartbeats.
040 *
041 * <p>These external clients hold resources in the server class.
042 * Those resources need to be released if the external client fails to send a
043 * heartbeat after some interval of time passes.
044 *
045 * <p>The Leases class is a general reusable class for this kind of pattern.
046 * An instance of the Leases class will create a thread to do its dirty work.
047 * You should close() the instance if you want to clean up the thread properly.
048 *
049 * <p>
050 * NOTE: This class extends Thread rather than Chore because the sleep time
051 * can be interrupted when there is something to do, rather than the Chore
052 * sleep time which is invariant.
053 */
054@InterfaceAudience.Private
055public class LeaseManager extends Thread {
056  private static final Logger LOG = LoggerFactory.getLogger(LeaseManager.class.getName());
057  private static final int MIN_WAIT_TIME = 100;
058
059  private final Map<String, Lease> leases = new ConcurrentHashMap<>();
060  private final int leaseCheckFrequency;
061  private volatile boolean stopRequested = false;
062
063  /**
064   * Creates a lease manager.
065   *
066   * @param leaseCheckFrequency - how often the lease should be checked (milliseconds)
067   */
068  public LeaseManager(final int leaseCheckFrequency) {
069    super("RegionServer.LeaseManager");  // thread name
070    this.leaseCheckFrequency = leaseCheckFrequency;
071    setDaemon(true);
072  }
073
074  @Override
075  public void run() {
076    long toWait = leaseCheckFrequency;
077    Lease nextLease = null;
078    long nextLeaseDelay = Long.MAX_VALUE;
079
080    while (!stopRequested || (stopRequested && !leases.isEmpty()) ) {
081
082      try {
083        if (nextLease != null) {
084          toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
085        }
086
087        toWait = Math.min(leaseCheckFrequency, toWait);
088        toWait = Math.max(MIN_WAIT_TIME, toWait);
089
090        Thread.sleep(toWait);
091      } catch (InterruptedException | ConcurrentModificationException e) {
092        continue;
093      } catch (Throwable e) {
094        LOG.error(HBaseMarkers.FATAL, "Unexpected exception killed leases thread", e);
095        break;
096      }
097
098      nextLease = null;
099      nextLeaseDelay = Long.MAX_VALUE;
100      for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
101        Map.Entry<String, Lease> entry = it.next();
102        Lease lease = entry.getValue();
103        long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
104        if ( thisLeaseDelay > 0) {
105          if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
106            nextLease = lease;
107            nextLeaseDelay = thisLeaseDelay;
108          }
109        } else {
110          // A lease expired.  Run the expired code before removing from map
111          // since its presence in map is used to see if lease exists still.
112          if (lease.getListener() == null) {
113            LOG.error("lease listener is null for lease " + lease.getLeaseName());
114          } else {
115            lease.getListener().leaseExpired();
116          }
117          it.remove();
118        }
119      }
120    }
121    close();
122  }
123
124  /**
125   * Shuts down this lease instance when all outstanding leases expire.
126   * Like {@link #close()} but rather than violently end all leases, waits
127   * first on extant leases to finish.  Use this method if the lease holders
128   * could lose data, leak locks, etc.  Presumes client has shutdown
129   * allocation of new leases.
130   */
131  public void closeAfterLeasesExpire() {
132    this.stopRequested = true;
133  }
134
135  /**
136   * Shut down this Leases instance.  All pending leases will be destroyed,
137   * without any cancellation calls.
138   */
139  public void close() {
140    this.stopRequested = true;
141    leases.clear();
142    LOG.info("Closed leases");
143  }
144
145  /**
146   * Create a lease and insert it to the map of leases.
147   *
148   * @param leaseName name of the lease
149   * @param leaseTimeoutPeriod length of the lease in milliseconds
150   * @param listener listener that will process lease expirations
151   * @return The lease created.
152   */
153  public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
154      throws LeaseStillHeldException {
155    Lease lease = new Lease(leaseName, leaseTimeoutPeriod, listener);
156    addLease(lease);
157    return lease;
158  }
159
160  /**
161   * Inserts lease.  Resets expiration before insertion.
162   */
163  public void addLease(final Lease lease) throws LeaseStillHeldException {
164    if (this.stopRequested) {
165      return;
166    }
167    if (leases.containsKey(lease.getLeaseName())) {
168      throw new LeaseStillHeldException(lease.getLeaseName());
169    }
170    lease.resetExpirationTime();
171    leases.put(lease.getLeaseName(), lease);
172  }
173
174  /**
175   * Renew a lease
176   *
177   * @param leaseName name of the lease
178   */
179  public void renewLease(final String leaseName) throws LeaseException {
180    if (this.stopRequested) {
181      return;
182    }
183    Lease lease = leases.get(leaseName);
184
185    if (lease == null ) {
186      throw new LeaseException("lease '" + leaseName +
187          "' does not exist or has already expired");
188    }
189    lease.resetExpirationTime();
190  }
191
192  /**
193   * Client explicitly cancels a lease.
194   *
195   * @param leaseName name of lease
196   */
197  public void cancelLease(final String leaseName) throws LeaseException {
198    removeLease(leaseName);
199  }
200
201  /**
202   * Remove named lease. Lease is removed from the map of leases.
203   *
204   * @param leaseName name of lease
205   * @return Removed lease
206   */
207  Lease removeLease(final String leaseName) throws LeaseException {
208    Lease lease = leases.remove(leaseName);
209    if (lease == null) {
210      throw new LeaseException("lease '" + leaseName + "' does not exist");
211    }
212    return lease;
213  }
214
215  /**
216   * Thrown if we are asked to create a lease but lease on passed name already
217   * exists.
218   */
219  @SuppressWarnings("serial")
220  public static class LeaseStillHeldException extends IOException {
221    private final String leaseName;
222
223    public LeaseStillHeldException(final String name) {
224      this.leaseName = name;
225    }
226
227    /** @return name of lease */
228    public String getName() {
229      return this.leaseName;
230    }
231  }
232
233  /** This class tracks a single Lease. */
234  static class Lease implements Delayed {
235    private final String leaseName;
236    private final LeaseListener listener;
237    private int leaseTimeoutPeriod;
238    private long expirationTime;
239
240    Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) {
241      this.leaseName = leaseName;
242      this.listener = listener;
243      this.leaseTimeoutPeriod = leaseTimeoutPeriod;
244      this.expirationTime = 0;
245    }
246
247    /** @return the lease name */
248    public String getLeaseName() {
249      return leaseName;
250    }
251
252    /** @return listener */
253    public LeaseListener getListener() {
254      return this.listener;
255    }
256
257    @Override
258    public boolean equals(Object obj) {
259      if (this == obj) {
260        return true;
261      }
262      if (obj == null) {
263        return false;
264      }
265      if (getClass() != obj.getClass()) {
266        return false;
267      }
268      return this.hashCode() == obj.hashCode();
269    }
270
271    @Override
272    public int hashCode() {
273      return this.leaseName.hashCode();
274    }
275
276    @Override
277    public long getDelay(TimeUnit unit) {
278      return unit.convert(this.expirationTime - EnvironmentEdgeManager.currentTime(),
279          TimeUnit.MILLISECONDS);
280    }
281
282    @Override
283    public int compareTo(Delayed o) {
284      long delta = this.getDelay(TimeUnit.MILLISECONDS) -
285        o.getDelay(TimeUnit.MILLISECONDS);
286
287      return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
288    }
289
290    /**
291     * Resets the expiration time of the lease.
292     */
293    public void resetExpirationTime() {
294      this.expirationTime = EnvironmentEdgeManager.currentTime() + this.leaseTimeoutPeriod;
295    }
296  }
297}