1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.lang.ref.ReferenceQueue;
22 import java.lang.ref.WeakReference;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27
28 /**
29 * A {@code WeakReference} based shared object pool.
30 * The objects are kept in weak references and
31 * associated with keys which are identified by the {@code equals} method.
32 * The objects are created by {@link ObjectFactory} on demand.
33 * The object creation is expected to be lightweight,
34 * and the objects may be excessively created and discarded.
35 * Thread safe.
36 */
37 @InterfaceAudience.Private
38 public class WeakObjectPool<K, V> {
39 /**
40 * An {@code ObjectFactory} object is used to create
41 * new shared objects on demand.
42 */
43 public interface ObjectFactory<K, V> {
44 /**
45 * Creates a new shared object associated with the given {@code key},
46 * identified by the {@code equals} method.
47 * This method may be simultaneously called by multiple threads
48 * with the same key, and the excessive objects are just discarded.
49 */
50 V createObject(K key);
51 }
52
53 private final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<V>();
54
55 private class ObjectReference extends WeakReference<V> {
56 final K key;
57
58 ObjectReference(K key, V obj) {
59 super(obj, staleRefQueue);
60 this.key = key;
61 }
62 }
63
64 private final ObjectFactory<K, V> objectFactory;
65
66 /** Does not permit null keys. */
67 private final ConcurrentMap<K, ObjectReference> referenceCache;
68
69 /**
70 * The default initial capacity,
71 * used when not otherwise specified in a constructor.
72 */
73 public static final int DEFAULT_INITIAL_CAPACITY = 16;
74
75 /**
76 * The default concurrency level,
77 * used when not otherwise specified in a constructor.
78 */
79 public static final int DEFAULT_CONCURRENCY_LEVEL = 16;
80
81 /**
82 * Creates a new pool with the default initial capacity (16)
83 * and the default concurrency level (16).
84 *
85 * @param objectFactory the factory to supply new objects on demand
86 *
87 * @throws NullPointerException if {@code objectFactory} is null
88 */
89 public WeakObjectPool(ObjectFactory<K, V> objectFactory) {
90 this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL);
91 }
92
93 /**
94 * Creates a new pool with the given initial capacity
95 * and the default concurrency level (16).
96 *
97 * @param objectFactory the factory to supply new objects on demand
98 * @param initialCapacity the initial capacity to keep objects in the pool
99 *
100 * @throws NullPointerException if {@code objectFactory} is null
101 * @throws IllegalArgumentException if {@code initialCapacity} is negative
102 */
103 public WeakObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
104 this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL);
105 }
106
107 /**
108 * Creates a new pool with the given initial capacity
109 * and the given concurrency level.
110 *
111 * @param objectFactory the factory to supply new objects on demand
112 * @param initialCapacity the initial capacity to keep objects in the pool
113 * @param concurrencyLevel the estimated count of concurrently accessing threads
114 *
115 * @throws NullPointerException if {@code objectFactory} is null
116 * @throws IllegalArgumentException if {@code initialCapacity} is negative or
117 * {@code concurrencyLevel} is non-positive
118 */
119 public WeakObjectPool(
120 ObjectFactory<K, V> objectFactory,
121 int initialCapacity,
122 int concurrencyLevel) {
123
124 if (objectFactory == null) {
125 throw new NullPointerException();
126 }
127 this.objectFactory = objectFactory;
128
129 this.referenceCache = new ConcurrentHashMap<K, ObjectReference>(
130 initialCapacity, 0.75f, concurrencyLevel);
131 // 0.75f is the default load factor threshold of ConcurrentHashMap.
132 }
133
134 /**
135 * Removes stale references of shared objects from the pool.
136 * References newly becoming stale may still remain.
137 * The implementation of this method is expected to be lightweight
138 * when there is no stale reference.
139 */
140 public void purge() {
141 // This method is lightweight while there is no stale reference
142 // with the Oracle (Sun) implementation of {@code ReferenceQueue},
143 // because {@code ReferenceQueue.poll} just checks a volatile instance
144 // variable in {@code ReferenceQueue}.
145
146 while (true) {
147 @SuppressWarnings("unchecked")
148 ObjectReference ref = (ObjectReference)staleRefQueue.poll();
149 if (ref == null) {
150 break;
151 }
152 referenceCache.remove(ref.key, ref);
153 }
154 }
155
156 /**
157 * Returns a shared object associated with the given {@code key},
158 * which is identified by the {@code equals} method.
159 * @throws NullPointerException if {@code key} is null
160 */
161 public V get(K key) {
162 ObjectReference ref = referenceCache.get(key);
163 if (ref != null) {
164 V obj = ref.get();
165 if (obj != null) {
166 return obj;
167 }
168 referenceCache.remove(key, ref);
169 }
170
171 V newObj = objectFactory.createObject(key);
172 ObjectReference newRef = new ObjectReference(key, newObj);
173 while (true) {
174 ObjectReference existingRef = referenceCache.putIfAbsent(key, newRef);
175 if (existingRef == null) {
176 return newObj;
177 }
178
179 V existingObject = existingRef.get();
180 if (existingObject != null) {
181 return existingObject;
182 }
183 referenceCache.remove(key, existingRef);
184 }
185 }
186
187 /**
188 * Returns an estimated count of objects kept in the pool.
189 * This also counts stale references,
190 * and you might want to call {@link #purge()} beforehand.
191 */
192 public int size() {
193 return referenceCache.size();
194 }
195 }