001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.ConcurrentMap;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
029
030/**
031 * Allows multiple concurrent clients to lock on a numeric id with a minimal memory overhead. The
032 * intended usage is as follows:
033 *
034 * <pre>
035 * IdLock.Entry lockEntry = idLock.getLockEntry(id);
036 * try {
037 *   // User code.
038 * } finally {
039 *   idLock.releaseLockEntry(lockEntry);
040 * }
041 * </pre>
042 */
043@InterfaceAudience.Private
044public class IdLock {
045
046  private static final Logger LOG = LoggerFactory.getLogger(IdLock.class);
047
048  /** An entry returned to the client as a lock object */
049  public static final class Entry {
050    private final long id;
051    private int numWaiters;
052    private boolean locked = true;
053    private Thread holder;
054
055    private Entry(long id, Thread holder) {
056      this.id = id;
057      this.holder = holder;
058    }
059
060    @Override
061    public String toString() {
062      return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked=" + locked + ", holder="
063        + holder;
064    }
065  }
066
067  private ConcurrentMap<Long, Entry> map = new ConcurrentHashMap<>();
068
069  /**
070   * Blocks until the lock corresponding to the given id is acquired.
071   * @param id an arbitrary number to lock on
072   * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release the lock
073   * @throws IOException if interrupted
074   */
075  public Entry getLockEntry(long id) throws IOException {
076    Thread currentThread = Thread.currentThread();
077    Entry entry = new Entry(id, currentThread);
078    Entry existing;
079    while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
080      synchronized (existing) {
081        if (existing.locked) {
082          ++existing.numWaiters; // Add ourselves to waiters.
083          while (existing.locked) {
084            try {
085              existing.wait();
086            } catch (InterruptedException e) {
087              --existing.numWaiters; // Remove ourselves from waiters.
088              // HBASE-21292
089              // There is a rare case that interrupting and the lock owner thread call
090              // releaseLockEntry at the same time. Since the owner thread found there
091              // still one waiting, it won't remove the entry from the map. If the interrupted
092              // thread is the last one waiting on the lock, and since an exception is thrown,
093              // the 'existing' entry will stay in the map forever. Later threads which try to
094              // get this lock will stuck in a infinite loop because
095              // existing = map.putIfAbsent(entry.id, entry)) != null and existing.locked=false.
096              if (!existing.locked && existing.numWaiters == 0) {
097                map.remove(existing.id);
098              }
099              throw new InterruptedIOException("Interrupted waiting to acquire sparse lock");
100            }
101          }
102
103          --existing.numWaiters; // Remove ourselves from waiters.
104          existing.locked = true;
105          existing.holder = currentThread;
106          return existing;
107        }
108        // If the entry is not locked, it might already be deleted from the
109        // map, so we cannot return it. We need to get our entry into the map
110        // or get someone else's locked entry.
111      }
112    }
113    return entry;
114  }
115
116  /**
117   * Blocks until the lock corresponding to the given id is acquired.
118   * @param id   an arbitrary number to lock on
119   * @param time time to wait in ms
120   * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release the lock
121   * @throws IOException if interrupted
122   */
123  public Entry tryLockEntry(long id, long time) throws IOException {
124    Preconditions.checkArgument(time >= 0);
125    Thread currentThread = Thread.currentThread();
126    Entry entry = new Entry(id, currentThread);
127    Entry existing;
128    long waitUtilTS = EnvironmentEdgeManager.currentTime() + time;
129    long remaining = time;
130    while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
131      synchronized (existing) {
132        if (existing.locked) {
133          ++existing.numWaiters; // Add ourselves to waiters.
134          try {
135            while (existing.locked) {
136              existing.wait(remaining);
137              if (existing.locked) {
138                long currentTS = EnvironmentEdgeManager.currentTime();
139                if (currentTS >= waitUtilTS) {
140                  // time is up
141                  return null;
142                } else {
143                  // our wait is waken, but the lock is still taken, this can happen
144                  // due to JDK Object's wait/notify mechanism.
145                  // Calculate the new remaining time to wait
146                  remaining = waitUtilTS - currentTS;
147                }
148              }
149
150            }
151          } catch (InterruptedException e) {
152            // HBASE-21292
153            // Please refer to the comments in getLockEntry()
154            // the difference here is that we decrease numWaiters in finally block
155            if (!existing.locked && existing.numWaiters == 1) {
156              map.remove(existing.id);
157            }
158            throw new InterruptedIOException("Interrupted waiting to acquire sparse lock");
159          } finally {
160            --existing.numWaiters; // Remove ourselves from waiters.
161          }
162          existing.locked = true;
163          existing.holder = currentThread;
164          return existing;
165        }
166        // If the entry is not locked, it might already be deleted from the
167        // map, so we cannot return it. We need to get our entry into the map
168        // or get someone else's locked entry.
169      }
170    }
171    return entry;
172  }
173
174  /**
175   * Must be called in a finally block to decrease the internal counter and remove the monitor
176   * object for the given id if the caller is the last client.
177   * @param entry the return value of {@link #getLockEntry(long)}
178   */
179  public void releaseLockEntry(Entry entry) {
180    Thread currentThread = Thread.currentThread();
181    synchronized (entry) {
182      if (entry.holder != currentThread) {
183        LOG.warn("{} is trying to release lock entry {}, but it is not the holder.", currentThread,
184          entry);
185      }
186      entry.locked = false;
187      if (entry.numWaiters > 0) {
188        entry.notify();
189      } else {
190        map.remove(entry.id);
191      }
192    }
193  }
194
195  /**
196   * Test whether the given id is already locked by the current thread.
197   */
198  public boolean isHeldByCurrentThread(long id) {
199    Thread currentThread = Thread.currentThread();
200    Entry entry = map.get(id);
201    if (entry == null) {
202      return false;
203    }
204    synchronized (entry) {
205      return currentThread.equals(entry.holder);
206    }
207  }
208
209  void assertMapEmpty() {
210    assert map.isEmpty();
211  }
212
213  public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
214    for (Entry entry;;) {
215      entry = map.get(id);
216      if (entry != null) {
217        synchronized (entry) {
218          if (entry.numWaiters >= numWaiters) {
219            return;
220          }
221        }
222      }
223      Thread.sleep(100);
224    }
225  }
226}