View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.ConcurrentMap;
25  
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  
28  import com.google.common.annotations.VisibleForTesting;
29  
30  /**
31   * Allows multiple concurrent clients to lock on a numeric id with a minimal
32   * memory overhead. The intended usage is as follows:
33   *
34   * <pre>
35   * IdLock.Entry lockEntry = idLock.getLockEntry(id);
36   * try {
37   *   // User code.
38   * } finally {
39   *   idLock.releaseLockEntry(lockEntry);
40   * }</pre>
41   */
42  @InterfaceAudience.Private
43  public class IdLock {
44  
45    /** An entry returned to the client as a lock object */
46    public static class Entry {
47      private final long id;
48      private int numWaiters;
49      private boolean isLocked = true;
50  
51      private Entry(long id) {
52        this.id = id;
53      }
54  
55      public String toString() {
56        return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
57            + isLocked;
58      }
59    }
60  
61    private ConcurrentMap<Long, Entry> map =
62        new ConcurrentHashMap<Long, Entry>();
63  
64    /**
65     * Blocks until the lock corresponding to the given id is acquired.
66     *
67     * @param id an arbitrary number to lock on
68     * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
69     *         the lock
70     * @throws IOException if interrupted
71     */
72    public Entry getLockEntry(long id) throws IOException {
73      Entry entry = new Entry(id);
74      Entry existing;
75      while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
76        synchronized (existing) {
77          if (existing.isLocked) {
78            ++existing.numWaiters;  // Add ourselves to waiters.
79            while (existing.isLocked) {
80              try {
81                existing.wait();
82              } catch (InterruptedException e) {
83                --existing.numWaiters;  // Remove ourselves from waiters.
84                throw new InterruptedIOException(
85                    "Interrupted waiting to acquire sparse lock");
86              }
87            }
88  
89            --existing.numWaiters;  // Remove ourselves from waiters.
90            existing.isLocked = true;
91            return existing;
92          }
93          // If the entry is not locked, it might already be deleted from the
94          // map, so we cannot return it. We need to get our entry into the map
95          // or get someone else's locked entry.
96        }
97      }
98      return entry;
99    }
100 
101   /**
102    * Must be called in a finally block to decrease the internal counter and
103    * remove the monitor object for the given id if the caller is the last
104    * client.
105    *
106    * @param entry the return value of {@link #getLockEntry(long)}
107    */
108   public void releaseLockEntry(Entry entry) {
109     synchronized (entry) {
110       entry.isLocked = false;
111       if (entry.numWaiters > 0) {
112         entry.notify();
113       } else {
114         map.remove(entry.id);
115       }
116     }
117   }
118 
119   /** For testing */
120   void assertMapEmpty() {
121     assert map.size() == 0;
122   }
123 
124   @VisibleForTesting
125   public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
126     for (Entry entry;;) {
127       entry = map.get(id);
128       if (entry != null) {
129         synchronized (entry) {
130           if (entry.numWaiters >= numWaiters) {
131             return;
132           }
133         }
134       }
135       Thread.sleep(100);
136     }
137   }
138 }