View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.util;
20  
21  
22  import java.util.HashMap;
23  import java.util.Map;
24  import java.util.Set;
25  import java.util.SortedSet;
26  import java.util.TreeSet;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.locks.Lock;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  
35  /**
36   * A utility class to manage a set of locks. Each lock is identified by a String which serves
37   * as a key. Typical usage is: <p>
38   * class Example{
39   * private final static KeyLocker&lt;String&gt; locker = new Locker&lt;String&gt;();
40   * </p>
41   * <p>
42   * public void foo(String s){
43   * Lock lock = locker.acquireLock(s);
44   * try {
45   * // whatever
46   * }finally{
47   * lock.unlock();
48   * }
49   * }
50   * }
51   * </p>
52   */
53  @InterfaceAudience.Private
54  public class KeyLocker<K extends Comparable<? super K>> {
55    private static final Log LOG = LogFactory.getLog(KeyLocker.class);
56  
57    // The number of lock we want to easily support. It's not a maximum.
58    private static final int NB_CONCURRENT_LOCKS = 1000;
59  
60    // We need an atomic counter to manage the number of users using the lock and free it when
61    //  it's equal to zero.
62    private final Map<K, Pair<KeyLock<K>, AtomicInteger>> locks =
63      new HashMap<K, Pair<KeyLock<K>, AtomicInteger>>(NB_CONCURRENT_LOCKS);
64  
65    /**
66     * Return a lock for the given key. The lock is already locked.
67     *
68     * @param key
69     */
70    public ReentrantLock acquireLock(K key) {
71      if (key == null) throw new IllegalArgumentException("key must not be null");
72  
73      Pair<KeyLock<K>, AtomicInteger> lock;
74      synchronized (this) {
75        lock = locks.get(key);
76        if (lock == null) {
77          lock = new Pair<KeyLock<K>, AtomicInteger>(
78            new KeyLock<K>(this, key), new AtomicInteger(1));
79          locks.put(key, lock);
80        } else {
81          lock.getSecond().incrementAndGet();
82        }
83      }
84      lock.getFirst().lock();
85      return lock.getFirst();
86    }
87  
88    /**
89     * Acquire locks for a set of keys. The keys will be
90     * sorted internally to avoid possible deadlock.
91     */
92    public Map<K, Lock> acquireLocks(final Set<K> keys) {
93      Map<K, Lock> locks = new HashMap<K, Lock>(keys.size());
94      SortedSet<K> sortedKeys = new TreeSet<K>(keys);
95      for (K key : sortedKeys) {
96        locks.put(key, acquireLock(key));
97      }
98      return locks;
99    }
100 
101   /**
102    * Free the lock for the given key.
103    */
104   private synchronized void releaseLock(K key) {
105     Pair<KeyLock<K>, AtomicInteger> lock = locks.get(key);
106     if (lock != null) {
107       if (lock.getSecond().decrementAndGet() == 0) {
108         locks.remove(key);
109       }
110     } else {
111       String message = "Can't release the lock for " + key+", this key is not in the key list." +
112         " known keys are: "+ locks.keySet();
113       LOG.error(message);
114       throw new RuntimeException(message);
115     }
116   }
117 
118   static class KeyLock<K extends Comparable<? super K>> extends ReentrantLock {
119     private static final long serialVersionUID = -12432857283423584L;
120 
121     private final transient KeyLocker<K> locker;
122     private final K lockId;
123 
124     private KeyLock(KeyLocker<K> locker, K lockId) {
125       super();
126       this.locker = locker;
127       this.lockId = lockId;
128     }
129 
130     @Override
131     public void unlock() {
132       super.unlock();
133       locker.releaseLock(lockId);
134     }
135   }
136 }