001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.util;
020
021import java.lang.ref.Reference;
022import java.lang.ref.ReferenceQueue;
023import java.util.Objects;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ConcurrentMap;
026import java.util.concurrent.locks.Lock;
027import java.util.concurrent.locks.ReentrantLock;
028
029import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * A thread-safe shared object pool in which object creation is expected to be lightweight, and the
033 * objects may be excessively created and discarded.
034 */
035@InterfaceAudience.Private
036public abstract class ObjectPool<K, V> {
037  /**
038   * An {@code ObjectFactory} object is used to create
039   * new shared objects on demand.
040   */
041  public interface ObjectFactory<K, V> {
042    /**
043     * Creates a new shared object associated with the given {@code key},
044     * identified by the {@code equals} method.
045     * This method may be simultaneously called by multiple threads
046     * with the same key, and the excessive objects are just discarded.
047     */
048    V createObject(K key);
049  }
050
051  protected final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>();
052
053  private final ObjectFactory<K, V> objectFactory;
054
055  /** Does not permit null keys. */
056  protected final ConcurrentMap<K, Reference<V>> referenceCache;
057
058  /** For preventing parallel purge */
059  private final Lock purgeLock = new ReentrantLock();
060
061  /**
062   * The default initial capacity,
063   * used when not otherwise specified in a constructor.
064   */
065  public static final int DEFAULT_INITIAL_CAPACITY = 16;
066
067  /**
068   * The default concurrency level,
069   * used when not otherwise specified in a constructor.
070   */
071  public static final int DEFAULT_CONCURRENCY_LEVEL = 16;
072
073  /**
074   * Creates a new pool with the default initial capacity (16)
075   * and the default concurrency level (16).
076   *
077   * @param objectFactory the factory to supply new objects on demand
078   *
079   * @throws NullPointerException if {@code objectFactory} is {@code null}
080   */
081  public ObjectPool(ObjectFactory<K, V> objectFactory) {
082    this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL);
083  }
084
085  /**
086   * Creates a new pool with the given initial capacity
087   * and the default concurrency level (16).
088   *
089   * @param objectFactory the factory to supply new objects on demand
090   * @param initialCapacity the initial capacity to keep objects in the pool
091   *
092   * @throws NullPointerException if {@code objectFactory} is {@code null}
093   * @throws IllegalArgumentException if {@code initialCapacity} is negative
094   */
095  public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
096    this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL);
097  }
098
099  /**
100   * Creates a new pool with the given initial capacity
101   * and the given concurrency level.
102   *
103   * @param objectFactory the factory to supply new objects on demand
104   * @param initialCapacity the initial capacity to keep objects in the pool
105   * @param concurrencyLevel the estimated count of concurrently accessing threads
106   *
107   * @throws NullPointerException if {@code objectFactory} is {@code null}
108   * @throws IllegalArgumentException if {@code initialCapacity} is negative or
109   *    {@code concurrencyLevel} is non-positive
110   */
111  public ObjectPool(
112      ObjectFactory<K, V> objectFactory,
113      int initialCapacity,
114      int concurrencyLevel) {
115
116    this.objectFactory = Objects.requireNonNull(objectFactory, "Object factory cannot be null");
117
118    this.referenceCache =
119        new ConcurrentHashMap<K, Reference<V>>(initialCapacity, 0.75f, concurrencyLevel);
120  }
121
122  /**
123   * Removes stale references of shared objects from the pool. References newly becoming stale may
124   * still remain.
125   * <p/>
126   * The implementation of this method is expected to be lightweight when there is no stale
127   * reference with the Oracle (Sun) implementation of {@code ReferenceQueue}, because
128   * {@code ReferenceQueue.poll} just checks a volatile instance variable in {@code ReferenceQueue}.
129   */
130  public void purge() {
131    if (purgeLock.tryLock()) {// no parallel purge
132      try {
133        while (true) {
134          @SuppressWarnings("unchecked")
135          Reference<V> ref = (Reference<V>) staleRefQueue.poll();
136          if (ref == null) {
137            break;
138          }
139          referenceCache.remove(getReferenceKey(ref), ref);
140        }
141      } finally {
142        purgeLock.unlock();
143      }
144    }
145  }
146
147  /**
148   * Create a reference associated with the given object
149   * @param key the key to store in the reference
150   * @param obj the object to associate with
151   * @return the reference instance
152   */
153  public abstract Reference<V> createReference(K key, V obj);
154
155  /**
156   * Get key of the given reference
157   * @param ref The reference
158   * @return key of the reference
159   */
160  public abstract K getReferenceKey(Reference<V> ref);
161
162  /**
163   * Returns a shared object associated with the given {@code key},
164   * which is identified by the {@code equals} method.
165   * @throws NullPointerException if {@code key} is {@code null}
166   */
167  public V get(K key) {
168    Reference<V> ref = referenceCache.get(Objects.requireNonNull(key));
169    if (ref != null) {
170      V obj = ref.get();
171      if (obj != null) {
172        return obj;
173      }
174      referenceCache.remove(key, ref);
175    }
176
177    V newObj = objectFactory.createObject(key);
178    Reference<V> newRef = createReference(key, newObj);
179    while (true) {
180      Reference<V> existingRef = referenceCache.putIfAbsent(key, newRef);
181      if (existingRef == null) {
182        return newObj;
183      }
184
185      V existingObject = existingRef.get();
186      if (existingObject != null) {
187        return existingObject;
188      }
189      referenceCache.remove(key, existingRef);
190    }
191  }
192
193  /**
194   * Returns an estimated count of objects kept in the pool.
195   * This also counts stale references,
196   * and you might want to call {@link #purge()} beforehand.
197   */
198  public int size() {
199    return referenceCache.size();
200  }
201}