001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.lang.ref.Reference;
021import java.lang.ref.ReferenceQueue;
022import java.util.Objects;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.ConcurrentMap;
025import java.util.concurrent.locks.Lock;
026import java.util.concurrent.locks.ReentrantLock;
027import org.apache.yetus.audience.InterfaceAudience;
028
029/**
030 * A thread-safe shared object pool in which object creation is expected to be lightweight, and the
031 * objects may be excessively created and discarded.
032 */
033@InterfaceAudience.Private
034public abstract class ObjectPool<K, V> {
035  /**
036   * An {@code ObjectFactory} object is used to create new shared objects on demand.
037   */
038  public interface ObjectFactory<K, V> {
039    /**
040     * Creates a new shared object associated with the given {@code key}, identified by the
041     * {@code equals} method. This method may be simultaneously called by multiple threads with the
042     * same key, and the excessive objects are just discarded.
043     */
044    V createObject(K key);
045  }
046
047  protected final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>();
048
049  private final ObjectFactory<K, V> objectFactory;
050
051  /** Does not permit null keys. */
052  protected final ConcurrentMap<K, Reference<V>> referenceCache;
053
054  /** For preventing parallel purge */
055  private final Lock purgeLock = new ReentrantLock();
056
057  /**
058   * The default initial capacity, used when not otherwise specified in a constructor.
059   */
060  public static final int DEFAULT_INITIAL_CAPACITY = 16;
061
062  /**
063   * The default concurrency level, used when not otherwise specified in a constructor.
064   */
065  public static final int DEFAULT_CONCURRENCY_LEVEL = 16;
066
067  /**
068   * Creates a new pool with the default initial capacity (16) and the default concurrency level
069   * (16).
070   * @param objectFactory the factory to supply new objects on demand
071   * @throws NullPointerException if {@code objectFactory} is {@code null}
072   */
073  public ObjectPool(ObjectFactory<K, V> objectFactory) {
074    this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL);
075  }
076
077  /**
078   * Creates a new pool with the given initial capacity and the default concurrency level (16).
079   * @param objectFactory   the factory to supply new objects on demand
080   * @param initialCapacity the initial capacity to keep objects in the pool
081   * @throws NullPointerException     if {@code objectFactory} is {@code null}
082   * @throws IllegalArgumentException if {@code initialCapacity} is negative
083   */
084  public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
085    this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL);
086  }
087
088  /**
089   * Creates a new pool with the given initial capacity and the given concurrency level.
090   * @param objectFactory    the factory to supply new objects on demand
091   * @param initialCapacity  the initial capacity to keep objects in the pool
092   * @param concurrencyLevel the estimated count of concurrently accessing threads
093   * @throws NullPointerException     if {@code objectFactory} is {@code null}
094   * @throws IllegalArgumentException if {@code initialCapacity} is negative or
095   *                                  {@code concurrencyLevel} is non-positive
096   */
097  public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity, int concurrencyLevel) {
098
099    this.objectFactory = Objects.requireNonNull(objectFactory, "Object factory cannot be null");
100
101    this.referenceCache =
102      new ConcurrentHashMap<K, Reference<V>>(initialCapacity, 0.75f, concurrencyLevel);
103  }
104
105  /**
106   * Removes stale references of shared objects from the pool. References newly becoming stale may
107   * still remain.
108   * <p/>
109   * The implementation of this method is expected to be lightweight when there is no stale
110   * reference with the Oracle (Sun) implementation of {@code ReferenceQueue}, because
111   * {@code ReferenceQueue.poll} just checks a volatile instance variable in {@code ReferenceQueue}.
112   */
113  public void purge() {
114    if (purgeLock.tryLock()) {// no parallel purge
115      try {
116        while (true) {
117          @SuppressWarnings("unchecked")
118          Reference<V> ref = (Reference<V>) staleRefQueue.poll();
119          if (ref == null) {
120            break;
121          }
122          referenceCache.remove(getReferenceKey(ref), ref);
123        }
124      } finally {
125        purgeLock.unlock();
126      }
127    }
128  }
129
130  /**
131   * Create a reference associated with the given object
132   * @param key the key to store in the reference
133   * @param obj the object to associate with
134   * @return the reference instance
135   */
136  public abstract Reference<V> createReference(K key, V obj);
137
138  /**
139   * Get key of the given reference
140   * @param ref The reference
141   * @return key of the reference
142   */
143  public abstract K getReferenceKey(Reference<V> ref);
144
145  /**
146   * Returns a shared object associated with the given {@code key}, which is identified by the
147   * {@code equals} method.
148   * @throws NullPointerException if {@code key} is {@code null}
149   */
150  public V get(K key) {
151    Reference<V> ref = referenceCache.get(Objects.requireNonNull(key));
152    if (ref != null) {
153      V obj = ref.get();
154      if (obj != null) {
155        return obj;
156      }
157      referenceCache.remove(key, ref);
158    }
159
160    V newObj = objectFactory.createObject(key);
161    Reference<V> newRef = createReference(key, newObj);
162    while (true) {
163      Reference<V> existingRef = referenceCache.putIfAbsent(key, newRef);
164      if (existingRef == null) {
165        return newObj;
166      }
167
168      V existingObject = existingRef.get();
169      if (existingObject != null) {
170        return existingObject;
171      }
172      referenceCache.remove(key, existingRef);
173    }
174  }
175
176  /**
177   * Returns an estimated count of objects kept in the pool. This also counts stale references, and
178   * you might want to call {@link #purge()} beforehand.
179   */
180  public int size() {
181    return referenceCache.size();
182  }
183}