001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.ConcurrentMap;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
029
030/**
031 * Allows multiple concurrent clients to lock on a numeric id with a minimal
032 * memory overhead. The intended usage is as follows:
033 *
034 * <pre>
035 * IdLock.Entry lockEntry = idLock.getLockEntry(id);
036 * try {
037 *   // User code.
038 * } finally {
039 *   idLock.releaseLockEntry(lockEntry);
040 * }</pre>
041 */
042@InterfaceAudience.Private
043public class IdLock {
044
045  private static final Logger LOG = LoggerFactory.getLogger(IdLock.class);
046
047  /** An entry returned to the client as a lock object */
048  public static final class Entry {
049    private final long id;
050    private int numWaiters;
051    private boolean locked = true;
052    private Thread holder;
053
054    private Entry(long id, Thread holder) {
055      this.id = id;
056      this.holder = holder;
057    }
058
059    @Override
060    public String toString() {
061      return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
062          + locked + ", holder=" + holder;
063    }
064  }
065
066  private ConcurrentMap<Long, Entry> map = new ConcurrentHashMap<>();
067
068  /**
069   * Blocks until the lock corresponding to the given id is acquired.
070   *
071   * @param id an arbitrary number to lock on
072   * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
073   *         the lock
074   * @throws IOException if interrupted
075   */
076  public Entry getLockEntry(long id) throws IOException {
077    Thread currentThread = Thread.currentThread();
078    Entry entry = new Entry(id, currentThread);
079    Entry existing;
080    while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
081      synchronized (existing) {
082        if (existing.locked) {
083          ++existing.numWaiters;  // Add ourselves to waiters.
084          while (existing.locked) {
085            try {
086              existing.wait();
087            } catch (InterruptedException e) {
088              --existing.numWaiters;  // Remove ourselves from waiters.
089              // HBASE-21292
090              // There is a rare case that interrupting and the lock owner thread call
091              // releaseLockEntry at the same time. Since the owner thread found there
092              // still one waiting, it won't remove the entry from the map. If the interrupted
093              // thread is the last one waiting on the lock, and since an exception is thrown,
094              // the 'existing' entry will stay in the map forever. Later threads which try to
095              // get this lock will stuck in a infinite loop because
096              // existing = map.putIfAbsent(entry.id, entry)) != null and existing.locked=false.
097              if (!existing.locked && existing.numWaiters == 0) {
098                map.remove(existing.id);
099              }
100              throw new InterruptedIOException(
101                  "Interrupted waiting to acquire sparse lock");
102            }
103          }
104
105          --existing.numWaiters;  // Remove ourselves from waiters.
106          existing.locked = true;
107          existing.holder = currentThread;
108          return existing;
109        }
110        // If the entry is not locked, it might already be deleted from the
111        // map, so we cannot return it. We need to get our entry into the map
112        // or get someone else's locked entry.
113      }
114    }
115    return entry;
116  }
117
118  /**
119   * Blocks until the lock corresponding to the given id is acquired.
120   *
121   * @param id an arbitrary number to lock on
122   * @param time time to wait in ms
123   * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
124   *         the lock
125   * @throws IOException if interrupted
126   */
127  public Entry tryLockEntry(long id, long time) throws IOException {
128    Preconditions.checkArgument(time >= 0);
129    Thread currentThread = Thread.currentThread();
130    Entry entry = new Entry(id, currentThread);
131    Entry existing;
132    long waitUtilTS = System.currentTimeMillis() + time;
133    long remaining = time;
134    while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
135      synchronized (existing) {
136        if (existing.locked) {
137          ++existing.numWaiters;  // Add ourselves to waiters.
138          try {
139            while (existing.locked) {
140              existing.wait(remaining);
141              if (existing.locked) {
142                long currentTS = System.currentTimeMillis();
143                if (currentTS >= waitUtilTS) {
144                  // time is up
145                  return null;
146                } else {
147                  // our wait is waken, but the lock is still taken, this can happen
148                  // due to JDK Object's wait/notify mechanism.
149                  // Calculate the new remaining time to wait
150                  remaining = waitUtilTS - currentTS;
151                }
152              }
153
154            }
155          } catch (InterruptedException e) {
156            // HBASE-21292
157            // Please refer to the comments in getLockEntry()
158            // the difference here is that we decrease numWaiters in finally block
159            if (!existing.locked && existing.numWaiters == 1) {
160              map.remove(existing.id);
161            }
162            throw new InterruptedIOException(
163                "Interrupted waiting to acquire sparse lock");
164          } finally {
165            --existing.numWaiters;  // Remove ourselves from waiters.
166          }
167          existing.locked = true;
168          existing.holder = currentThread;
169          return existing;
170        }
171        // If the entry is not locked, it might already be deleted from the
172        // map, so we cannot return it. We need to get our entry into the map
173        // or get someone else's locked entry.
174      }
175    }
176    return entry;
177  }
178
179  /**
180   * Must be called in a finally block to decrease the internal counter and remove the monitor
181   * object for the given id if the caller is the last client.
182   * @param entry the return value of {@link #getLockEntry(long)}
183   */
184  public void releaseLockEntry(Entry entry) {
185    Thread currentThread = Thread.currentThread();
186    synchronized (entry) {
187      if (entry.holder != currentThread) {
188        LOG.warn("{} is trying to release lock entry {}, but it is not the holder.", currentThread,
189          entry);
190      }
191      entry.locked = false;
192      if (entry.numWaiters > 0) {
193        entry.notify();
194      } else {
195        map.remove(entry.id);
196      }
197    }
198  }
199
200  /**
201   * Test whether the given id is already locked by the current thread.
202   */
203  public boolean isHeldByCurrentThread(long id) {
204    Thread currentThread = Thread.currentThread();
205    Entry entry = map.get(id);
206    if (entry == null) {
207      return false;
208    }
209    synchronized (entry) {
210      return currentThread.equals(entry.holder);
211    }
212  }
213
214  void assertMapEmpty() {
215    assert map.isEmpty();
216  }
217
218  public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
219    for (Entry entry;;) {
220      entry = map.get(id);
221      if (entry != null) {
222        synchronized (entry) {
223          if (entry.numWaiters >= numWaiters) {
224            return;
225          }
226        }
227      }
228      Thread.sleep(100);
229    }
230  }
231}