001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ConcurrentModificationException;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.Delayed;
026import java.util.concurrent.TimeUnit;
027import org.apache.hadoop.hbase.log.HBaseMarkers;
028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Leases There are several server classes in HBase that need to track external clients that
035 * occasionally send heartbeats.
036 * <p>
037 * These external clients hold resources in the server class. Those resources need to be released if
038 * the external client fails to send a heartbeat after some interval of time passes.
039 * <p>
040 * The Leases class is a general reusable class for this kind of pattern. An instance of the Leases
041 * class will create a thread to do its dirty work. You should close() the instance if you want to
042 * clean up the thread properly.
043 * <p>
044 * NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when
045 * there is something to do, rather than the Chore sleep time which is invariant.
046 */
047@InterfaceAudience.Private
048public class LeaseManager extends Thread {
049  private static final Logger LOG = LoggerFactory.getLogger(LeaseManager.class.getName());
050  private static final int MIN_WAIT_TIME = 100;
051
052  private final Map<String, Lease> leases = new ConcurrentHashMap<>();
053  private final int leaseCheckFrequency;
054  private volatile boolean stopRequested = false;
055
056  /**
057   * Creates a lease manager.
058   * @param leaseCheckFrequency - how often the lease should be checked (milliseconds)
059   */
060  public LeaseManager(final int leaseCheckFrequency) {
061    super("RegionServer.LeaseManager"); // thread name
062    this.leaseCheckFrequency = leaseCheckFrequency;
063    setDaemon(true);
064  }
065
066  @Override
067  public void run() {
068    long toWait = leaseCheckFrequency;
069    Lease nextLease = null;
070    long nextLeaseDelay = Long.MAX_VALUE;
071
072    while (!stopRequested || (stopRequested && !leases.isEmpty())) {
073
074      try {
075        if (nextLease != null) {
076          toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
077        }
078
079        toWait = Math.min(leaseCheckFrequency, toWait);
080        toWait = Math.max(MIN_WAIT_TIME, toWait);
081
082        Thread.sleep(toWait);
083      } catch (InterruptedException | ConcurrentModificationException e) {
084        continue;
085      } catch (Throwable e) {
086        LOG.error(HBaseMarkers.FATAL, "Unexpected exception killed leases thread", e);
087        break;
088      }
089
090      nextLease = null;
091      nextLeaseDelay = Long.MAX_VALUE;
092      for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
093        Map.Entry<String, Lease> entry = it.next();
094        Lease lease = entry.getValue();
095        long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
096        if (thisLeaseDelay > 0) {
097          if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
098            nextLease = lease;
099            nextLeaseDelay = thisLeaseDelay;
100          }
101        } else {
102          // A lease expired. Run the expired code before removing from map
103          // since its presence in map is used to see if lease exists still.
104          if (lease.getListener() == null) {
105            LOG.error("lease listener is null for lease " + lease.getLeaseName());
106          } else {
107            lease.getListener().leaseExpired();
108          }
109          it.remove();
110        }
111      }
112    }
113    close();
114  }
115
116  /**
117   * Shuts down this lease instance when all outstanding leases expire. Like {@link #close()} but
118   * rather than violently end all leases, waits first on extant leases to finish. Use this method
119   * if the lease holders could lose data, leak locks, etc. Presumes client has shutdown allocation
120   * of new leases.
121   */
122  public void closeAfterLeasesExpire() {
123    this.stopRequested = true;
124  }
125
126  /**
127   * Shut down this Leases instance. All pending leases will be destroyed, without any cancellation
128   * calls.
129   */
130  public void close() {
131    this.stopRequested = true;
132    leases.clear();
133    LOG.info("Closed leases");
134  }
135
136  /**
137   * Create a lease and insert it to the map of leases.
138   * @param leaseName          name of the lease
139   * @param leaseTimeoutPeriod length of the lease in milliseconds
140   * @param listener           listener that will process lease expirations
141   * @return The lease created.
142   */
143  public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
144    throws LeaseStillHeldException {
145    Lease lease = new Lease(leaseName, leaseTimeoutPeriod, listener);
146    addLease(lease);
147    return lease;
148  }
149
150  /**
151   * Inserts lease. Resets expiration before insertion.
152   */
153  public void addLease(final Lease lease) throws LeaseStillHeldException {
154    if (this.stopRequested) {
155      return;
156    }
157    if (leases.containsKey(lease.getLeaseName())) {
158      throw new LeaseStillHeldException(lease.getLeaseName());
159    }
160    lease.resetExpirationTime();
161    leases.put(lease.getLeaseName(), lease);
162  }
163
164  /**
165   * Renew a lease
166   * @param leaseName name of the lease
167   */
168  public void renewLease(final String leaseName) throws LeaseException {
169    if (this.stopRequested) {
170      return;
171    }
172    Lease lease = leases.get(leaseName);
173
174    if (lease == null) {
175      throw new LeaseException("lease '" + leaseName + "' does not exist or has already expired");
176    }
177    lease.resetExpirationTime();
178  }
179
180  /**
181   * Client explicitly cancels a lease.
182   * @param leaseName name of lease
183   */
184  public void cancelLease(final String leaseName) throws LeaseException {
185    removeLease(leaseName);
186  }
187
188  /**
189   * Remove named lease. Lease is removed from the map of leases.
190   * @param leaseName name of lease
191   * @return Removed lease
192   */
193  Lease removeLease(final String leaseName) throws LeaseException {
194    Lease lease = leases.remove(leaseName);
195    if (lease == null) {
196      throw new LeaseException("lease '" + leaseName + "' does not exist");
197    }
198    return lease;
199  }
200
201  /**
202   * Thrown if we are asked to create a lease but lease on passed name already exists.
203   */
204  @SuppressWarnings("serial")
205  public static class LeaseStillHeldException extends IOException {
206    private final String leaseName;
207
208    public LeaseStillHeldException(final String name) {
209      this.leaseName = name;
210    }
211
212    /** @return name of lease */
213    public String getName() {
214      return this.leaseName;
215    }
216  }
217
218  /** This class tracks a single Lease. */
219  static class Lease implements Delayed {
220    private final String leaseName;
221    private final LeaseListener listener;
222    private int leaseTimeoutPeriod;
223    private long expirationTime;
224
225    Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) {
226      this.leaseName = leaseName;
227      this.listener = listener;
228      this.leaseTimeoutPeriod = leaseTimeoutPeriod;
229      this.expirationTime = 0;
230    }
231
232    /** @return the lease name */
233    public String getLeaseName() {
234      return leaseName;
235    }
236
237    /** @return listener */
238    public LeaseListener getListener() {
239      return this.listener;
240    }
241
242    @Override
243    public boolean equals(Object obj) {
244      if (this == obj) {
245        return true;
246      }
247      if (obj == null) {
248        return false;
249      }
250      if (getClass() != obj.getClass()) {
251        return false;
252      }
253      return this.hashCode() == obj.hashCode();
254    }
255
256    @Override
257    public int hashCode() {
258      return this.leaseName.hashCode();
259    }
260
261    @Override
262    public long getDelay(TimeUnit unit) {
263      return unit.convert(this.expirationTime - EnvironmentEdgeManager.currentTime(),
264        TimeUnit.MILLISECONDS);
265    }
266
267    @Override
268    public int compareTo(Delayed o) {
269      long delta = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
270
271      return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
272    }
273
274    /**
275     * Resets the expiration time of the lease.
276     */
277    public void resetExpirationTime() {
278      this.expirationTime = EnvironmentEdgeManager.currentTime() + this.leaseTimeoutPeriod;
279    }
280  }
281}