001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.util.ConcurrentModificationException; 023import java.util.Iterator; 024import java.util.Map; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.Delayed; 027import java.util.concurrent.TimeUnit; 028 029import org.apache.hadoop.hbase.log.HBaseMarkers; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.apache.hadoop.hbase.util.HasThread; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * Leases 038 * 039 * There are several server classes in HBase that need to track external 040 * clients that occasionally send heartbeats. 041 * 042 * <p>These external clients hold resources in the server class. 043 * Those resources need to be released if the external client fails to send a 044 * heartbeat after some interval of time passes. 045 * 046 * <p>The Leases class is a general reusable class for this kind of pattern. 047 * An instance of the Leases class will create a thread to do its dirty work. 048 * You should close() the instance if you want to clean up the thread properly. 049 * 050 * <p> 051 * NOTE: This class extends Thread rather than Chore because the sleep time 052 * can be interrupted when there is something to do, rather than the Chore 053 * sleep time which is invariant. 054 */ 055@InterfaceAudience.Private 056public class Leases extends HasThread { 057 private static final Logger LOG = LoggerFactory.getLogger(Leases.class.getName()); 058 public static final int MIN_WAIT_TIME = 100; 059 private final Map<String, Lease> leases = new ConcurrentHashMap<>(); 060 061 protected final int leaseCheckFrequency; 062 protected volatile boolean stopRequested = false; 063 064 /** 065 * Creates a lease monitor 066 * 067 * @param leaseCheckFrequency - how often the lease should be checked 068 * (milliseconds) 069 */ 070 public Leases(final int leaseCheckFrequency) { 071 super("RegionServerLeases"); // thread name 072 this.leaseCheckFrequency = leaseCheckFrequency; 073 setDaemon(true); 074 } 075 076 /** 077 * @see Thread#run() 078 */ 079 @Override 080 public void run() { 081 long toWait = leaseCheckFrequency; 082 Lease nextLease = null; 083 long nextLeaseDelay = Long.MAX_VALUE; 084 085 while (!stopRequested || (stopRequested && !leases.isEmpty()) ) { 086 087 try { 088 if (nextLease != null) { 089 toWait = nextLease.getDelay(TimeUnit.MILLISECONDS); 090 } 091 092 toWait = Math.min(leaseCheckFrequency, toWait); 093 toWait = Math.max(MIN_WAIT_TIME, toWait); 094 095 Thread.sleep(toWait); 096 } catch (InterruptedException e) { 097 continue; 098 } catch (ConcurrentModificationException e) { 099 continue; 100 } catch (Throwable e) { 101 LOG.error(HBaseMarkers.FATAL, "Unexpected exception killed leases thread", e); 102 break; 103 } 104 105 nextLease = null; 106 nextLeaseDelay = Long.MAX_VALUE; 107 for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) { 108 Map.Entry<String, Lease> entry = it.next(); 109 Lease lease = entry.getValue(); 110 long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS); 111 if ( thisLeaseDelay > 0) { 112 if (nextLease == null || thisLeaseDelay < nextLeaseDelay) { 113 nextLease = lease; 114 nextLeaseDelay = thisLeaseDelay; 115 } 116 } else { 117 // A lease expired. Run the expired code before removing from map 118 // since its presence in map is used to see if lease exists still. 119 if (lease.getListener() == null) { 120 LOG.error("lease listener is null for lease " + lease.getLeaseName()); 121 } else { 122 lease.getListener().leaseExpired(); 123 } 124 it.remove(); 125 } 126 } 127 } 128 close(); 129 } 130 131 /** 132 * Shuts down this lease instance when all outstanding leases expire. 133 * Like {@link #close()} but rather than violently end all leases, waits 134 * first on extant leases to finish. Use this method if the lease holders 135 * could lose data, leak locks, etc. Presumes client has shutdown 136 * allocation of new leases. 137 */ 138 public void closeAfterLeasesExpire() { 139 this.stopRequested = true; 140 } 141 142 /** 143 * Shut down this Leases instance. All pending leases will be destroyed, 144 * without any cancellation calls. 145 */ 146 public void close() { 147 this.stopRequested = true; 148 leases.clear(); 149 LOG.info("Closed leases"); 150 } 151 152 /** 153 * Create a lease and insert it to the map of leases. 154 * 155 * @param leaseName name of the lease 156 * @param leaseTimeoutPeriod length of the lease in milliseconds 157 * @param listener listener that will process lease expirations 158 * @return The lease created. 159 * @throws LeaseStillHeldException 160 */ 161 public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener) 162 throws LeaseStillHeldException { 163 Lease lease = new Lease(leaseName, leaseTimeoutPeriod, listener); 164 addLease(lease); 165 return lease; 166 } 167 168 /** 169 * Inserts lease. Resets expiration before insertion. 170 * @param lease 171 * @throws LeaseStillHeldException 172 */ 173 public void addLease(final Lease lease) throws LeaseStillHeldException { 174 if (this.stopRequested) { 175 return; 176 } 177 if (leases.containsKey(lease.getLeaseName())) { 178 throw new LeaseStillHeldException(lease.getLeaseName()); 179 } 180 lease.resetExpirationTime(); 181 leases.put(lease.getLeaseName(), lease); 182 } 183 184 /** 185 * Renew a lease 186 * 187 * @param leaseName name of lease 188 * @throws LeaseException 189 */ 190 public void renewLease(final String leaseName) throws LeaseException { 191 if (this.stopRequested) { 192 return; 193 } 194 Lease lease = leases.get(leaseName); 195 196 if (lease == null ) { 197 throw new LeaseException("lease '" + leaseName + 198 "' does not exist or has already expired"); 199 } 200 lease.resetExpirationTime(); 201 } 202 203 /** 204 * Client explicitly cancels a lease. 205 * @param leaseName name of lease 206 * @throws org.apache.hadoop.hbase.regionserver.LeaseException 207 */ 208 public void cancelLease(final String leaseName) throws LeaseException { 209 removeLease(leaseName); 210 } 211 212 /** 213 * Remove named lease. 214 * Lease is removed from the map of leases. 215 * Lease can be reinserted using {@link #addLease(Lease)} 216 * 217 * @param leaseName name of lease 218 * @throws org.apache.hadoop.hbase.regionserver.LeaseException 219 * @return Removed lease 220 */ 221 Lease removeLease(final String leaseName) throws LeaseException { 222 Lease lease = leases.remove(leaseName); 223 if (lease == null) { 224 throw new LeaseException("lease '" + leaseName + "' does not exist"); 225 } 226 return lease; 227 } 228 229 /** 230 * Thrown if we are asked to create a lease but lease on passed name already 231 * exists. 232 */ 233 @SuppressWarnings("serial") 234 public static class LeaseStillHeldException extends IOException { 235 private final String leaseName; 236 237 /** 238 * @param name 239 */ 240 public LeaseStillHeldException(final String name) { 241 this.leaseName = name; 242 } 243 244 /** @return name of lease */ 245 public String getName() { 246 return this.leaseName; 247 } 248 } 249 250 /** This class tracks a single Lease. */ 251 static class Lease implements Delayed { 252 private final String leaseName; 253 private final LeaseListener listener; 254 private int leaseTimeoutPeriod; 255 private long expirationTime; 256 257 Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) { 258 this.leaseName = leaseName; 259 this.listener = listener; 260 this.leaseTimeoutPeriod = leaseTimeoutPeriod; 261 this.expirationTime = 0; 262 } 263 264 /** @return the lease name */ 265 public String getLeaseName() { 266 return leaseName; 267 } 268 269 /** @return listener */ 270 public LeaseListener getListener() { 271 return this.listener; 272 } 273 274 @Override 275 public boolean equals(Object obj) { 276 if (this == obj) { 277 return true; 278 } 279 if (obj == null) { 280 return false; 281 } 282 if (getClass() != obj.getClass()) { 283 return false; 284 } 285 return this.hashCode() == obj.hashCode(); 286 } 287 288 @Override 289 public int hashCode() { 290 return this.leaseName.hashCode(); 291 } 292 293 @Override 294 public long getDelay(TimeUnit unit) { 295 return unit.convert(this.expirationTime - EnvironmentEdgeManager.currentTime(), 296 TimeUnit.MILLISECONDS); 297 } 298 299 @Override 300 public int compareTo(Delayed o) { 301 long delta = this.getDelay(TimeUnit.MILLISECONDS) - 302 o.getDelay(TimeUnit.MILLISECONDS); 303 304 return this.equals(o) ? 0 : (delta > 0 ? 1 : -1); 305 } 306 307 /** 308 * Resets the expiration time of the lease. 309 */ 310 public void resetExpirationTime() { 311 this.expirationTime = EnvironmentEdgeManager.currentTime() + this.leaseTimeoutPeriod; 312 } 313 } 314}