001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.ConcurrentMap;
024
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
030import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
031
032/**
033 * Allows multiple concurrent clients to lock on a numeric id with a minimal
034 * memory overhead. The intended usage is as follows:
035 *
036 * <pre>
037 * IdLock.Entry lockEntry = idLock.getLockEntry(id);
038 * try {
039 *   // User code.
040 * } finally {
041 *   idLock.releaseLockEntry(lockEntry);
042 * }</pre>
043 */
044@InterfaceAudience.Private
045public class IdLock {
046
047  private static final Logger LOG = LoggerFactory.getLogger(IdLock.class);
048
049  /** An entry returned to the client as a lock object */
050  public static final class Entry {
051    private final long id;
052    private int numWaiters;
053    private boolean locked = true;
054    private Thread holder;
055
056    private Entry(long id, Thread holder) {
057      this.id = id;
058      this.holder = holder;
059    }
060
061    @Override
062    public String toString() {
063      return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
064          + locked + ", holder=" + holder;
065    }
066  }
067
068  private ConcurrentMap<Long, Entry> map = new ConcurrentHashMap<>();
069
070  /**
071   * Blocks until the lock corresponding to the given id is acquired.
072   *
073   * @param id an arbitrary number to lock on
074   * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
075   *         the lock
076   * @throws IOException if interrupted
077   */
078  public Entry getLockEntry(long id) throws IOException {
079    Thread currentThread = Thread.currentThread();
080    Entry entry = new Entry(id, currentThread);
081    Entry existing;
082    while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
083      synchronized (existing) {
084        if (existing.locked) {
085          ++existing.numWaiters;  // Add ourselves to waiters.
086          while (existing.locked) {
087            try {
088              existing.wait();
089            } catch (InterruptedException e) {
090              --existing.numWaiters;  // Remove ourselves from waiters.
091              // HBASE-21292
092              // There is a rare case that interrupting and the lock owner thread call
093              // releaseLockEntry at the same time. Since the owner thread found there
094              // still one waiting, it won't remove the entry from the map. If the interrupted
095              // thread is the last one waiting on the lock, and since an exception is thrown,
096              // the 'existing' entry will stay in the map forever. Later threads which try to
097              // get this lock will stuck in a infinite loop because
098              // existing = map.putIfAbsent(entry.id, entry)) != null and existing.locked=false.
099              if (!existing.locked && existing.numWaiters == 0) {
100                map.remove(existing.id);
101              }
102              throw new InterruptedIOException(
103                  "Interrupted waiting to acquire sparse lock");
104            }
105          }
106
107          --existing.numWaiters;  // Remove ourselves from waiters.
108          existing.locked = true;
109          existing.holder = currentThread;
110          return existing;
111        }
112        // If the entry is not locked, it might already be deleted from the
113        // map, so we cannot return it. We need to get our entry into the map
114        // or get someone else's locked entry.
115      }
116    }
117    return entry;
118  }
119
120  /**
121   * Blocks until the lock corresponding to the given id is acquired.
122   *
123   * @param id an arbitrary number to lock on
124   * @param time time to wait in ms
125   * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
126   *         the lock
127   * @throws IOException if interrupted
128   */
129  public Entry tryLockEntry(long id, long time) throws IOException {
130    Preconditions.checkArgument(time >= 0);
131    Thread currentThread = Thread.currentThread();
132    Entry entry = new Entry(id, currentThread);
133    Entry existing;
134    long waitUtilTS = System.currentTimeMillis() + time;
135    long remaining = time;
136    while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
137      synchronized (existing) {
138        if (existing.locked) {
139          ++existing.numWaiters;  // Add ourselves to waiters.
140          try {
141            while (existing.locked) {
142              existing.wait(remaining);
143              if (existing.locked) {
144                long currentTS = System.currentTimeMillis();
145                if (currentTS >= waitUtilTS) {
146                  // time is up
147                  return null;
148                } else {
149                  // our wait is waken, but the lock is still taken, this can happen
150                  // due to JDK Object's wait/notify mechanism.
151                  // Calculate the new remaining time to wait
152                  remaining = waitUtilTS - currentTS;
153                }
154              }
155
156            }
157          } catch (InterruptedException e) {
158            // HBASE-21292
159            // Please refer to the comments in getLockEntry()
160            // the difference here is that we decrease numWaiters in finally block
161            if (!existing.locked && existing.numWaiters == 1) {
162              map.remove(existing.id);
163            }
164            throw new InterruptedIOException(
165                "Interrupted waiting to acquire sparse lock");
166          } finally {
167            --existing.numWaiters;  // Remove ourselves from waiters.
168          }
169          existing.locked = true;
170          existing.holder = currentThread;
171          return existing;
172        }
173        // If the entry is not locked, it might already be deleted from the
174        // map, so we cannot return it. We need to get our entry into the map
175        // or get someone else's locked entry.
176      }
177    }
178    return entry;
179  }
180
181  /**
182   * Must be called in a finally block to decrease the internal counter and remove the monitor
183   * object for the given id if the caller is the last client.
184   * @param entry the return value of {@link #getLockEntry(long)}
185   */
186  public void releaseLockEntry(Entry entry) {
187    Thread currentThread = Thread.currentThread();
188    synchronized (entry) {
189      if (entry.holder != currentThread) {
190        LOG.warn("{} is trying to release lock entry {}, but it is not the holder.", currentThread,
191          entry);
192      }
193      entry.locked = false;
194      if (entry.numWaiters > 0) {
195        entry.notify();
196      } else {
197        map.remove(entry.id);
198      }
199    }
200  }
201
202  /**
203   * Test whether the given id is already locked by the current thread.
204   */
205  public boolean isHeldByCurrentThread(long id) {
206    Thread currentThread = Thread.currentThread();
207    Entry entry = map.get(id);
208    if (entry == null) {
209      return false;
210    }
211    synchronized (entry) {
212      return currentThread.equals(entry.holder);
213    }
214  }
215
216  @VisibleForTesting
217  void assertMapEmpty() {
218    assert map.isEmpty();
219  }
220
221  @VisibleForTesting
222  public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
223    for (Entry entry;;) {
224      entry = map.get(id);
225      if (entry != null) {
226        synchronized (entry) {
227          if (entry.numWaiters >= numWaiters) {
228            return;
229          }
230        }
231      }
232      Thread.sleep(100);
233    }
234  }
235}