View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.util;
20  
21  
22  import java.util.HashMap;
23  import java.util.Map;
24  import java.util.Set;
25  import java.util.SortedSet;
26  import java.util.TreeSet;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.locks.Lock;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  
35  /**
36   * A utility class to manage a set of locks. Each lock is identified by a String which serves
37   * as a key. Typical usage is: <p>
38   * class Example{
39   * private final static KeyLocker<String> locker = new Locker<String>();
40   * <p/>
41   * public void foo(String s){
42   * Lock lock = locker.acquireLock(s);
43   * try {
44   * // whatever
45   * }finally{
46   * lock.unlock();
47   * }
48   * }
49   * }
50   * </p>
51   */
52  @InterfaceAudience.Private
53  public class KeyLocker<K extends Comparable<? super K>> {
54    private static final Log LOG = LogFactory.getLog(KeyLocker.class);
55  
56    // The number of lock we want to easily support. It's not a maximum.
57    private static final int NB_CONCURRENT_LOCKS = 1000;
58  
59    // We need an atomic counter to manage the number of users using the lock and free it when
60    //  it's equal to zero.
61    private final Map<K, Pair<KeyLock<K>, AtomicInteger>> locks =
62      new HashMap<K, Pair<KeyLock<K>, AtomicInteger>>(NB_CONCURRENT_LOCKS);
63  
64    /**
65     * Return a lock for the given key. The lock is already locked.
66     *
67     * @param key
68     */
69    public ReentrantLock acquireLock(K key) {
70      if (key == null) throw new IllegalArgumentException("key must not be null");
71  
72      Pair<KeyLock<K>, AtomicInteger> lock;
73      synchronized (this) {
74        lock = locks.get(key);
75        if (lock == null) {
76          lock = new Pair<KeyLock<K>, AtomicInteger>(
77            new KeyLock<K>(this, key), new AtomicInteger(1));
78          locks.put(key, lock);
79        } else {
80          lock.getSecond().incrementAndGet();
81        }
82      }
83      lock.getFirst().lock();
84      return lock.getFirst();
85    }
86  
87    /**
88     * Acquire locks for a set of keys. The keys will be
89     * sorted internally to avoid possible deadlock.
90     */
91    public Map<K, Lock> acquireLocks(final Set<K> keys) {
92      Map<K, Lock> locks = new HashMap<K, Lock>(keys.size());
93      SortedSet<K> sortedKeys = new TreeSet<K>(keys);
94      for (K key : sortedKeys) {
95        locks.put(key, acquireLock(key));
96      }
97      return locks;
98    }
99  
100   /**
101    * Free the lock for the given key.
102    */
103   private synchronized void releaseLock(K key) {
104     Pair<KeyLock<K>, AtomicInteger> lock = locks.get(key);
105     if (lock != null) {
106       if (lock.getSecond().decrementAndGet() == 0) {
107         locks.remove(key);
108       }
109     } else {
110       String message = "Can't release the lock for " + key+", this key is not in the key list." +
111         " known keys are: "+ locks.keySet();
112       LOG.error(message);
113       throw new RuntimeException(message);
114     }
115   }
116 
117   static class KeyLock<K extends Comparable<? super K>> extends ReentrantLock {
118     private static final long serialVersionUID = -12432857283423584L;
119 
120     private final transient KeyLocker<K> locker;
121     private final K lockId;
122 
123     private KeyLock(KeyLocker<K> locker, K lockId) {
124       super();
125       this.locker = locker;
126       this.lockId = lockId;
127     }
128 
129     @Override
130     public void unlock() {
131       super.unlock();
132       locker.releaseLock(lockId);
133     }
134   }
135 }