View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.util;
20  
21  import java.lang.ref.ReferenceQueue;
22  import java.lang.ref.WeakReference;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.ConcurrentMap;
25  
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  
28  /**
29   * A {@code WeakReference} based shared object pool.
30   * The objects are kept in weak references and
31   * associated with keys which are identified by the {@code equals} method.
32   * The objects are created by {@link ObjectFactory} on demand.
33   * The object creation is expected to be lightweight,
34   * and the objects may be excessively created and discarded.
35   * Thread safe.
36   */
37  @InterfaceAudience.Private
38  public class WeakObjectPool<K, V> {
39    /**
40     * An {@code ObjectFactory} object is used to create
41     * new shared objects on demand.
42     */
43    public interface ObjectFactory<K, V> {
44      /**
45       * Creates a new shared object associated with the given {@code key},
46       * identified by the {@code equals} method.
47       * This method may be simultaneously called by multiple threads
48       * with the same key, and the excessive objects are just discarded.
49       */
50      V createObject(K key);
51    }
52  
53    private final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<V>();
54  
55    private class ObjectReference extends WeakReference<V> {
56      final K key;
57  
58      ObjectReference(K key, V obj) {
59        super(obj, staleRefQueue);
60        this.key = key;
61      }
62    }
63  
64    private final ObjectFactory<K, V> objectFactory;
65  
66    /** Does not permit null keys. */
67    private final ConcurrentMap<K, ObjectReference> referenceCache;
68  
69    /**
70     * The default initial capacity,
71     * used when not otherwise specified in a constructor.
72     */
73    public static final int DEFAULT_INITIAL_CAPACITY = 16;
74  
75    /**
76     * The default concurrency level,
77     * used when not otherwise specified in a constructor.
78     */
79    public static final int DEFAULT_CONCURRENCY_LEVEL = 16;
80  
81    /**
82     * Creates a new pool with the default initial capacity (16)
83     * and the default concurrency level (16).
84     *
85     * @param objectFactory the factory to supply new objects on demand
86     *
87     * @throws NullPointerException if {@code objectFactory} is null
88     */
89    public WeakObjectPool(ObjectFactory<K, V> objectFactory) {
90      this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL);
91    }
92  
93    /**
94     * Creates a new pool with the given initial capacity
95     * and the default concurrency level (16).
96     *
97     * @param objectFactory the factory to supply new objects on demand
98     * @param initialCapacity the initial capacity to keep objects in the pool
99     *
100    * @throws NullPointerException if {@code objectFactory} is null
101    * @throws IllegalArgumentException if {@code initialCapacity} is negative
102    */
103   public WeakObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
104     this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL);
105   }
106 
107   /**
108    * Creates a new pool with the given initial capacity
109    * and the given concurrency level.
110    *
111    * @param objectFactory the factory to supply new objects on demand
112    * @param initialCapacity the initial capacity to keep objects in the pool
113    * @param concurrencyLevel the estimated count of concurrently accessing threads
114    *
115    * @throws NullPointerException if {@code objectFactory} is null
116    * @throws IllegalArgumentException if {@code initialCapacity} is negative or
117    *    {@code concurrencyLevel} is non-positive
118    */
119   public WeakObjectPool(
120       ObjectFactory<K, V> objectFactory,
121       int initialCapacity,
122       int concurrencyLevel) {
123 
124     if (objectFactory == null) {
125       throw new NullPointerException();
126     }
127     this.objectFactory = objectFactory;
128 
129     this.referenceCache = new ConcurrentHashMap<K, ObjectReference>(
130         initialCapacity, 0.75f, concurrencyLevel);
131     // 0.75f is the default load factor threshold of ConcurrentHashMap.
132   }
133 
134   /**
135    * Removes stale references of shared objects from the pool.
136    * References newly becoming stale may still remain.
137    * The implementation of this method is expected to be lightweight
138    * when there is no stale reference.
139    */
140   public void purge() {
141     // This method is lightweight while there is no stale reference
142     // with the Oracle (Sun) implementation of {@code ReferenceQueue},
143     // because {@code ReferenceQueue.poll} just checks a volatile instance
144     // variable in {@code ReferenceQueue}.
145 
146     while (true) {
147       @SuppressWarnings("unchecked")
148       ObjectReference ref = (ObjectReference)staleRefQueue.poll();
149       if (ref == null) {
150         break;
151       }
152       referenceCache.remove(ref.key, ref);
153     }
154   }
155 
156   /**
157    * Returns a shared object associated with the given {@code key},
158    * which is identified by the {@code equals} method.
159    * @throws NullPointerException if {@code key} is null
160    */
161   public V get(K key) {
162     ObjectReference ref = referenceCache.get(key);
163     if (ref != null) {
164       V obj = ref.get();
165       if (obj != null) {
166         return obj;
167       }
168       referenceCache.remove(key, ref);
169     }
170 
171     V newObj = objectFactory.createObject(key);
172     ObjectReference newRef = new ObjectReference(key, newObj);
173     while (true) {
174       ObjectReference existingRef = referenceCache.putIfAbsent(key, newRef);
175       if (existingRef == null) {
176         return newObj;
177       }
178 
179       V existingObject = existingRef.get();
180       if (existingObject != null) {
181         return existingObject;
182       }
183       referenceCache.remove(key, existingRef);
184     }
185   }
186 
187   /**
188    * Returns an estimated count of objects kept in the pool.
189    * This also counts stale references,
190    * and you might want to call {@link #purge()} beforehand.
191    */
192   public int size() {
193     return referenceCache.size();
194   }
195 }