001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import java.lang.ref.Reference;
022import java.util.concurrent.locks.ReentrantReadWriteLock;
023
024import org.apache.yetus.audience.InterfaceAudience;
025
026import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
027
028/**
029 * Allows multiple concurrent clients to lock on a numeric id with ReentrantReadWriteLock. The
030 * intended usage for read lock is as follows:
031 *
032 * <pre>
033 * ReentrantReadWriteLock lock = idReadWriteLock.getLock(id);
034 * try {
035 *   lock.readLock().lock();
036 *   // User code.
037 * } finally {
038 *   lock.readLock().unlock();
039 * }
040 * </pre>
041 *
042 * For write lock, use lock.writeLock()
043 */
044@InterfaceAudience.Private
045public class IdReadWriteLock<T> {
046  // The number of lock we want to easily support. It's not a maximum.
047  private static final int NB_CONCURRENT_LOCKS = 1000;
048  /**
049   * The pool to get entry from, entries are mapped by {@link Reference} and will be automatically
050   * garbage-collected by JVM
051   */
052  private final ObjectPool<T, ReentrantReadWriteLock> lockPool;
053  private final ReferenceType refType;
054
055  public IdReadWriteLock() {
056    this(ReferenceType.WEAK);
057  }
058
059  /**
060   * Constructor of IdReadWriteLock
061   * @param referenceType type of the reference used in lock pool, {@link ReferenceType#WEAK} by
062   *          default. Use {@link ReferenceType#SOFT} if the key set is limited and the locks will
063   *          be reused with a high frequency
064   */
065  public IdReadWriteLock(ReferenceType referenceType) {
066    this.refType = referenceType;
067    switch (referenceType) {
068      case SOFT:
069        lockPool = new SoftObjectPool<>(new ObjectPool.ObjectFactory<T, ReentrantReadWriteLock>() {
070          @Override
071          public ReentrantReadWriteLock createObject(T id) {
072            return new ReentrantReadWriteLock();
073          }
074        }, NB_CONCURRENT_LOCKS);
075        break;
076      case WEAK:
077      default:
078        lockPool = new WeakObjectPool<>(new ObjectPool.ObjectFactory<T, ReentrantReadWriteLock>() {
079          @Override
080          public ReentrantReadWriteLock createObject(T id) {
081            return new ReentrantReadWriteLock();
082          }
083        }, NB_CONCURRENT_LOCKS);
084    }
085  }
086
087  public static enum ReferenceType {
088    WEAK, SOFT
089  }
090
091  /**
092   * Get the ReentrantReadWriteLock corresponding to the given id
093   * @param id an arbitrary number to identify the lock
094   */
095  public ReentrantReadWriteLock getLock(T id) {
096    lockPool.purge();
097    ReentrantReadWriteLock readWriteLock = lockPool.get(id);
098    return readWriteLock;
099  }
100
101  /** For testing */
102  @VisibleForTesting
103  int purgeAndGetEntryPoolSize() {
104    gc();
105    Threads.sleep(200);
106    lockPool.purge();
107    return lockPool.size();
108  }
109
110  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DM_GC", justification="Intentional")
111  private void gc() {
112    System.gc();
113  }
114
115  @VisibleForTesting
116  public void waitForWaiters(T id, int numWaiters) throws InterruptedException {
117    for (ReentrantReadWriteLock readWriteLock;;) {
118      readWriteLock = lockPool.get(id);
119      if (readWriteLock != null) {
120        synchronized (readWriteLock) {
121          if (readWriteLock.getQueueLength() >= numWaiters) {
122            return;
123          }
124        }
125      }
126      Thread.sleep(50);
127    }
128  }
129
130  @VisibleForTesting
131  public ReferenceType getReferenceType() {
132    return this.refType;
133  }
134}