001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.client.locking; 020 021import java.io.IOException; 022import java.util.concurrent.CountDownLatch; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.atomic.AtomicBoolean; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 032import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest; 033import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest; 035import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; 036import org.apache.hadoop.hbase.util.Threads; 037 038import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 039 040/** 041 * Lock for HBase Entity either a Table, a Namespace, or Regions. 042 * 043 * These are remote locks which live on master, and need periodic heartbeats to keep them alive. 044 * (Once we request the lock, internally an heartbeat thread will be started on the client). 045 * If master does not receive the heartbeat in time, it'll release the lock and make it available 046 * to other users. 047 * 048 * <p>Use {@link LockServiceClient} to build instances. Then call {@link #requestLock()}. 049 * {@link #requestLock} will contact master to queue the lock and start the heartbeat thread 050 * which will check lock's status periodically and once the lock is acquired, it will send the 051 * heartbeats to the master. 052 * 053 * <p>Use {@link #await} or {@link #await(long, TimeUnit)} to wait for the lock to be acquired. 054 * Always call {@link #unlock()} irrespective of whether lock was acquired or not. If the lock 055 * was acquired, it'll be released. If it was not acquired, it is possible that master grants the 056 * lock in future and the heartbeat thread keeps it alive forever by sending heartbeats. 057 * Calling {@link #unlock()} will stop the heartbeat thread and cancel the lock queued on master. 058 * 059 * <p>There are 4 ways in which these remote locks may be released/can be lost: 060 * <ul><li>Call {@link #unlock}.</li> 061 * <li>Lock times out on master: Can happen because of network issues, GC pauses, etc. 062 * Worker thread will call the given abortable as soon as it detects such a situation.</li> 063 * <li>Fail to contact master: If worker thread can not contact mater and thus fails to send 064 * heartbeat before the timeout expires, it assumes that lock is lost and calls the 065 * abortable.</li> 066 * <li>Worker thread is interrupted.</li> 067 * </ul> 068 * 069 * Use example: 070 * <code> 071 * EntityLock lock = lockServiceClient.*Lock(...., "exampled lock", abortable); 072 * lock.requestLock(); 073 * .... 074 * ....can do other initializations here since lock is 'asynchronous'... 075 * .... 076 * if (lock.await(timeout)) { 077 * ....logic requiring mutual exclusion 078 * } 079 * lock.unlock(); 080 * </code> 081 */ 082@InterfaceAudience.Public 083public class EntityLock { 084 private static final Logger LOG = LoggerFactory.getLogger(EntityLock.class); 085 086 public static final String HEARTBEAT_TIME_BUFFER = 087 "hbase.client.locks.heartbeat.time.buffer.ms"; 088 089 private final AtomicBoolean locked = new AtomicBoolean(false); 090 private final CountDownLatch latch = new CountDownLatch(1); 091 092 private final LockService.BlockingInterface stub; 093 private final LockHeartbeatWorker worker; 094 private final LockRequest lockRequest; 095 private final Abortable abort; 096 097 // Buffer for unexpected delays (GC, network delay, etc) in heartbeat rpc. 098 private final int heartbeatTimeBuffer; 099 100 // set to a non-zero value for tweaking sleep time during testing so that worker doesn't wait 101 // for long time periods between heartbeats. 102 private long testingSleepTime = 0; 103 104 private Long procId = null; 105 106 /** 107 * Abortable.abort() is called when the lease of the lock will expire. 108 * It's up to the user decide if simply abort the process or handle the loss of the lock 109 * by aborting the operation that was supposed to be under lock. 110 */ 111 EntityLock(Configuration conf, LockService.BlockingInterface stub, 112 LockRequest request, Abortable abort) { 113 this.stub = stub; 114 this.lockRequest = request; 115 this.abort = abort; 116 117 this.heartbeatTimeBuffer = conf.getInt(HEARTBEAT_TIME_BUFFER, 10000); 118 this.worker = new LockHeartbeatWorker(lockRequest.getDescription()); 119 } 120 121 @Override 122 public String toString() { 123 final StringBuilder sb = new StringBuilder(); 124 sb.append("EntityLock locked="); 125 sb.append(locked.get()); 126 sb.append(", procId="); 127 sb.append(procId); 128 sb.append(", type="); 129 sb.append(lockRequest.getLockType()); 130 if (lockRequest.getRegionInfoCount() > 0) { 131 sb.append(", regions="); 132 for (int i = 0; i < lockRequest.getRegionInfoCount(); ++i) { 133 if (i > 0) sb.append(", "); 134 sb.append(lockRequest.getRegionInfo(i)); 135 } 136 } else if (lockRequest.hasTableName()) { 137 sb.append(", table="); 138 sb.append(lockRequest.getTableName()); 139 } else if (lockRequest.hasNamespace()) { 140 sb.append(", namespace="); 141 sb.append(lockRequest.getNamespace()); 142 } 143 sb.append(", description="); 144 sb.append(lockRequest.getDescription()); 145 return sb.toString(); 146 } 147 148 @VisibleForTesting 149 void setTestingSleepTime(long timeInMillis) { 150 testingSleepTime = timeInMillis; 151 } 152 153 @VisibleForTesting 154 LockHeartbeatWorker getWorker() { 155 return worker; 156 } 157 158 public boolean isLocked() { 159 return locked.get(); 160 } 161 162 /** 163 * Sends rpc to the master to request lock. 164 * The lock request is queued with other lock requests. 165 * Call {@link #await()} to wait on lock. 166 * Always call {@link #unlock()} after calling the below, even after error. 167 */ 168 public void requestLock() throws IOException { 169 if (procId == null) { 170 try { 171 procId = stub.requestLock(null, lockRequest).getProcId(); 172 } catch (Exception e) { 173 throw ProtobufUtil.handleRemoteException(e); 174 } 175 worker.start(); 176 } else { 177 LOG.info("Lock already queued : " + toString()); 178 } 179 } 180 181 /** 182 * @param timeout in milliseconds. If set to 0, waits indefinitely. 183 * @return true if lock was acquired; and false if waiting time elapsed before lock could be 184 * acquired. 185 */ 186 public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException { 187 final boolean result = latch.await(timeout, timeUnit); 188 String lockRequestStr = lockRequest.toString().replace("\n", ", "); 189 if (result) { 190 LOG.info("Acquired " + lockRequestStr); 191 } else { 192 LOG.info(String.format("Failed acquire in %s %s of %s", timeout, timeUnit.toString(), 193 lockRequestStr)); 194 } 195 return result; 196 } 197 198 public void await() throws InterruptedException { 199 latch.await(); 200 } 201 202 public void unlock() throws IOException { 203 Threads.shutdown(worker.shutdown()); 204 try { 205 stub.lockHeartbeat(null, 206 LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build()); 207 } catch (Exception e) { 208 throw ProtobufUtil.handleRemoteException(e); 209 } 210 } 211 212 protected class LockHeartbeatWorker extends Thread { 213 private volatile boolean shutdown = false; 214 215 public LockHeartbeatWorker(final String desc) { 216 super("LockHeartbeatWorker(" + desc + ")"); 217 setDaemon(true); 218 } 219 220 /** 221 * @return Shuts down the thread clean and quietly. 222 */ 223 Thread shutdown() { 224 shutdown = true; 225 interrupt(); 226 return this; 227 } 228 229 @Override 230 public void run() { 231 final LockHeartbeatRequest lockHeartbeatRequest = 232 LockHeartbeatRequest.newBuilder().setProcId(procId).build(); 233 234 LockHeartbeatResponse response; 235 while (true) { 236 try { 237 response = stub.lockHeartbeat(null, lockHeartbeatRequest); 238 } catch (Exception e) { 239 e = ProtobufUtil.handleRemoteException(e); 240 locked.set(false); 241 LOG.error("Heartbeat failed, releasing " + EntityLock.this, e); 242 abort.abort("Heartbeat failed", e); 243 return; 244 } 245 if (!isLocked() && response.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED) { 246 locked.set(true); 247 latch.countDown(); 248 } else if (isLocked() && response.getLockStatus() == LockHeartbeatResponse.LockStatus.UNLOCKED) { 249 // Lock timed out. 250 locked.set(false); 251 abort.abort("Lock timed out.", null); 252 return; 253 } 254 255 try { 256 // If lock not acquired yet, poll faster so we can notify faster. 257 long sleepTime = 1000; 258 if (isLocked()) { 259 // If lock acquired, then use lock timeout to determine heartbeat rate. 260 // If timeout is <heartbeatTimeBuffer, send back to back heartbeats. 261 sleepTime = Math.max(response.getTimeoutMs() - heartbeatTimeBuffer, 1); 262 } 263 if (testingSleepTime != 0) { 264 sleepTime = testingSleepTime; 265 } 266 Thread.sleep(sleepTime); 267 } catch (InterruptedException e) { 268 // Since there won't be any more heartbeats, assume lock will be lost. 269 locked.set(false); 270 if (!this.shutdown) { 271 LOG.error("Interrupted, releasing " + this, e); 272 abort.abort("Worker thread interrupted", e); 273 } 274 return; 275 } 276 } 277 } 278 } 279}