001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.lang.ref.Reference; 021import java.lang.ref.ReferenceQueue; 022import java.util.Objects; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.ConcurrentMap; 025import java.util.concurrent.locks.Lock; 026import java.util.concurrent.locks.ReentrantLock; 027import org.apache.yetus.audience.InterfaceAudience; 028 029/** 030 * A thread-safe shared object pool in which object creation is expected to be lightweight, and the 031 * objects may be excessively created and discarded. 032 */ 033@InterfaceAudience.Private 034public abstract class ObjectPool<K, V> { 035 /** 036 * An {@code ObjectFactory} object is used to create new shared objects on demand. 037 */ 038 public interface ObjectFactory<K, V> { 039 /** 040 * Creates a new shared object associated with the given {@code key}, identified by the 041 * {@code equals} method. This method may be simultaneously called by multiple threads with the 042 * same key, and the excessive objects are just discarded. 043 */ 044 V createObject(K key); 045 } 046 047 protected final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>(); 048 049 private final ObjectFactory<K, V> objectFactory; 050 051 /** Does not permit null keys. */ 052 protected final ConcurrentMap<K, Reference<V>> referenceCache; 053 054 /** For preventing parallel purge */ 055 private final Lock purgeLock = new ReentrantLock(); 056 057 /** 058 * The default initial capacity, used when not otherwise specified in a constructor. 059 */ 060 public static final int DEFAULT_INITIAL_CAPACITY = 16; 061 062 /** 063 * The default concurrency level, used when not otherwise specified in a constructor. 064 */ 065 public static final int DEFAULT_CONCURRENCY_LEVEL = 16; 066 067 /** 068 * Creates a new pool with the default initial capacity (16) and the default concurrency level 069 * (16). 070 * @param objectFactory the factory to supply new objects on demand 071 * @throws NullPointerException if {@code objectFactory} is {@code null} 072 */ 073 public ObjectPool(ObjectFactory<K, V> objectFactory) { 074 this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL); 075 } 076 077 /** 078 * Creates a new pool with the given initial capacity and the default concurrency level (16). 079 * @param objectFactory the factory to supply new objects on demand 080 * @param initialCapacity the initial capacity to keep objects in the pool 081 * @throws NullPointerException if {@code objectFactory} is {@code null} 082 * @throws IllegalArgumentException if {@code initialCapacity} is negative 083 */ 084 public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) { 085 this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL); 086 } 087 088 /** 089 * Creates a new pool with the given initial capacity and the given concurrency level. 090 * @param objectFactory the factory to supply new objects on demand 091 * @param initialCapacity the initial capacity to keep objects in the pool 092 * @param concurrencyLevel the estimated count of concurrently accessing threads 093 * @throws NullPointerException if {@code objectFactory} is {@code null} 094 * @throws IllegalArgumentException if {@code initialCapacity} is negative or 095 * {@code concurrencyLevel} is non-positive 096 */ 097 public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity, int concurrencyLevel) { 098 099 this.objectFactory = Objects.requireNonNull(objectFactory, "Object factory cannot be null"); 100 101 this.referenceCache = 102 new ConcurrentHashMap<K, Reference<V>>(initialCapacity, 0.75f, concurrencyLevel); 103 } 104 105 /** 106 * Removes stale references of shared objects from the pool. References newly becoming stale may 107 * still remain. 108 * <p/> 109 * The implementation of this method is expected to be lightweight when there is no stale 110 * reference with the Oracle (Sun) implementation of {@code ReferenceQueue}, because 111 * {@code ReferenceQueue.poll} just checks a volatile instance variable in {@code ReferenceQueue}. 112 */ 113 public void purge() { 114 if (purgeLock.tryLock()) {// no parallel purge 115 try { 116 while (true) { 117 @SuppressWarnings("unchecked") 118 Reference<V> ref = (Reference<V>) staleRefQueue.poll(); 119 if (ref == null) { 120 break; 121 } 122 referenceCache.remove(getReferenceKey(ref), ref); 123 } 124 } finally { 125 purgeLock.unlock(); 126 } 127 } 128 } 129 130 /** 131 * Create a reference associated with the given object 132 * @param key the key to store in the reference 133 * @param obj the object to associate with 134 * @return the reference instance 135 */ 136 public abstract Reference<V> createReference(K key, V obj); 137 138 /** 139 * Get key of the given reference 140 * @param ref The reference 141 * @return key of the reference 142 */ 143 public abstract K getReferenceKey(Reference<V> ref); 144 145 /** 146 * Returns a shared object associated with the given {@code key}, which is identified by the 147 * {@code equals} method. 148 * @throws NullPointerException if {@code key} is {@code null} 149 */ 150 public V get(K key) { 151 Reference<V> ref = referenceCache.get(Objects.requireNonNull(key)); 152 if (ref != null) { 153 V obj = ref.get(); 154 if (obj != null) { 155 return obj; 156 } 157 referenceCache.remove(key, ref); 158 } 159 160 V newObj = objectFactory.createObject(key); 161 Reference<V> newRef = createReference(key, newObj); 162 while (true) { 163 Reference<V> existingRef = referenceCache.putIfAbsent(key, newRef); 164 if (existingRef == null) { 165 return newObj; 166 } 167 168 V existingObject = existingRef.get(); 169 if (existingObject != null) { 170 return existingObject; 171 } 172 referenceCache.remove(key, existingRef); 173 } 174 } 175 176 /** 177 * Returns an estimated count of objects kept in the pool. This also counts stale references, and 178 * you might want to call {@link #purge()} beforehand. 179 */ 180 public int size() { 181 return referenceCache.size(); 182 } 183}