001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.util; 020 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Locale; 027import java.util.Map; 028import java.util.Set; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.ConcurrentLinkedQueue; 031import java.util.concurrent.CopyOnWriteArrayList; 032import java.util.concurrent.atomic.AtomicInteger; 033 034import org.apache.yetus.audience.InterfaceAudience; 035 036/** 037 * 038 * The <code>PoolMap</code> maps a key to a collection of values, the elements 039 * of which are managed by a pool. In effect, that collection acts as a shared 040 * pool of resources, access to which is closely controlled as per the semantics 041 * of the pool. 042 * 043 * <p> 044 * In case the size of the pool is set to a non-zero positive number, that is 045 * used to cap the number of resources that a pool may contain for any given 046 * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool. 047 * </p> 048 * 049 * @param <K> 050 * the type of the key to the resource 051 * @param <V> 052 * the type of the resource being pooled 053 */ 054@InterfaceAudience.Private 055public class PoolMap<K, V> implements Map<K, V> { 056 private PoolType poolType; 057 058 private int poolMaxSize; 059 060 private Map<K, Pool<V>> pools = new ConcurrentHashMap<>(); 061 062 public PoolMap(PoolType poolType) { 063 this.poolType = poolType; 064 } 065 066 public PoolMap(PoolType poolType, int poolMaxSize) { 067 this.poolType = poolType; 068 this.poolMaxSize = poolMaxSize; 069 } 070 071 @Override 072 public V get(Object key) { 073 Pool<V> pool = pools.get(key); 074 return pool != null ? pool.get() : null; 075 } 076 077 @Override 078 public V put(K key, V value) { 079 Pool<V> pool = pools.get(key); 080 if (pool == null) { 081 pools.put(key, pool = createPool()); 082 } 083 return pool != null ? pool.put(value) : null; 084 } 085 086 @SuppressWarnings("unchecked") 087 @Override 088 public V remove(Object key) { 089 Pool<V> pool = pools.remove(key); 090 if (pool != null) { 091 removeValue((K) key, pool.get()); 092 } 093 return null; 094 } 095 096 public boolean removeValue(K key, V value) { 097 Pool<V> pool = pools.get(key); 098 boolean res = false; 099 if (pool != null) { 100 res = pool.remove(value); 101 if (res && pool.size() == 0) { 102 pools.remove(key); 103 } 104 } 105 return res; 106 } 107 108 @Override 109 public Collection<V> values() { 110 Collection<V> values = new ArrayList<>(); 111 for (Pool<V> pool : pools.values()) { 112 Collection<V> poolValues = pool.values(); 113 if (poolValues != null) { 114 values.addAll(poolValues); 115 } 116 } 117 return values; 118 } 119 120 public Collection<V> values(K key) { 121 Collection<V> values = new ArrayList<>(); 122 Pool<V> pool = pools.get(key); 123 if (pool != null) { 124 Collection<V> poolValues = pool.values(); 125 if (poolValues != null) { 126 values.addAll(poolValues); 127 } 128 } 129 return values; 130 } 131 132 133 @Override 134 public boolean isEmpty() { 135 return pools.isEmpty(); 136 } 137 138 @Override 139 public int size() { 140 return pools.size(); 141 } 142 143 public int size(K key) { 144 Pool<V> pool = pools.get(key); 145 return pool != null ? pool.size() : 0; 146 } 147 148 @Override 149 public boolean containsKey(Object key) { 150 return pools.containsKey(key); 151 } 152 153 @Override 154 public boolean containsValue(Object value) { 155 if (value == null) { 156 return false; 157 } 158 for (Pool<V> pool : pools.values()) { 159 if (value.equals(pool.get())) { 160 return true; 161 } 162 } 163 return false; 164 } 165 166 @Override 167 public void putAll(Map<? extends K, ? extends V> map) { 168 for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) { 169 put(entry.getKey(), entry.getValue()); 170 } 171 } 172 173 @Override 174 public void clear() { 175 for (Pool<V> pool : pools.values()) { 176 pool.clear(); 177 } 178 pools.clear(); 179 } 180 181 @Override 182 public Set<K> keySet() { 183 return pools.keySet(); 184 } 185 186 @Override 187 public Set<Map.Entry<K, V>> entrySet() { 188 Set<Map.Entry<K, V>> entries = new HashSet<>(); 189 for (Map.Entry<K, Pool<V>> poolEntry : pools.entrySet()) { 190 final K poolKey = poolEntry.getKey(); 191 final Pool<V> pool = poolEntry.getValue(); 192 if (pool != null) { 193 for (final V poolValue : pool.values()) { 194 entries.add(new Map.Entry<K, V>() { 195 @Override 196 public K getKey() { 197 return poolKey; 198 } 199 200 @Override 201 public V getValue() { 202 return poolValue; 203 } 204 205 @Override 206 public V setValue(V value) { 207 return pool.put(value); 208 } 209 }); 210 } 211 } 212 } 213 return entries; 214 } 215 216 protected interface Pool<R> { 217 R get(); 218 219 R put(R resource); 220 221 boolean remove(R resource); 222 223 void clear(); 224 225 Collection<R> values(); 226 227 int size(); 228 } 229 230 public enum PoolType { 231 Reusable, ThreadLocal, RoundRobin; 232 233 public static PoolType valueOf(String poolTypeName, 234 PoolType defaultPoolType, PoolType... allowedPoolTypes) { 235 PoolType poolType = PoolType.fuzzyMatch(poolTypeName); 236 if (poolType != null) { 237 boolean allowedType = false; 238 if (poolType.equals(defaultPoolType)) { 239 allowedType = true; 240 } else { 241 if (allowedPoolTypes != null) { 242 for (PoolType allowedPoolType : allowedPoolTypes) { 243 if (poolType.equals(allowedPoolType)) { 244 allowedType = true; 245 break; 246 } 247 } 248 } 249 } 250 if (!allowedType) { 251 poolType = null; 252 } 253 } 254 return (poolType != null) ? poolType : defaultPoolType; 255 } 256 257 public static String fuzzyNormalize(String name) { 258 return name != null ? name.replaceAll("-", "").trim().toLowerCase(Locale.ROOT) : ""; 259 } 260 261 public static PoolType fuzzyMatch(String name) { 262 for (PoolType poolType : values()) { 263 if (fuzzyNormalize(name).equals(fuzzyNormalize(poolType.name()))) { 264 return poolType; 265 } 266 } 267 return null; 268 } 269 } 270 271 protected Pool<V> createPool() { 272 switch (poolType) { 273 case Reusable: 274 return new ReusablePool<>(poolMaxSize); 275 case RoundRobin: 276 return new RoundRobinPool<>(poolMaxSize); 277 case ThreadLocal: 278 return new ThreadLocalPool<>(); 279 } 280 return null; 281 } 282 283 /** 284 * The <code>ReusablePool</code> represents a {@link PoolMap.Pool} that builds 285 * on the {@link java.util.LinkedList} class. It essentially allows resources to be 286 * checked out, at which point it is removed from this pool. When the resource 287 * is no longer required, it should be returned to the pool in order to be 288 * reused. 289 * 290 * <p> 291 * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of 292 * the pool is unbounded. Otherwise, it caps the number of consumers that can 293 * check out a resource from this pool to the (non-zero positive) value 294 * specified in {@link #maxSize}. 295 * </p> 296 * 297 * @param <R> 298 * the type of the resource 299 */ 300 @SuppressWarnings("serial") 301 public static class ReusablePool<R> extends ConcurrentLinkedQueue<R> implements Pool<R> { 302 private int maxSize; 303 304 public ReusablePool(int maxSize) { 305 this.maxSize = maxSize; 306 307 } 308 309 @Override 310 public R get() { 311 return poll(); 312 } 313 314 @Override 315 public R put(R resource) { 316 if (super.size() < maxSize) { 317 add(resource); 318 } 319 return null; 320 } 321 322 @Override 323 public Collection<R> values() { 324 return this; 325 } 326 } 327 328 /** 329 * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which 330 * stores its resources in an {@link ArrayList}. It load-balances access to 331 * its resources by returning a different resource every time a given key is 332 * looked up. 333 * 334 * <p> 335 * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of 336 * the pool is unbounded. Otherwise, it caps the number of resources in this 337 * pool to the (non-zero positive) value specified in {@link #maxSize}. 338 * </p> 339 * 340 * @param <R> 341 * the type of the resource 342 * 343 */ 344 @SuppressWarnings("serial") 345 static class RoundRobinPool<R> extends CopyOnWriteArrayList<R> implements Pool<R> { 346 private int maxSize; 347 private int nextResource = 0; 348 349 public RoundRobinPool(int maxSize) { 350 this.maxSize = maxSize; 351 } 352 353 @Override 354 public R put(R resource) { 355 if (super.size() < maxSize) { 356 add(resource); 357 } 358 return null; 359 } 360 361 @Override 362 public R get() { 363 if (super.size() < maxSize) { 364 return null; 365 } 366 nextResource %= super.size(); 367 R resource = get(nextResource++); 368 return resource; 369 } 370 371 @Override 372 public Collection<R> values() { 373 return this; 374 } 375 376 } 377 378 /** 379 * The <code>ThreadLocalPool</code> represents a {@link PoolMap.Pool} that 380 * builds on the {@link ThreadLocal} class. It essentially binds the resource 381 * to the thread from which it is accessed. 382 * 383 * <p> 384 * Note that the size of the pool is essentially bounded by the number of threads 385 * that add resources to this pool. 386 * </p> 387 * 388 * @param <R> 389 * the type of the resource 390 */ 391 static class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> { 392 private static final Map<ThreadLocalPool<?>, AtomicInteger> poolSizes = new HashMap<>(); 393 394 public ThreadLocalPool() { 395 } 396 397 @Override 398 public R put(R resource) { 399 R previousResource = get(); 400 if (previousResource == null) { 401 AtomicInteger poolSize = poolSizes.get(this); 402 if (poolSize == null) { 403 poolSizes.put(this, poolSize = new AtomicInteger(0)); 404 } 405 poolSize.incrementAndGet(); 406 } 407 this.set(resource); 408 return previousResource; 409 } 410 411 @Override 412 public void remove() { 413 super.remove(); 414 AtomicInteger poolSize = poolSizes.get(this); 415 if (poolSize != null) { 416 poolSize.decrementAndGet(); 417 } 418 } 419 420 @Override 421 public int size() { 422 AtomicInteger poolSize = poolSizes.get(this); 423 return poolSize != null ? poolSize.get() : 0; 424 } 425 426 @Override 427 public boolean remove(R resource) { 428 R previousResource = super.get(); 429 if (resource != null && resource.equals(previousResource)) { 430 remove(); 431 return true; 432 } else { 433 return false; 434 } 435 } 436 437 @Override 438 public void clear() { 439 super.remove(); 440 } 441 442 @Override 443 public Collection<R> values() { 444 List<R> values = new ArrayList<>(); 445 values.add(get()); 446 return values; 447 } 448 } 449}