001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.lang.ref.Reference;
021import java.lang.ref.ReferenceQueue;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.ConcurrentMap;
024import java.util.concurrent.locks.Lock;
025import java.util.concurrent.locks.ReentrantLock;
026import org.apache.yetus.audience.InterfaceAudience;
027
028/**
029 * A thread-safe shared object pool in which object creation is expected to be lightweight, and the
030 * objects may be excessively created and discarded.
031 */
032@InterfaceAudience.Private
public abstract class ObjectPool<K, V> {
  /**
   * Factory the pool calls whenever it needs a fresh shared object for a key.
   */
  public interface ObjectFactory<K, V> {
    /**
     * Builds a new shared object for {@code key}; keys are matched with {@code equals}. Several
     * threads may invoke this at the same time with the same key, in which case the surplus
     * objects are simply dropped.
     */
    V createObject(K key);
  }

  /**
   * Initial capacity applied when a constructor is not given one.
   */
  public static final int DEFAULT_INITIAL_CAPACITY = 16;

  /**
   * Concurrency level applied when a constructor is not given one.
   */
  public static final int DEFAULT_CONCURRENCY_LEVEL = 16;

  /** Queue that {@link #purge()} drains to find references to drop from the cache. */
  protected final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>();

  /** Maps each key to the reference of its pooled object. Null keys are not permitted. */
  protected final ConcurrentMap<K, Reference<V>> referenceCache;

  /** Supplies the objects stored in the pool. Never null once construction succeeds. */
  private final ObjectFactory<K, V> objectFactory;

  /** Taken (without blocking) by {@link #purge()} so that purges never run in parallel. */
  private final Lock purgeLock = new ReentrantLock();

  /**
   * Builds a pool using {@link #DEFAULT_INITIAL_CAPACITY} and {@link #DEFAULT_CONCURRENCY_LEVEL}.
   * @param objectFactory the factory to supply new objects on demand
   * @throws NullPointerException if {@code objectFactory} is null
   */
  public ObjectPool(ObjectFactory<K, V> objectFactory) {
    this(objectFactory, DEFAULT_INITIAL_CAPACITY);
  }

  /**
   * Builds a pool with the requested initial capacity and {@link #DEFAULT_CONCURRENCY_LEVEL}.
   * @param objectFactory   the factory to supply new objects on demand
   * @param initialCapacity the initial capacity to keep objects in the pool
   * @throws NullPointerException     if {@code objectFactory} is null
   * @throws IllegalArgumentException if {@code initialCapacity} is negative
   */
  public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
    this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL);
  }

  /**
   * Builds a pool with the requested initial capacity and concurrency level. Both sizing values
   * are handed unchanged to the backing {@link ConcurrentHashMap}, together with a load factor of
   * {@code 0.75f}.
   * @param objectFactory    the factory to supply new objects on demand
   * @param initialCapacity  the initial capacity to keep objects in the pool
   * @param concurrencyLevel the estimated count of concurrently accessing threads
   * @throws NullPointerException     if {@code objectFactory} is null
   * @throws IllegalArgumentException if {@code initialCapacity} is negative or
   *                                  {@code concurrencyLevel} is non-positive
   */
  public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity, int concurrencyLevel) {
    if (null == objectFactory) {
      throw new NullPointerException("Given object factory instance is NULL");
    }
    this.objectFactory = objectFactory;
    this.referenceCache = new ConcurrentHashMap<>(initialCapacity, 0.75f, concurrencyLevel);
  }

  /**
   * Drops from the pool every reference currently waiting in {@link #staleRefQueue}. References
   * that turn stale while this runs may be left behind for a later call.
   * <p>
   * If another thread is already purging, this call returns immediately without doing anything.
   * When nothing is stale the call is expected to be cheap with the Oracle (Sun) implementation of
   * {@code ReferenceQueue}, because {@code ReferenceQueue.poll} just checks a volatile instance
   * variable there.
   */
  public void purge() {
    if (!purgeLock.tryLock()) {
      // Somebody else is purging right now; no parallel purge.
      return;
    }
    try {
      Reference<V> stale;
      while ((stale = pollStaleReference()) != null) {
        // Conditional remove: only evict when the key is still mapped to this very reference.
        referenceCache.remove(getReferenceKey(stale), stale);
      }
    } finally {
      purgeLock.unlock();
    }
  }

  /**
   * Takes the next reference off {@link #staleRefQueue} without blocking.
   * @return the polled reference, or {@code null} when the queue is empty
   */
  @SuppressWarnings("unchecked")
  private Reference<V> pollStaleReference() {
    return (Reference<V>) staleRefQueue.poll();
  }

  /**
   * Wraps the given object in a reference that also carries its key.
   * @param key the key to store in the reference
   * @param obj the object to associate with
   * @return the reference instance
   */
  public abstract Reference<V> createReference(K key, V obj);

  /**
   * Extracts the key carried by the given reference.
   * @param ref The reference
   * @return key of the reference
   */
  public abstract K getReferenceKey(Reference<V> ref);

  /**
   * Looks up the shared object for {@code key} (matched with {@code equals}), creating and
   * registering a new one through the factory when the pool holds no live object for that key.
   * When several threads race on the same key, the object whose reference was registered first is
   * returned to all of them.
   * @throws NullPointerException if {@code key} is null
   */
  public V get(K key) {
    // Fast path: an entry already exists and its object is still alive.
    Reference<V> cached = referenceCache.get(key);
    if (cached != null) {
      V pooled = liveObjectOrEvict(key, cached);
      if (pooled != null) {
        return pooled;
      }
    }

    // Slow path: build a candidate and try to register it until someone's live object wins.
    V created = objectFactory.createObject(key);
    Reference<V> createdRef = createReference(key, created);
    Reference<V> rival;
    while ((rival = referenceCache.putIfAbsent(key, createdRef)) != null) {
      V shared = liveObjectOrEvict(key, rival);
      if (shared != null) {
        return shared;
      }
      // The rival entry was stale and has been evicted (by us or another thread); retry.
    }
    return created;
  }

  /**
   * Resolves {@code ref} to its object; when the reference no longer yields one, removes the
   * {@code key -> ref} mapping from the cache (only if it is still that exact mapping).
   * @return the referenced object, or {@code null} when the reference was stale
   */
  private V liveObjectOrEvict(K key, Reference<V> ref) {
    V target = ref.get();
    if (target == null) {
      referenceCache.remove(key, ref);
    }
    return target;
  }

  /**
   * Reports the number of entries in the reference cache. Stale references are included in the
   * count, so consider calling {@link #purge()} first.
   */
  public int size() {
    return referenceCache.size();
  }
}