/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.locking;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;

/**
 * Lock for HBase Entity either a Table, a Namespace, or Regions. These are remote locks which live
 * on master, and need periodic heartbeats to keep them alive. (Once we request the lock, internally
 * a heartbeat thread will be started on the client). If master does not receive the heartbeat in
 * time, it'll release the lock and make it available to other users.
 * <p>
 * Use {@link LockServiceClient} to build instances. Then call {@link #requestLock()}.
 * {@link #requestLock} will contact master to queue the lock and start the heartbeat thread which
 * will check lock's status periodically and once the lock is acquired, it will send the heartbeats
 * to the master.
 * <p>
 * Use {@link #await} or {@link #await(long, TimeUnit)} to wait for the lock to be acquired. Always
 * call {@link #unlock()} irrespective of whether lock was acquired or not. If the lock was
 * acquired, it'll be released. If it was not acquired, it is possible that master grants the lock
 * in future and the heartbeat thread keeps it alive forever by sending heartbeats. Calling
 * {@link #unlock()} will stop the heartbeat thread and cancel the lock queued on master.
 * <p>
 * There are 4 ways in which these remote locks may be released/can be lost:
 * <ul>
 * <li>Call {@link #unlock}.</li>
 * <li>Lock times out on master: Can happen because of network issues, GC pauses, etc. Worker thread
 * will call the given abortable as soon as it detects such a situation.</li>
 * <li>Fail to contact master: If worker thread can not contact master and thus fails to send
 * heartbeat before the timeout expires, it assumes that lock is lost and calls the abortable.</li>
 * <li>Worker thread is interrupted.</li>
 * </ul>
 * Use example: <code>
 * EntityLock lock = lockServiceClient.*Lock(...., "example lock", abortable);
 * lock.requestLock();
 * ....
 * ....can do other initializations here since lock is 'asynchronous'...
 * ....
 * if (lock.await(timeout, timeUnit)) {
 *   ....logic requiring mutual exclusion
 * }
 * lock.unlock();
 * </code>
 */
@InterfaceAudience.Public
public class EntityLock {
  private static final Logger LOG = LoggerFactory.getLogger(EntityLock.class);

  /**
   * Configuration key for the safety margin, in milliseconds, subtracted from the lock timeout
   * reported by master when computing the interval between heartbeats.
   */
  public static final String HEARTBEAT_TIME_BUFFER = "hbase.client.locks.heartbeat.time.buffer.ms";

  // True while the heartbeat worker believes the lock is held on master.
  private final AtomicBoolean locked = new AtomicBoolean(false);
  // Counted down exactly once, when the worker first observes the LOCKED status.
  private final CountDownLatch latch = new CountDownLatch(1);

  private final LockService.BlockingInterface stub;
  private final LockHeartbeatWorker worker;
  private final LockRequest lockRequest;
  private final Abortable abort;

  // Buffer for unexpected delays (GC, network delay, etc) in heartbeat rpc.
  private final int heartbeatTimeBuffer;

  // set to a non-zero value for tweaking sleep time during testing so that worker doesn't wait
  // for long time periods between heartbeats.
  private long testingSleepTime = 0;

  // Id returned by master for the queued lock request; null until requestLock() succeeds.
  private Long procId = null;

  /**
   * Abortable.abort() is called when the lease of the lock will expire. It's up to the user to
   * decide whether to simply abort the process or to handle the loss of the lock by aborting the
   * operation that was supposed to be under lock.
   * @param conf    configuration from which {@link #HEARTBEAT_TIME_BUFFER} is read (default 10000)
   * @param stub    blocking stub used for the lock request and heartbeat rpcs
   * @param request the lock request to queue on master
   * @param abort   called by the heartbeat worker when the lock is considered lost
   */
  EntityLock(Configuration conf, LockService.BlockingInterface stub, LockRequest request,
    Abortable abort) {
    this.stub = stub;
    this.lockRequest = request;
    this.abort = abort;

    this.heartbeatTimeBuffer = conf.getInt(HEARTBEAT_TIME_BUFFER, 10000);
    this.worker = new LockHeartbeatWorker(lockRequest.getDescription());
  }

  /**
   * Describes this lock: locked flag, procedure id, lock type, the locked entity (regions, table
   * or namespace, whichever the request carries) and the request description.
   */
  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder();
    sb.append("EntityLock locked=");
    sb.append(locked.get());
    sb.append(", procId=");
    sb.append(procId);
    sb.append(", type=");
    sb.append(lockRequest.getLockType());
    if (lockRequest.getRegionInfoCount() > 0) {
      sb.append(", regions=");
      for (int i = 0; i < lockRequest.getRegionInfoCount(); ++i) {
        if (i > 0) {
          sb.append(", ");
        }
        sb.append(lockRequest.getRegionInfo(i));
      }
    } else if (lockRequest.hasTableName()) {
      sb.append(", table=");
      sb.append(lockRequest.getTableName());
    } else if (lockRequest.hasNamespace()) {
      sb.append(", namespace=");
      sb.append(lockRequest.getNamespace());
    }
    sb.append(", description=");
    sb.append(lockRequest.getDescription());
    return sb.toString();
  }

  /**
   * Overrides the sleep between heartbeats; a value of 0 restores the computed interval.
   * @param timeInMillis sleep time in milliseconds
   */
  @InterfaceAudience.Private
  void setTestingSleepTime(long timeInMillis) {
    testingSleepTime = timeInMillis;
  }

  /** Returns the heartbeat worker thread owned by this lock. */
  @InterfaceAudience.Private
  LockHeartbeatWorker getWorker() {
    return worker;
  }

  /** Returns true if the heartbeat worker currently considers the lock to be held. */
  public boolean isLocked() {
    return locked.get();
  }

  /**
   * Sends rpc to the master to request lock. The lock request is queued with other lock requests.
   * Call {@link #await()} to wait on lock. Always call {@link #unlock()} after calling the below,
   * even after error.
   * <p>
   * Calling this again after a successful request only logs a message.
   * @throws IOException if the request rpc fails; the heartbeat worker is not started in that case
   */
  public void requestLock() throws IOException {
    if (procId == null) {
      try {
        procId = stub.requestLock(null, lockRequest).getProcId();
      } catch (Exception e) {
        throw ProtobufUtil.handleRemoteException(e);
      }
      worker.start();
    } else {
      LOG.info("Lock already queued : {}", this);
    }
  }

  /**
   * Waits for the lock to be acquired, up to the given timeout.
   * @param timeout  maximum time to wait, expressed in {@code timeUnit}. If it is less than or
   *                 equal to zero the call does not wait at all and just reports whether the lock
   *                 has already been acquired.
   * @param timeUnit unit of {@code timeout}
   * @return true if lock was acquired; and false if waiting time elapsed before lock could be
   *         acquired.
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
    final boolean result = latch.await(timeout, timeUnit);
    String lockRequestStr = lockRequest.toString().replace("\n", ", ");
    if (result) {
      LOG.info("Acquired {}", lockRequestStr);
    } else {
      LOG.info("Failed acquire in {} {} of {}", timeout, timeUnit, lockRequestStr);
    }
    return result;
  }

  /**
   * Waits, without a time limit, until the heartbeat worker observes that the lock was acquired.
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public void await() throws InterruptedException {
    latch.await();
  }

  /**
   * Stops the heartbeat worker and tells master to release (or cancel) the lock. Safe to call
   * even if {@link #requestLock()} was never called or failed.
   * @throws IOException if the release rpc to master fails
   */
  public void unlock() throws IOException {
    Threads.shutdown(worker.shutdown());
    if (procId == null) {
      // The request never reached the point where master handed back a procedure id, so there is
      // nothing we can cancel. Without this guard setProcId(procId) would unbox null and throw
      // NullPointerException, breaking the "always call unlock()" contract.
      return;
    }
    try {
      stub.lockHeartbeat(null,
        LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build());
    } catch (Exception e) {
      throw ProtobufUtil.handleRemoteException(e);
    }
  }

  /**
   * Daemon thread which polls master for the lock status and, once the lock is held, keeps it
   * alive by sending heartbeats. Calls the enclosing lock's abortable when the lock is lost.
   */
  protected class LockHeartbeatWorker extends Thread {
    // Set by shutdown() so that the resulting interrupt is not reported as a lost lock.
    private volatile boolean shutdown = false;

    public LockHeartbeatWorker(final String desc) {
      super("LockHeartbeatWorker(" + desc + ")");
      setDaemon(true);
    }

    /**
     * Shuts down the thread cleanly and quietly.
     * @return this thread, so that the caller can wait for it to finish
     */
    Thread shutdown() {
      shutdown = true;
      interrupt();
      return this;
    }

    @Override
    public void run() {
      final LockHeartbeatRequest lockHeartbeatRequest =
        LockHeartbeatRequest.newBuilder().setProcId(procId).build();

      LockHeartbeatResponse response;
      while (true) {
        try {
          response = stub.lockHeartbeat(null, lockHeartbeatRequest);
        } catch (Exception e) {
          locked.set(false);
          if (shutdown) {
            // unlock() interrupted us while the rpc was in flight. This is a deliberate release,
            // not a lost lock, so do not call the abortable (same rule as the sleep path below).
            return;
          }
          e = ProtobufUtil.handleRemoteException(e);
          LOG.error("Heartbeat failed, releasing {}", EntityLock.this, e);
          abort.abort("Heartbeat failed", e);
          return;
        }
        if (!isLocked() && response.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED) {
          // First time master reports the lock as held: publish it and wake up await() callers.
          locked.set(true);
          latch.countDown();
        } else if (
          isLocked() && response.getLockStatus() == LockHeartbeatResponse.LockStatus.UNLOCKED
        ) {
          // Lock timed out.
          locked.set(false);
          abort.abort("Lock timed out.", null);
          return;
        }

        try {
          // If lock not acquired yet, poll faster so we can notify faster.
          long sleepTime = 1000;
          if (isLocked()) {
            // If lock acquired, then use lock timeout to determine heartbeat rate.
            // If timeout is <heartbeatTimeBuffer, send back to back heartbeats.
            sleepTime = Math.max(response.getTimeoutMs() - heartbeatTimeBuffer, 1);
          }
          if (testingSleepTime != 0) {
            sleepTime = testingSleepTime;
          }
          Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
          // Since there won't be any more heartbeats, assume lock will be lost.
          locked.set(false);
          if (!this.shutdown) {
            LOG.error("Interrupted, releasing {}", EntityLock.this, e);
            abort.abort("Worker thread interrupted", e);
          }
          return;
        }
      }
    }
  }
}