001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.client.locking;
020
021import java.io.IOException;
022import java.util.concurrent.CountDownLatch;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.atomic.AtomicBoolean;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Abortable;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
032import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
033import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
036import org.apache.hadoop.hbase.util.Threads;
037
038import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
039
040/**
041 * Lock for HBase Entity either a Table, a Namespace, or Regions.
042 *
043 * These are remote locks which live on master, and need periodic heartbeats to keep them alive.
044 * (Once we request the lock, internally an heartbeat thread will be started on the client).
045 * If master does not receive the heartbeat in time, it'll release the lock and make it available
046 * to other users.
047 *
048 * <p>Use {@link LockServiceClient} to build instances. Then call {@link #requestLock()}.
049 * {@link #requestLock} will contact master to queue the lock and start the heartbeat thread
050 * which will check lock's status periodically and once the lock is acquired, it will send the
051 * heartbeats to the master.
052 *
053 * <p>Use {@link #await} or {@link #await(long, TimeUnit)} to wait for the lock to be acquired.
054 * Always call {@link #unlock()} irrespective of whether lock was acquired or not. If the lock
055 * was acquired, it'll be released. If it was not acquired, it is possible that master grants the
056 * lock in future and the heartbeat thread keeps it alive forever by sending heartbeats.
057 * Calling {@link #unlock()} will stop the heartbeat thread and cancel the lock queued on master.
058 *
059 * <p>There are 4 ways in which these remote locks may be released/can be lost:
060 * <ul><li>Call {@link #unlock}.</li>
061 * <li>Lock times out on master: Can happen because of network issues, GC pauses, etc.
062 *     Worker thread will call the given abortable as soon as it detects such a situation.</li>
063 * <li>Fail to contact master: If worker thread can not contact mater and thus fails to send
064 *     heartbeat before the timeout expires, it assumes that lock is lost and calls the
065 *     abortable.</li>
066 * <li>Worker thread is interrupted.</li>
067 * </ul>
068 *
069 * Use example:
070 * <code>
071 * EntityLock lock = lockServiceClient.*Lock(...., "exampled lock", abortable);
072 * lock.requestLock();
073 * ....
074 * ....can do other initializations here since lock is 'asynchronous'...
075 * ....
076 * if (lock.await(timeout)) {
077 *   ....logic requiring mutual exclusion
078 * }
079 * lock.unlock();
080 * </code>
081 */
082@InterfaceAudience.Public
083public class EntityLock {
084  private static final Logger LOG = LoggerFactory.getLogger(EntityLock.class);
085
086  public static final String HEARTBEAT_TIME_BUFFER =
087      "hbase.client.locks.heartbeat.time.buffer.ms";
088
089  private final AtomicBoolean locked = new AtomicBoolean(false);
090  private final CountDownLatch latch = new CountDownLatch(1);
091
092  private final LockService.BlockingInterface stub;
093  private final LockHeartbeatWorker worker;
094  private final LockRequest lockRequest;
095  private final Abortable abort;
096
097  // Buffer for unexpected delays (GC, network delay, etc) in heartbeat rpc.
098  private final int heartbeatTimeBuffer;
099
100  // set to a non-zero value for tweaking sleep time during testing so that worker doesn't wait
101  // for long time periods between heartbeats.
102  private long testingSleepTime = 0;
103
104  private Long procId = null;
105
106  /**
107   * Abortable.abort() is called when the lease of the lock will expire.
108   * It's up to the user decide if simply abort the process or handle the loss of the lock
109   * by aborting the operation that was supposed to be under lock.
110   */
111  EntityLock(Configuration conf, LockService.BlockingInterface stub,
112      LockRequest request, Abortable abort) {
113    this.stub = stub;
114    this.lockRequest = request;
115    this.abort = abort;
116
117    this.heartbeatTimeBuffer = conf.getInt(HEARTBEAT_TIME_BUFFER, 10000);
118    this.worker = new LockHeartbeatWorker(lockRequest.getDescription());
119  }
120
121  @Override
122  public String toString() {
123    final StringBuilder sb = new StringBuilder();
124    sb.append("EntityLock locked=");
125    sb.append(locked.get());
126    sb.append(", procId=");
127    sb.append(procId);
128    sb.append(", type=");
129    sb.append(lockRequest.getLockType());
130    if (lockRequest.getRegionInfoCount() > 0) {
131      sb.append(", regions=");
132      for (int i = 0; i < lockRequest.getRegionInfoCount(); ++i) {
133        if (i > 0) sb.append(", ");
134        sb.append(lockRequest.getRegionInfo(i));
135      }
136    } else if (lockRequest.hasTableName()) {
137      sb.append(", table=");
138      sb.append(lockRequest.getTableName());
139    } else if (lockRequest.hasNamespace()) {
140      sb.append(", namespace=");
141      sb.append(lockRequest.getNamespace());
142    }
143    sb.append(", description=");
144    sb.append(lockRequest.getDescription());
145    return sb.toString();
146  }
147
148  @VisibleForTesting
149  void setTestingSleepTime(long timeInMillis) {
150    testingSleepTime = timeInMillis;
151  }
152
153  @VisibleForTesting
154  LockHeartbeatWorker getWorker() {
155    return worker;
156  }
157
158  public boolean isLocked() {
159    return locked.get();
160  }
161
162  /**
163   * Sends rpc to the master to request lock.
164   * The lock request is queued with other lock requests.
165   * Call {@link #await()} to wait on lock.
166   * Always call {@link #unlock()} after calling the below, even after error.
167   */
168  public void requestLock() throws IOException {
169    if (procId == null) {
170      try {
171        procId = stub.requestLock(null, lockRequest).getProcId();
172      } catch (Exception e) {
173        throw ProtobufUtil.handleRemoteException(e);
174      }
175      worker.start();
176    } else {
177      LOG.info("Lock already queued : " + toString());
178    }
179  }
180
181  /**
182   * @param timeout in milliseconds. If set to 0, waits indefinitely.
183   * @return true if lock was acquired; and false if waiting time elapsed before lock could be
184   * acquired.
185   */
186  public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
187    final boolean result = latch.await(timeout, timeUnit);
188    String lockRequestStr = lockRequest.toString().replace("\n", ", ");
189    if (result) {
190      LOG.info("Acquired " + lockRequestStr);
191    } else {
192      LOG.info(String.format("Failed acquire in %s %s of %s", timeout, timeUnit.toString(),
193          lockRequestStr));
194    }
195    return result;
196  }
197
198  public void await() throws InterruptedException {
199    latch.await();
200  }
201
202  public void unlock() throws IOException {
203    Threads.shutdown(worker.shutdown());
204    try {
205      stub.lockHeartbeat(null,
206        LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build());
207    } catch (Exception e) {
208      throw ProtobufUtil.handleRemoteException(e);
209    }
210  }
211
212  protected class LockHeartbeatWorker extends Thread {
213    private volatile boolean shutdown = false;
214
215    public LockHeartbeatWorker(final String desc) {
216      super("LockHeartbeatWorker(" + desc + ")");
217      setDaemon(true);
218    }
219
220    /**
221     * @return Shuts down the thread clean and quietly.
222     */
223    Thread shutdown() {
224      shutdown = true;
225      interrupt();
226      return this;
227    }
228
229    @Override
230    public void run() {
231      final LockHeartbeatRequest lockHeartbeatRequest =
232          LockHeartbeatRequest.newBuilder().setProcId(procId).build();
233
234      LockHeartbeatResponse response;
235      while (true) {
236        try {
237          response = stub.lockHeartbeat(null, lockHeartbeatRequest);
238        } catch (Exception e) {
239          e = ProtobufUtil.handleRemoteException(e);
240          locked.set(false);
241          LOG.error("Heartbeat failed, releasing " + EntityLock.this, e);
242          abort.abort("Heartbeat failed", e);
243          return;
244        }
245        if (!isLocked() && response.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED) {
246          locked.set(true);
247          latch.countDown();
248        } else if (isLocked() && response.getLockStatus() == LockHeartbeatResponse.LockStatus.UNLOCKED) {
249          // Lock timed out.
250          locked.set(false);
251          abort.abort("Lock timed out.", null);
252          return;
253        }
254
255        try {
256          // If lock not acquired yet, poll faster so we can notify faster.
257          long sleepTime = 1000;
258          if (isLocked()) {
259            // If lock acquired, then use lock timeout to determine heartbeat rate.
260            // If timeout is <heartbeatTimeBuffer, send back to back heartbeats.
261            sleepTime = Math.max(response.getTimeoutMs() - heartbeatTimeBuffer, 1);
262          }
263          if (testingSleepTime != 0) {
264            sleepTime = testingSleepTime;
265          }
266          Thread.sleep(sleepTime);
267        } catch (InterruptedException e) {
268          // Since there won't be any more heartbeats, assume lock will be lost.
269          locked.set(false);
270          if (!this.shutdown) {
271            LOG.error("Interrupted, releasing " + this, e);
272            abort.abort("Worker thread interrupted", e);
273          }
274          return;
275        }
276      }
277    }
278  }
279}