001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.util; 020 021import java.lang.ref.Reference; 022import java.lang.ref.ReferenceQueue; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.ConcurrentMap; 025import java.util.concurrent.locks.Lock; 026import java.util.concurrent.locks.ReentrantLock; 027 028import org.apache.yetus.audience.InterfaceAudience; 029 030/** 031 * A thread-safe shared object pool in which object creation is expected to be lightweight, and the 032 * objects may be excessively created and discarded. 033 */ 034@InterfaceAudience.Private 035public abstract class ObjectPool<K, V> { 036 /** 037 * An {@code ObjectFactory} object is used to create 038 * new shared objects on demand. 039 */ 040 public interface ObjectFactory<K, V> { 041 /** 042 * Creates a new shared object associated with the given {@code key}, 043 * identified by the {@code equals} method. 044 * This method may be simultaneously called by multiple threads 045 * with the same key, and the excessive objects are just discarded. 046 */ 047 V createObject(K key); 048 } 049 050 protected final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>(); 051 052 private final ObjectFactory<K, V> objectFactory; 053 054 /** Does not permit null keys. */ 055 protected final ConcurrentMap<K, Reference<V>> referenceCache; 056 057 /** For preventing parallel purge */ 058 private final Lock purgeLock = new ReentrantLock(); 059 060 /** 061 * The default initial capacity, 062 * used when not otherwise specified in a constructor. 063 */ 064 public static final int DEFAULT_INITIAL_CAPACITY = 16; 065 066 /** 067 * The default concurrency level, 068 * used when not otherwise specified in a constructor. 069 */ 070 public static final int DEFAULT_CONCURRENCY_LEVEL = 16; 071 072 /** 073 * Creates a new pool with the default initial capacity (16) 074 * and the default concurrency level (16). 075 * 076 * @param objectFactory the factory to supply new objects on demand 077 * 078 * @throws NullPointerException if {@code objectFactory} is null 079 */ 080 public ObjectPool(ObjectFactory<K, V> objectFactory) { 081 this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL); 082 } 083 084 /** 085 * Creates a new pool with the given initial capacity 086 * and the default concurrency level (16). 087 * 088 * @param objectFactory the factory to supply new objects on demand 089 * @param initialCapacity the initial capacity to keep objects in the pool 090 * 091 * @throws NullPointerException if {@code objectFactory} is null 092 * @throws IllegalArgumentException if {@code initialCapacity} is negative 093 */ 094 public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) { 095 this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL); 096 } 097 098 /** 099 * Creates a new pool with the given initial capacity 100 * and the given concurrency level. 101 * 102 * @param objectFactory the factory to supply new objects on demand 103 * @param initialCapacity the initial capacity to keep objects in the pool 104 * @param concurrencyLevel the estimated count of concurrently accessing threads 105 * 106 * @throws NullPointerException if {@code objectFactory} is null 107 * @throws IllegalArgumentException if {@code initialCapacity} is negative or 108 * {@code concurrencyLevel} is non-positive 109 */ 110 public ObjectPool( 111 ObjectFactory<K, V> objectFactory, 112 int initialCapacity, 113 int concurrencyLevel) { 114 115 if (objectFactory == null) { 116 throw new NullPointerException("Given object factory instance is NULL"); 117 } 118 this.objectFactory = objectFactory; 119 120 this.referenceCache = 121 new ConcurrentHashMap<K, Reference<V>>(initialCapacity, 0.75f, concurrencyLevel); 122 } 123 124 /** 125 * Removes stale references of shared objects from the pool. References newly becoming stale may 126 * still remain. 127 * <p/> 128 * The implementation of this method is expected to be lightweight when there is no stale 129 * reference with the Oracle (Sun) implementation of {@code ReferenceQueue}, because 130 * {@code ReferenceQueue.poll} just checks a volatile instance variable in {@code ReferenceQueue}. 131 */ 132 public void purge() { 133 if (purgeLock.tryLock()) {// no parallel purge 134 try { 135 while (true) { 136 @SuppressWarnings("unchecked") 137 Reference<V> ref = (Reference<V>) staleRefQueue.poll(); 138 if (ref == null) { 139 break; 140 } 141 referenceCache.remove(getReferenceKey(ref), ref); 142 } 143 } finally { 144 purgeLock.unlock(); 145 } 146 } 147 } 148 149 /** 150 * Create a reference associated with the given object 151 * @param key the key to store in the reference 152 * @param obj the object to associate with 153 * @return the reference instance 154 */ 155 public abstract Reference<V> createReference(K key, V obj); 156 157 /** 158 * Get key of the given reference 159 * @param ref The reference 160 * @return key of the reference 161 */ 162 public abstract K getReferenceKey(Reference<V> ref); 163 164 /** 165 * Returns a shared object associated with the given {@code key}, 166 * which is identified by the {@code equals} method. 167 * @throws NullPointerException if {@code key} is null 168 */ 169 public V get(K key) { 170 Reference<V> ref = referenceCache.get(key); 171 if (ref != null) { 172 V obj = ref.get(); 173 if (obj != null) { 174 return obj; 175 } 176 referenceCache.remove(key, ref); 177 } 178 179 V newObj = objectFactory.createObject(key); 180 Reference<V> newRef = createReference(key, newObj); 181 while (true) { 182 Reference<V> existingRef = referenceCache.putIfAbsent(key, newRef); 183 if (existingRef == null) { 184 return newObj; 185 } 186 187 V existingObject = existingRef.get(); 188 if (existingObject != null) { 189 return existingObject; 190 } 191 referenceCache.remove(key, existingRef); 192 } 193 } 194 195 /** 196 * Returns an estimated count of objects kept in the pool. 197 * This also counts stale references, 198 * and you might want to call {@link #purge()} beforehand. 199 */ 200 public int size() { 201 return referenceCache.size(); 202 } 203}