001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import java.lang.ref.Reference;
022import java.util.concurrent.locks.ReentrantReadWriteLock;
023
024import org.apache.yetus.audience.InterfaceAudience;
025
026/**
027 * Allows multiple concurrent clients to lock on a numeric id with ReentrantReadWriteLock. The
028 * intended usage for read lock is as follows:
029 *
030 * <pre>
031 * ReentrantReadWriteLock lock = idReadWriteLock.getLock(id);
032 * try {
033 *   lock.readLock().lock();
034 *   // User code.
035 * } finally {
036 *   lock.readLock().unlock();
037 * }
038 * </pre>
039 *
040 * For write lock, use lock.writeLock()
041 */
042@InterfaceAudience.Private
043public class IdReadWriteLock<T> {
044  // The number of lock we want to easily support. It's not a maximum.
045  private static final int NB_CONCURRENT_LOCKS = 1000;
046  /**
047   * The pool to get entry from, entries are mapped by {@link Reference} and will be automatically
048   * garbage-collected by JVM
049   */
050  private final ObjectPool<T, ReentrantReadWriteLock> lockPool;
051  private final ReferenceType refType;
052
053  public IdReadWriteLock() {
054    this(ReferenceType.WEAK);
055  }
056
057  /**
058   * Constructor of IdReadWriteLock
059   * @param referenceType type of the reference used in lock pool, {@link ReferenceType#WEAK} by
060   *          default. Use {@link ReferenceType#SOFT} if the key set is limited and the locks will
061   *          be reused with a high frequency
062   */
063  public IdReadWriteLock(ReferenceType referenceType) {
064    this.refType = referenceType;
065    switch (referenceType) {
066      case SOFT:
067        lockPool = new SoftObjectPool<>(new ObjectPool.ObjectFactory<T, ReentrantReadWriteLock>() {
068          @Override
069          public ReentrantReadWriteLock createObject(T id) {
070            return new ReentrantReadWriteLock();
071          }
072        }, NB_CONCURRENT_LOCKS);
073        break;
074      case WEAK:
075      default:
076        lockPool = new WeakObjectPool<>(new ObjectPool.ObjectFactory<T, ReentrantReadWriteLock>() {
077          @Override
078          public ReentrantReadWriteLock createObject(T id) {
079            return new ReentrantReadWriteLock();
080          }
081        }, NB_CONCURRENT_LOCKS);
082    }
083  }
084
085  public static enum ReferenceType {
086    WEAK, SOFT
087  }
088
089  /**
090   * Get the ReentrantReadWriteLock corresponding to the given id
091   * @param id an arbitrary number to identify the lock
092   */
093  public ReentrantReadWriteLock getLock(T id) {
094    lockPool.purge();
095    ReentrantReadWriteLock readWriteLock = lockPool.get(id);
096    return readWriteLock;
097  }
098
099  /** For testing */
100  int purgeAndGetEntryPoolSize() {
101    gc();
102    Threads.sleep(200);
103    lockPool.purge();
104    return lockPool.size();
105  }
106
107  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DM_GC", justification="Intentional")
108  private void gc() {
109    System.gc();
110  }
111
112  public void waitForWaiters(T id, int numWaiters) throws InterruptedException {
113    for (ReentrantReadWriteLock readWriteLock;;) {
114      readWriteLock = lockPool.get(id);
115      if (readWriteLock != null) {
116        synchronized (readWriteLock) {
117          if (readWriteLock.getQueueLength() >= numWaiters) {
118            return;
119          }
120        }
121      }
122      Thread.sleep(50);
123    }
124  }
125
126  public ReferenceType getReferenceType() {
127    return this.refType;
128  }
129}