View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.util.concurrent.locks.ReentrantReadWriteLock;
22  
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  
25  import com.google.common.annotations.VisibleForTesting;
26  
27  /**
28   * Allows multiple concurrent clients to lock on a numeric id with ReentrantReadWriteLock. The
29   * intended usage for read lock is as follows:
30   *
31   * <pre>
32   * ReentrantReadWriteLock lock = idReadWriteLock.getLock(id);
33   * try {
34   *   lock.readLock().lock();
35   *   // User code.
36   * } finally {
37   *   lock.readLock().unlock();
38   * }
39   * </pre>
40   *
41   * For write lock, use lock.writeLock()
42   */
43  @InterfaceAudience.Private
44  public class IdReadWriteLock {
45    // The number of lock we want to easily support. It's not a maximum.
46    private static final int NB_CONCURRENT_LOCKS = 1000;
47    // The pool to get entry from, entries are mapped by weak reference to make it able to be
48    // garbage-collected asap
49    private final WeakObjectPool<Long, ReentrantReadWriteLock> lockPool =
50        new WeakObjectPool<Long, ReentrantReadWriteLock>(
51            new WeakObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
52              @Override
53              public ReentrantReadWriteLock createObject(Long id) {
54                return new ReentrantReadWriteLock();
55              }
56            }, NB_CONCURRENT_LOCKS);
57  
58    /**
59     * Get the ReentrantReadWriteLock corresponding to the given id
60     * @param id an arbitrary number to identify the lock
61     */
62    public ReentrantReadWriteLock getLock(long id) {
63      lockPool.purge();
64      ReentrantReadWriteLock readWriteLock = lockPool.get(id);
65      return readWriteLock;
66    }
67  
68    /** For testing */
69    @VisibleForTesting
70    int purgeAndGetEntryPoolSize() {
71      gc();
72      Threads.sleep(200);
73      lockPool.purge();
74      return lockPool.size();
75    }
76  
77    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DM_GC", justification="Intentional")
78    private void gc() {
79      System.gc();
80    }
81  
82    @VisibleForTesting
83    public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
84      for (ReentrantReadWriteLock readWriteLock;;) {
85        readWriteLock = lockPool.get(id);
86        if (readWriteLock != null) {
87          synchronized (readWriteLock) {
88            if (readWriteLock.getQueueLength() >= numWaiters) {
89              return;
90            }
91          }
92        }
93        Thread.sleep(50);
94      }
95    }
96  }