001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.client.locking;
020
021import java.io.IOException;
022import java.util.concurrent.CountDownLatch;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.atomic.AtomicBoolean;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Abortable;
028import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
029import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
030import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
031import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
032import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
033import org.apache.hadoop.hbase.util.Threads;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Lock for HBase Entity either a Table, a Namespace, or Regions.
040 *
041 * These are remote locks which live on master, and need periodic heartbeats to keep them alive.
042 * (Once we request the lock, internally an heartbeat thread will be started on the client).
043 * If master does not receive the heartbeat in time, it'll release the lock and make it available
044 * to other users.
045 *
046 * <p>Use {@link LockServiceClient} to build instances. Then call {@link #requestLock()}.
047 * {@link #requestLock} will contact master to queue the lock and start the heartbeat thread
048 * which will check lock's status periodically and once the lock is acquired, it will send the
049 * heartbeats to the master.
050 *
051 * <p>Use {@link #await} or {@link #await(long, TimeUnit)} to wait for the lock to be acquired.
052 * Always call {@link #unlock()} irrespective of whether lock was acquired or not. If the lock
053 * was acquired, it'll be released. If it was not acquired, it is possible that master grants the
054 * lock in future and the heartbeat thread keeps it alive forever by sending heartbeats.
055 * Calling {@link #unlock()} will stop the heartbeat thread and cancel the lock queued on master.
056 *
057 * <p>There are 4 ways in which these remote locks may be released/can be lost:
058 * <ul><li>Call {@link #unlock}.</li>
059 * <li>Lock times out on master: Can happen because of network issues, GC pauses, etc.
060 *     Worker thread will call the given abortable as soon as it detects such a situation.</li>
061 * <li>Fail to contact master: If worker thread can not contact mater and thus fails to send
062 *     heartbeat before the timeout expires, it assumes that lock is lost and calls the
063 *     abortable.</li>
064 * <li>Worker thread is interrupted.</li>
065 * </ul>
066 *
067 * Use example:
068 * <code>
069 * EntityLock lock = lockServiceClient.*Lock(...., "exampled lock", abortable);
070 * lock.requestLock();
071 * ....
072 * ....can do other initializations here since lock is 'asynchronous'...
073 * ....
074 * if (lock.await(timeout)) {
075 *   ....logic requiring mutual exclusion
076 * }
077 * lock.unlock();
078 * </code>
079 */
080@InterfaceAudience.Public
081public class EntityLock {
082  private static final Logger LOG = LoggerFactory.getLogger(EntityLock.class);
083
084  public static final String HEARTBEAT_TIME_BUFFER =
085      "hbase.client.locks.heartbeat.time.buffer.ms";
086
087  private final AtomicBoolean locked = new AtomicBoolean(false);
088  private final CountDownLatch latch = new CountDownLatch(1);
089
090  private final LockService.BlockingInterface stub;
091  private final LockHeartbeatWorker worker;
092  private final LockRequest lockRequest;
093  private final Abortable abort;
094
095  // Buffer for unexpected delays (GC, network delay, etc) in heartbeat rpc.
096  private final int heartbeatTimeBuffer;
097
098  // set to a non-zero value for tweaking sleep time during testing so that worker doesn't wait
099  // for long time periods between heartbeats.
100  private long testingSleepTime = 0;
101
102  private Long procId = null;
103
104  /**
105   * Abortable.abort() is called when the lease of the lock will expire.
106   * It's up to the user decide if simply abort the process or handle the loss of the lock
107   * by aborting the operation that was supposed to be under lock.
108   */
109  EntityLock(Configuration conf, LockService.BlockingInterface stub,
110      LockRequest request, Abortable abort) {
111    this.stub = stub;
112    this.lockRequest = request;
113    this.abort = abort;
114
115    this.heartbeatTimeBuffer = conf.getInt(HEARTBEAT_TIME_BUFFER, 10000);
116    this.worker = new LockHeartbeatWorker(lockRequest.getDescription());
117  }
118
119  @Override
120  public String toString() {
121    final StringBuilder sb = new StringBuilder();
122    sb.append("EntityLock locked=");
123    sb.append(locked.get());
124    sb.append(", procId=");
125    sb.append(procId);
126    sb.append(", type=");
127    sb.append(lockRequest.getLockType());
128    if (lockRequest.getRegionInfoCount() > 0) {
129      sb.append(", regions=");
130      for (int i = 0; i < lockRequest.getRegionInfoCount(); ++i) {
131        if (i > 0) sb.append(", ");
132        sb.append(lockRequest.getRegionInfo(i));
133      }
134    } else if (lockRequest.hasTableName()) {
135      sb.append(", table=");
136      sb.append(lockRequest.getTableName());
137    } else if (lockRequest.hasNamespace()) {
138      sb.append(", namespace=");
139      sb.append(lockRequest.getNamespace());
140    }
141    sb.append(", description=");
142    sb.append(lockRequest.getDescription());
143    return sb.toString();
144  }
145
146  @InterfaceAudience.Private
147  void setTestingSleepTime(long timeInMillis) {
148    testingSleepTime = timeInMillis;
149  }
150
151  @InterfaceAudience.Private
152  LockHeartbeatWorker getWorker() {
153    return worker;
154  }
155
156  public boolean isLocked() {
157    return locked.get();
158  }
159
160  /**
161   * Sends rpc to the master to request lock.
162   * The lock request is queued with other lock requests.
163   * Call {@link #await()} to wait on lock.
164   * Always call {@link #unlock()} after calling the below, even after error.
165   */
166  public void requestLock() throws IOException {
167    if (procId == null) {
168      try {
169        procId = stub.requestLock(null, lockRequest).getProcId();
170      } catch (Exception e) {
171        throw ProtobufUtil.handleRemoteException(e);
172      }
173      worker.start();
174    } else {
175      LOG.info("Lock already queued : " + toString());
176    }
177  }
178
179  /**
180   * @param timeout in milliseconds. If set to 0, waits indefinitely.
181   * @return true if lock was acquired; and false if waiting time elapsed before lock could be
182   * acquired.
183   */
184  public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
185    final boolean result = latch.await(timeout, timeUnit);
186    String lockRequestStr = lockRequest.toString().replace("\n", ", ");
187    if (result) {
188      LOG.info("Acquired " + lockRequestStr);
189    } else {
190      LOG.info(String.format("Failed acquire in %s %s of %s", timeout, timeUnit.toString(),
191          lockRequestStr));
192    }
193    return result;
194  }
195
196  public void await() throws InterruptedException {
197    latch.await();
198  }
199
200  public void unlock() throws IOException {
201    Threads.shutdown(worker.shutdown());
202    try {
203      stub.lockHeartbeat(null,
204        LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build());
205    } catch (Exception e) {
206      throw ProtobufUtil.handleRemoteException(e);
207    }
208  }
209
210  protected class LockHeartbeatWorker extends Thread {
211    private volatile boolean shutdown = false;
212
213    public LockHeartbeatWorker(final String desc) {
214      super("LockHeartbeatWorker(" + desc + ")");
215      setDaemon(true);
216    }
217
218    /**
219     * @return Shuts down the thread clean and quietly.
220     */
221    Thread shutdown() {
222      shutdown = true;
223      interrupt();
224      return this;
225    }
226
227    @Override
228    public void run() {
229      final LockHeartbeatRequest lockHeartbeatRequest =
230          LockHeartbeatRequest.newBuilder().setProcId(procId).build();
231
232      LockHeartbeatResponse response;
233      while (true) {
234        try {
235          response = stub.lockHeartbeat(null, lockHeartbeatRequest);
236        } catch (Exception e) {
237          e = ProtobufUtil.handleRemoteException(e);
238          locked.set(false);
239          LOG.error("Heartbeat failed, releasing " + EntityLock.this, e);
240          abort.abort("Heartbeat failed", e);
241          return;
242        }
243        if (!isLocked() && response.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED) {
244          locked.set(true);
245          latch.countDown();
246        } else if (isLocked() && response.getLockStatus() == LockHeartbeatResponse.LockStatus.UNLOCKED) {
247          // Lock timed out.
248          locked.set(false);
249          abort.abort("Lock timed out.", null);
250          return;
251        }
252
253        try {
254          // If lock not acquired yet, poll faster so we can notify faster.
255          long sleepTime = 1000;
256          if (isLocked()) {
257            // If lock acquired, then use lock timeout to determine heartbeat rate.
258            // If timeout is <heartbeatTimeBuffer, send back to back heartbeats.
259            sleepTime = Math.max(response.getTimeoutMs() - heartbeatTimeBuffer, 1);
260          }
261          if (testingSleepTime != 0) {
262            sleepTime = testingSleepTime;
263          }
264          Thread.sleep(sleepTime);
265        } catch (InterruptedException e) {
266          // Since there won't be any more heartbeats, assume lock will be lost.
267          locked.set(false);
268          if (!this.shutdown) {
269            LOG.error("Interrupted, releasing " + this, e);
270            abort.abort("Worker thread interrupted", e);
271          }
272          return;
273        }
274      }
275    }
276  }
277}