001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Locale; 026import java.util.Map; 027import java.util.Objects; 028import org.apache.yetus.audience.InterfaceAudience; 029 030/** 031 * The <code>PoolMap</code> maps a key to a collection of values, the elements of which are managed 032 * by a pool. In effect, that collection acts as a shared pool of resources, access to which is 033 * closely controlled as per the semantics of the pool. 034 * <p> 035 * In case the size of the pool is set to a non-zero positive number, that is used to cap the number 036 * of resources that a pool may contain for any given key. A size of {@link Integer#MAX_VALUE} is 037 * interpreted as an unbounded pool. 038 * </p> 039 * <p> 040 * PoolMap is thread-safe. It does not remove elements automatically. Unused resources must be 041 * closed and removed explicitly. 042 * </p> 043 * @param <K> the type of the key to the resource 044 * @param <V> the type of the resource being pooled 045 */ 046@InterfaceAudience.Private 047public class PoolMap<K, V> { 048 private final Map<K, Pool<V>> pools; 049 private final PoolType poolType; 050 private final int poolMaxSize; 051 052 public PoolMap(PoolType poolType, int poolMaxSize) { 053 pools = new HashMap<>(); 054 this.poolType = poolType; 055 this.poolMaxSize = poolMaxSize; 056 } 057 058 public V getOrCreate(K key, PoolResourceSupplier<V> supplier) throws IOException { 059 synchronized (pools) { 060 Pool<V> pool = pools.get(key); 061 062 if (pool == null) { 063 pool = createPool(); 064 pools.put(key, pool); 065 } 066 067 try { 068 return pool.getOrCreate(supplier); 069 } catch (IOException | RuntimeException | Error e) { 070 if (pool.size() == 0) { 071 pools.remove(key); 072 } 073 074 throw e; 075 } 076 } 077 } 078 079 public boolean remove(K key, V value) { 080 synchronized (pools) { 081 Pool<V> pool = pools.get(key); 082 083 if (pool == null) { 084 return false; 085 } 086 087 boolean removed = pool.remove(value); 088 089 if (removed && pool.size() == 0) { 090 pools.remove(key); 091 } 092 093 return removed; 094 } 095 } 096 097 public List<V> values() { 098 List<V> values = new ArrayList<>(); 099 100 synchronized (pools) { 101 for (Pool<V> pool : pools.values()) { 102 Collection<V> poolValues = pool.values(); 103 if (poolValues != null) { 104 values.addAll(poolValues); 105 } 106 } 107 } 108 109 return values; 110 } 111 112 public void clear() { 113 synchronized (pools) { 114 for (Pool<V> pool : pools.values()) { 115 pool.clear(); 116 } 117 118 pools.clear(); 119 } 120 } 121 122 public interface PoolResourceSupplier<R> { 123 R get() throws IOException; 124 } 125 126 protected static <V> V createResource(PoolResourceSupplier<V> supplier) throws IOException { 127 V resource = supplier.get(); 128 return Objects.requireNonNull(resource, "resource cannot be null."); 129 } 130 131 protected interface Pool<R> { 132 R getOrCreate(PoolResourceSupplier<R> supplier) throws IOException; 133 134 boolean remove(R resource); 135 136 void clear(); 137 138 Collection<R> values(); 139 140 int size(); 141 } 142 143 public enum PoolType { 144 ThreadLocal, 145 RoundRobin; 146 147 public static PoolType valueOf(String poolTypeName, PoolType defaultPoolType) { 148 PoolType poolType = PoolType.fuzzyMatch(poolTypeName); 149 return (poolType != null) ? poolType : defaultPoolType; 150 } 151 152 public static String fuzzyNormalize(String name) { 153 return name != null ? name.replaceAll("-", "").trim().toLowerCase(Locale.ROOT) : ""; 154 } 155 156 public static PoolType fuzzyMatch(String name) { 157 for (PoolType poolType : values()) { 158 if (fuzzyNormalize(name).equals(fuzzyNormalize(poolType.name()))) { 159 return poolType; 160 } 161 } 162 return null; 163 } 164 } 165 166 protected Pool<V> createPool() { 167 switch (poolType) { 168 case RoundRobin: 169 return new RoundRobinPool<>(poolMaxSize); 170 case ThreadLocal: 171 return new ThreadLocalPool<>(); 172 default: 173 return new RoundRobinPool<>(poolMaxSize); 174 } 175 } 176 177 /** 178 * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which stores its resources 179 * in an {@link ArrayList}. It load-balances access to its resources by returning a different 180 * resource every time a given key is looked up. 181 * <p> 182 * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of the pool is 183 * unbounded. Otherwise, it caps the number of resources in this pool to the (non-zero positive) 184 * value specified in {@link #maxSize}. 185 * </p> 186 * @param <R> the type of the resource 187 */ 188 @SuppressWarnings("serial") 189 static class RoundRobinPool<R> implements Pool<R> { 190 private final List<R> resources; 191 private final int maxSize; 192 193 private int nextIndex; 194 195 public RoundRobinPool(int maxSize) { 196 if (maxSize <= 0) { 197 throw new IllegalArgumentException("maxSize must be positive"); 198 } 199 200 resources = new ArrayList<>(maxSize); 201 this.maxSize = maxSize; 202 } 203 204 @Override 205 public R getOrCreate(PoolResourceSupplier<R> supplier) throws IOException { 206 int size = resources.size(); 207 R resource; 208 209 /* letting pool to grow */ 210 if (size < maxSize) { 211 resource = createResource(supplier); 212 resources.add(resource); 213 } else { 214 resource = resources.get(nextIndex); 215 216 /* at this point size cannot be 0 */ 217 nextIndex = (nextIndex + 1) % size; 218 } 219 220 return resource; 221 } 222 223 @Override 224 public boolean remove(R resource) { 225 return resources.remove(resource); 226 } 227 228 @Override 229 public void clear() { 230 resources.clear(); 231 } 232 233 @Override 234 public Collection<R> values() { 235 return resources; 236 } 237 238 @Override 239 public int size() { 240 return resources.size(); 241 } 242 } 243 244 /** 245 * The <code>ThreadLocalPool</code> represents a {@link PoolMap.Pool} that works similarly to 246 * {@link ThreadLocal} class. It essentially binds the resource to the thread from which it is 247 * accessed. It doesn't remove resources when a thread exits, those resources must be closed 248 * manually. 249 * <p> 250 * Note that the size of the pool is essentially bounded by the number of threads that add 251 * resources to this pool. 252 * </p> 253 * @param <R> the type of the resource 254 */ 255 static class ThreadLocalPool<R> implements Pool<R> { 256 private final Map<Thread, R> resources; 257 258 public ThreadLocalPool() { 259 resources = new HashMap<>(); 260 } 261 262 @Override 263 public R getOrCreate(PoolResourceSupplier<R> supplier) throws IOException { 264 Thread myself = Thread.currentThread(); 265 R resource = resources.get(myself); 266 267 if (resource == null) { 268 resource = createResource(supplier); 269 resources.put(myself, resource); 270 } 271 272 return resource; 273 } 274 275 @Override 276 public boolean remove(R resource) { 277 /* remove can be called from any thread */ 278 return resources.values().remove(resource); 279 } 280 281 @Override 282 public int size() { 283 return resources.size(); 284 } 285 286 @Override 287 public void clear() { 288 resources.clear(); 289 } 290 291 @Override 292 public Collection<R> values() { 293 return resources.values(); 294 } 295 } 296}