001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.lang.ref.Reference; 021import java.lang.ref.ReferenceQueue; 022import java.util.concurrent.ConcurrentHashMap; 023import java.util.concurrent.ConcurrentMap; 024import java.util.concurrent.locks.Lock; 025import java.util.concurrent.locks.ReentrantLock; 026import org.apache.yetus.audience.InterfaceAudience; 027 028/** 029 * A thread-safe shared object pool in which object creation is expected to be lightweight, and the 030 * objects may be excessively created and discarded. 031 */ 032@InterfaceAudience.Private 033public abstract class ObjectPool<K, V> { 034 /** 035 * An {@code ObjectFactory} object is used to create new shared objects on demand. 036 */ 037 public interface ObjectFactory<K, V> { 038 /** 039 * Creates a new shared object associated with the given {@code key}, identified by the 040 * {@code equals} method. This method may be simultaneously called by multiple threads with the 041 * same key, and the excessive objects are just discarded. 042 */ 043 V createObject(K key); 044 } 045 046 protected final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>(); 047 048 private final ObjectFactory<K, V> objectFactory; 049 050 /** Does not permit null keys. */ 051 protected final ConcurrentMap<K, Reference<V>> referenceCache; 052 053 /** For preventing parallel purge */ 054 private final Lock purgeLock = new ReentrantLock(); 055 056 /** 057 * The default initial capacity, used when not otherwise specified in a constructor. 058 */ 059 public static final int DEFAULT_INITIAL_CAPACITY = 16; 060 061 /** 062 * The default concurrency level, used when not otherwise specified in a constructor. 063 */ 064 public static final int DEFAULT_CONCURRENCY_LEVEL = 16; 065 066 /** 067 * Creates a new pool with the default initial capacity (16) and the default concurrency level 068 * (16). 069 * @param objectFactory the factory to supply new objects on demand 070 * @throws NullPointerException if {@code objectFactory} is null 071 */ 072 public ObjectPool(ObjectFactory<K, V> objectFactory) { 073 this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL); 074 } 075 076 /** 077 * Creates a new pool with the given initial capacity and the default concurrency level (16). 078 * @param objectFactory the factory to supply new objects on demand 079 * @param initialCapacity the initial capacity to keep objects in the pool 080 * @throws NullPointerException if {@code objectFactory} is null 081 * @throws IllegalArgumentException if {@code initialCapacity} is negative 082 */ 083 public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) { 084 this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL); 085 } 086 087 /** 088 * Creates a new pool with the given initial capacity and the given concurrency level. 089 * @param objectFactory the factory to supply new objects on demand 090 * @param initialCapacity the initial capacity to keep objects in the pool 091 * @param concurrencyLevel the estimated count of concurrently accessing threads 092 * @throws NullPointerException if {@code objectFactory} is null 093 * @throws IllegalArgumentException if {@code initialCapacity} is negative or 094 * {@code concurrencyLevel} is non-positive 095 */ 096 public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity, int concurrencyLevel) { 097 098 if (objectFactory == null) { 099 throw new NullPointerException("Given object factory instance is NULL"); 100 } 101 this.objectFactory = objectFactory; 102 103 this.referenceCache = 104 new ConcurrentHashMap<K, Reference<V>>(initialCapacity, 0.75f, concurrencyLevel); 105 } 106 107 /** 108 * Removes stale references of shared objects from the pool. References newly becoming stale may 109 * still remain. 110 * <p/> 111 * The implementation of this method is expected to be lightweight when there is no stale 112 * reference with the Oracle (Sun) implementation of {@code ReferenceQueue}, because 113 * {@code ReferenceQueue.poll} just checks a volatile instance variable in {@code ReferenceQueue}. 114 */ 115 public void purge() { 116 if (purgeLock.tryLock()) {// no parallel purge 117 try { 118 while (true) { 119 @SuppressWarnings("unchecked") 120 Reference<V> ref = (Reference<V>) staleRefQueue.poll(); 121 if (ref == null) { 122 break; 123 } 124 referenceCache.remove(getReferenceKey(ref), ref); 125 } 126 } finally { 127 purgeLock.unlock(); 128 } 129 } 130 } 131 132 /** 133 * Create a reference associated with the given object 134 * @param key the key to store in the reference 135 * @param obj the object to associate with 136 * @return the reference instance 137 */ 138 public abstract Reference<V> createReference(K key, V obj); 139 140 /** 141 * Get key of the given reference 142 * @param ref The reference 143 * @return key of the reference 144 */ 145 public abstract K getReferenceKey(Reference<V> ref); 146 147 /** 148 * Returns a shared object associated with the given {@code key}, which is identified by the 149 * {@code equals} method. 150 * @throws NullPointerException if {@code key} is null 151 */ 152 public V get(K key) { 153 Reference<V> ref = referenceCache.get(key); 154 if (ref != null) { 155 V obj = ref.get(); 156 if (obj != null) { 157 return obj; 158 } 159 referenceCache.remove(key, ref); 160 } 161 162 V newObj = objectFactory.createObject(key); 163 Reference<V> newRef = createReference(key, newObj); 164 while (true) { 165 Reference<V> existingRef = referenceCache.putIfAbsent(key, newRef); 166 if (existingRef == null) { 167 return newObj; 168 } 169 170 V existingObject = existingRef.get(); 171 if (existingObject != null) { 172 return existingObject; 173 } 174 referenceCache.remove(key, existingRef); 175 } 176 } 177 178 /** 179 * Returns an estimated count of objects kept in the pool. This also counts stale references, and 180 * you might want to call {@link #purge()} beforehand. 181 */ 182 public int size() { 183 return referenceCache.size(); 184 } 185}