/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018
019package org.apache.hadoop.hbase.util;
020
021import java.lang.ref.Reference;
022import java.lang.ref.ReferenceQueue;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.ConcurrentMap;
025import java.util.concurrent.locks.Lock;
026import java.util.concurrent.locks.ReentrantLock;
027
028import org.apache.yetus.audience.InterfaceAudience;
029
/**
 * A thread-safe shared object pool in which object creation is expected to be lightweight, and the
 * objects may be excessively created and discarded.
 */
034@InterfaceAudience.Private
035public abstract class ObjectPool<K, V> {
036  /**
037   * An {@code ObjectFactory} object is used to create
038   * new shared objects on demand.
039   */
040  public interface ObjectFactory<K, V> {
041    /**
042     * Creates a new shared object associated with the given {@code key},
043     * identified by the {@code equals} method.
044     * This method may be simultaneously called by multiple threads
045     * with the same key, and the excessive objects are just discarded.
046     */
047    V createObject(K key);
048  }
049
050  protected final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>();
051
052  private final ObjectFactory<K, V> objectFactory;
053
054  /** Does not permit null keys. */
055  protected final ConcurrentMap<K, Reference<V>> referenceCache;
056
057  /** For preventing parallel purge */
058  private final Lock purgeLock = new ReentrantLock();
059
060  /**
061   * The default initial capacity,
062   * used when not otherwise specified in a constructor.
063   */
064  public static final int DEFAULT_INITIAL_CAPACITY = 16;
065
066  /**
067   * The default concurrency level,
068   * used when not otherwise specified in a constructor.
069   */
070  public static final int DEFAULT_CONCURRENCY_LEVEL = 16;
071
072  /**
073   * Creates a new pool with the default initial capacity (16)
074   * and the default concurrency level (16).
075   *
076   * @param objectFactory the factory to supply new objects on demand
077   *
078   * @throws NullPointerException if {@code objectFactory} is null
079   */
080  public ObjectPool(ObjectFactory<K, V> objectFactory) {
081    this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL);
082  }
083
084  /**
085   * Creates a new pool with the given initial capacity
086   * and the default concurrency level (16).
087   *
088   * @param objectFactory the factory to supply new objects on demand
089   * @param initialCapacity the initial capacity to keep objects in the pool
090   *
091   * @throws NullPointerException if {@code objectFactory} is null
092   * @throws IllegalArgumentException if {@code initialCapacity} is negative
093   */
094  public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
095    this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL);
096  }
097
098  /**
099   * Creates a new pool with the given initial capacity
100   * and the given concurrency level.
101   *
102   * @param objectFactory the factory to supply new objects on demand
103   * @param initialCapacity the initial capacity to keep objects in the pool
104   * @param concurrencyLevel the estimated count of concurrently accessing threads
105   *
106   * @throws NullPointerException if {@code objectFactory} is null
107   * @throws IllegalArgumentException if {@code initialCapacity} is negative or
108   *    {@code concurrencyLevel} is non-positive
109   */
110  public ObjectPool(
111      ObjectFactory<K, V> objectFactory,
112      int initialCapacity,
113      int concurrencyLevel) {
114
115    if (objectFactory == null) {
116      throw new NullPointerException("Given object factory instance is NULL");
117    }
118    this.objectFactory = objectFactory;
119
120    this.referenceCache =
121        new ConcurrentHashMap<K, Reference<V>>(initialCapacity, 0.75f, concurrencyLevel);
122  }
123
124  /**
125   * Removes stale references of shared objects from the pool. References newly becoming stale may
126   * still remain.
127   * <p/>
128   * The implementation of this method is expected to be lightweight when there is no stale
129   * reference with the Oracle (Sun) implementation of {@code ReferenceQueue}, because
130   * {@code ReferenceQueue.poll} just checks a volatile instance variable in {@code ReferenceQueue}.
131   */
132  public void purge() {
133    if (purgeLock.tryLock()) {// no parallel purge
134      try {
135        while (true) {
136          @SuppressWarnings("unchecked")
137          Reference<V> ref = (Reference<V>) staleRefQueue.poll();
138          if (ref == null) {
139            break;
140          }
141          referenceCache.remove(getReferenceKey(ref), ref);
142        }
143      } finally {
144        purgeLock.unlock();
145      }
146    }
147  }
148
149  /**
150   * Create a reference associated with the given object
151   * @param key the key to store in the reference
152   * @param obj the object to associate with
153   * @return the reference instance
154   */
155  public abstract Reference<V> createReference(K key, V obj);
156
157  /**
158   * Get key of the given reference
159   * @param ref The reference
160   * @return key of the reference
161   */
162  public abstract K getReferenceKey(Reference<V> ref);
163
164  /**
165   * Returns a shared object associated with the given {@code key},
166   * which is identified by the {@code equals} method.
167   * @throws NullPointerException if {@code key} is null
168   */
169  public V get(K key) {
170    Reference<V> ref = referenceCache.get(key);
171    if (ref != null) {
172      V obj = ref.get();
173      if (obj != null) {
174        return obj;
175      }
176      referenceCache.remove(key, ref);
177    }
178
179    V newObj = objectFactory.createObject(key);
180    Reference<V> newRef = createReference(key, newObj);
181    while (true) {
182      Reference<V> existingRef = referenceCache.putIfAbsent(key, newRef);
183      if (existingRef == null) {
184        return newObj;
185      }
186
187      V existingObject = existingRef.get();
188      if (existingObject != null) {
189        return existingObject;
190      }
191      referenceCache.remove(key, existingRef);
192    }
193  }
194
195  /**
196   * Returns an estimated count of objects kept in the pool.
197   * This also counts stale references,
198   * and you might want to call {@link #purge()} beforehand.
199   */
200  public int size() {
201    return referenceCache.size();
202  }
203}