001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.locking;
019
020import java.io.IOException;
021import java.util.concurrent.CountDownLatch;
022import java.util.concurrent.TimeUnit;
023import java.util.concurrent.atomic.AtomicBoolean;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.Abortable;
026import org.apache.hadoop.hbase.util.Threads;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
032import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
033import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
036
037/**
038 * Lock for HBase Entity either a Table, a Namespace, or Regions. These are remote locks which live
039 * on master, and need periodic heartbeats to keep them alive. (Once we request the lock, internally
040 * an heartbeat thread will be started on the client). If master does not receive the heartbeat in
041 * time, it'll release the lock and make it available to other users.
042 * <p>
043 * Use {@link LockServiceClient} to build instances. Then call {@link #requestLock()}.
044 * {@link #requestLock} will contact master to queue the lock and start the heartbeat thread which
045 * will check lock's status periodically and once the lock is acquired, it will send the heartbeats
046 * to the master.
047 * <p>
048 * Use {@link #await} or {@link #await(long, TimeUnit)} to wait for the lock to be acquired. Always
049 * call {@link #unlock()} irrespective of whether lock was acquired or not. If the lock was
050 * acquired, it'll be released. If it was not acquired, it is possible that master grants the lock
051 * in future and the heartbeat thread keeps it alive forever by sending heartbeats. Calling
052 * {@link #unlock()} will stop the heartbeat thread and cancel the lock queued on master.
053 * <p>
054 * There are 4 ways in which these remote locks may be released/can be lost:
055 * <ul>
056 * <li>Call {@link #unlock}.</li>
057 * <li>Lock times out on master: Can happen because of network issues, GC pauses, etc. Worker thread
058 * will call the given abortable as soon as it detects such a situation.</li>
059 * <li>Fail to contact master: If worker thread can not contact mater and thus fails to send
060 * heartbeat before the timeout expires, it assumes that lock is lost and calls the abortable.</li>
061 * <li>Worker thread is interrupted.</li>
062 * </ul>
063 * Use example: <code>
064 * EntityLock lock = lockServiceClient.*Lock(...., "exampled lock", abortable);
065 * lock.requestLock();
066 * ....
067 * ....can do other initializations here since lock is 'asynchronous'...
068 * ....
069 * if (lock.await(timeout)) {
070 *   ....logic requiring mutual exclusion
071 * }
072 * lock.unlock();
073 * </code>
074 */
075@InterfaceAudience.Public
076public class EntityLock {
077  private static final Logger LOG = LoggerFactory.getLogger(EntityLock.class);
078
079  public static final String HEARTBEAT_TIME_BUFFER = "hbase.client.locks.heartbeat.time.buffer.ms";
080
081  private final AtomicBoolean locked = new AtomicBoolean(false);
082  private final CountDownLatch latch = new CountDownLatch(1);
083
084  private final LockService.BlockingInterface stub;
085  private final LockHeartbeatWorker worker;
086  private final LockRequest lockRequest;
087  private final Abortable abort;
088
089  // Buffer for unexpected delays (GC, network delay, etc) in heartbeat rpc.
090  private final int heartbeatTimeBuffer;
091
092  // set to a non-zero value for tweaking sleep time during testing so that worker doesn't wait
093  // for long time periods between heartbeats.
094  private long testingSleepTime = 0;
095
096  private Long procId = null;
097
098  /**
099   * Abortable.abort() is called when the lease of the lock will expire. It's up to the user decide
100   * if simply abort the process or handle the loss of the lock by aborting the operation that was
101   * supposed to be under lock.
102   */
103  EntityLock(Configuration conf, LockService.BlockingInterface stub, LockRequest request,
104    Abortable abort) {
105    this.stub = stub;
106    this.lockRequest = request;
107    this.abort = abort;
108
109    this.heartbeatTimeBuffer = conf.getInt(HEARTBEAT_TIME_BUFFER, 10000);
110    this.worker = new LockHeartbeatWorker(lockRequest.getDescription());
111  }
112
113  @Override
114  public String toString() {
115    final StringBuilder sb = new StringBuilder();
116    sb.append("EntityLock locked=");
117    sb.append(locked.get());
118    sb.append(", procId=");
119    sb.append(procId);
120    sb.append(", type=");
121    sb.append(lockRequest.getLockType());
122    if (lockRequest.getRegionInfoCount() > 0) {
123      sb.append(", regions=");
124      for (int i = 0; i < lockRequest.getRegionInfoCount(); ++i) {
125        if (i > 0) sb.append(", ");
126        sb.append(lockRequest.getRegionInfo(i));
127      }
128    } else if (lockRequest.hasTableName()) {
129      sb.append(", table=");
130      sb.append(lockRequest.getTableName());
131    } else if (lockRequest.hasNamespace()) {
132      sb.append(", namespace=");
133      sb.append(lockRequest.getNamespace());
134    }
135    sb.append(", description=");
136    sb.append(lockRequest.getDescription());
137    return sb.toString();
138  }
139
140  @InterfaceAudience.Private
141  void setTestingSleepTime(long timeInMillis) {
142    testingSleepTime = timeInMillis;
143  }
144
145  @InterfaceAudience.Private
146  LockHeartbeatWorker getWorker() {
147    return worker;
148  }
149
150  public boolean isLocked() {
151    return locked.get();
152  }
153
154  /**
155   * Sends rpc to the master to request lock. The lock request is queued with other lock requests.
156   * Call {@link #await()} to wait on lock. Always call {@link #unlock()} after calling the below,
157   * even after error.
158   */
159  public void requestLock() throws IOException {
160    if (procId == null) {
161      try {
162        procId = stub.requestLock(null, lockRequest).getProcId();
163      } catch (Exception e) {
164        throw ProtobufUtil.handleRemoteException(e);
165      }
166      worker.start();
167    } else {
168      LOG.info("Lock already queued : " + toString());
169    }
170  }
171
172  /**
173   * @param timeout in milliseconds. If set to 0, waits indefinitely.
174   * @return true if lock was acquired; and false if waiting time elapsed before lock could be
175   *         acquired.
176   */
177  public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
178    final boolean result = latch.await(timeout, timeUnit);
179    String lockRequestStr = lockRequest.toString().replace("\n", ", ");
180    if (result) {
181      LOG.info("Acquired " + lockRequestStr);
182    } else {
183      LOG.info(String.format("Failed acquire in %s %s of %s", timeout, timeUnit.toString(),
184        lockRequestStr));
185    }
186    return result;
187  }
188
189  public void await() throws InterruptedException {
190    latch.await();
191  }
192
193  public void unlock() throws IOException {
194    Threads.shutdown(worker.shutdown());
195    try {
196      stub.lockHeartbeat(null,
197        LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build());
198    } catch (Exception e) {
199      throw ProtobufUtil.handleRemoteException(e);
200    }
201  }
202
203  protected class LockHeartbeatWorker extends Thread {
204    private volatile boolean shutdown = false;
205
206    public LockHeartbeatWorker(final String desc) {
207      super("LockHeartbeatWorker(" + desc + ")");
208      setDaemon(true);
209    }
210
211    /** Returns Shuts down the thread clean and quietly. */
212    Thread shutdown() {
213      shutdown = true;
214      interrupt();
215      return this;
216    }
217
218    @Override
219    public void run() {
220      final LockHeartbeatRequest lockHeartbeatRequest =
221        LockHeartbeatRequest.newBuilder().setProcId(procId).build();
222
223      LockHeartbeatResponse response;
224      while (true) {
225        try {
226          response = stub.lockHeartbeat(null, lockHeartbeatRequest);
227        } catch (Exception e) {
228          e = ProtobufUtil.handleRemoteException(e);
229          locked.set(false);
230          LOG.error("Heartbeat failed, releasing " + EntityLock.this, e);
231          abort.abort("Heartbeat failed", e);
232          return;
233        }
234        if (!isLocked() && response.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED) {
235          locked.set(true);
236          latch.countDown();
237        } else
238          if (isLocked() && response.getLockStatus() == LockHeartbeatResponse.LockStatus.UNLOCKED) {
239            // Lock timed out.
240            locked.set(false);
241            abort.abort("Lock timed out.", null);
242            return;
243          }
244
245        try {
246          // If lock not acquired yet, poll faster so we can notify faster.
247          long sleepTime = 1000;
248          if (isLocked()) {
249            // If lock acquired, then use lock timeout to determine heartbeat rate.
250            // If timeout is <heartbeatTimeBuffer, send back to back heartbeats.
251            sleepTime = Math.max(response.getTimeoutMs() - heartbeatTimeBuffer, 1);
252          }
253          if (testingSleepTime != 0) {
254            sleepTime = testingSleepTime;
255          }
256          Thread.sleep(sleepTime);
257        } catch (InterruptedException e) {
258          // Since there won't be any more heartbeats, assume lock will be lost.
259          locked.set(false);
260          if (!this.shutdown) {
261            LOG.error("Interrupted, releasing " + this, e);
262            abort.abort("Worker thread interrupted", e);
263          }
264          return;
265        }
266      }
267    }
268  }
269}