View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.util;
21  
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.CopyOnWriteArrayList;
33  import java.util.concurrent.atomic.AtomicInteger;
34  
35  /**
36   *
37   * The <code>PoolMap</code> maps a key to a collection of values, the elements
38   * of which are managed by a pool. In effect, that collection acts as a shared
39   * pool of resources, access to which is closely controlled as per the semantics
40   * of the pool.
41   *
42   * <p>
43   * In case the size of the pool is set to a non-zero positive number, that is
44   * used to cap the number of resources that a pool may contain for any given
45   * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool.
46   * </p>
47   *
48   * @param <K>
49   *          the type of the key to the resource
50   * @param <V>
51   *          the type of the resource being pooled
52   */
53  public class PoolMap<K, V> implements Map<K, V> {
54    private PoolType poolType;
55  
56    private int poolMaxSize;
57  
58    private Map<K, Pool<V>> pools = new ConcurrentHashMap<K, Pool<V>>();
59  
60    public PoolMap(PoolType poolType) {
61      this.poolType = poolType;
62    }
63  
64    public PoolMap(PoolType poolType, int poolMaxSize) {
65      this.poolType = poolType;
66      this.poolMaxSize = poolMaxSize;
67    }
68  
69    @Override
70    public V get(Object key) {
71      Pool<V> pool = pools.get(key);
72      return pool != null ? pool.get() : null;
73    }
74  
75    @Override
76    public V put(K key, V value) {
77      Pool<V> pool = pools.get(key);
78      if (pool == null) {
79        pools.put(key, pool = createPool());
80      }
81      return pool != null ? pool.put(value) : null;
82    }
83  
84    @SuppressWarnings("unchecked")
85    @Override
86    public V remove(Object key) {
87      Pool<V> pool = pools.remove(key);
88      if (pool != null) {
89        remove((K) key, pool.get());
90      }
91      return null;
92    }
93  
94    public boolean remove(K key, V value) {
95      Pool<V> pool = pools.get(key);
96      boolean res = false;
97      if (pool != null) {
98        res = pool.remove(value);
99        if (res && pool.size() == 0) {
100         pools.remove(key);
101       }
102     }
103     return res;
104   }
105 
106   @Override
107   public Collection<V> values() {
108     Collection<V> values = new ArrayList<V>();
109     for (Pool<V> pool : pools.values()) {
110       Collection<V> poolValues = pool.values();
111       if (poolValues != null) {
112         values.addAll(poolValues);
113       }
114     }
115     return values;
116   }
117 
118   public Collection<V> values(K key) {
119     Collection<V> values = new ArrayList<V>();
120     Pool<V> pool = pools.get(key);
121     if (pool != null) {
122       Collection<V> poolValues = pool.values();
123       if (poolValues != null) {
124         values.addAll(poolValues);
125       }
126     }
127     return values;
128   }
129 
130 
131   @Override
132   public boolean isEmpty() {
133     return pools.isEmpty();
134   }
135 
136   @Override
137   public int size() {
138     return pools.size();
139   }
140 
141   public int size(K key) {
142     Pool<V> pool = pools.get(key);
143     return pool != null ? pool.size() : 0;
144   }
145 
146   @Override
147   public boolean containsKey(Object key) {
148     return pools.containsKey(key);
149   }
150 
151   @Override
152   public boolean containsValue(Object value) {
153     if (value == null) {
154       return false;
155     }
156     for (Pool<V> pool : pools.values()) {
157       if (value.equals(pool.get())) {
158         return true;
159       }
160     }
161     return false;
162   }
163 
164   @Override
165   public void putAll(Map<? extends K, ? extends V> map) {
166     for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
167       put(entry.getKey(), entry.getValue());
168     }
169   }
170 
171   @Override
172   public void clear() {
173     for (Pool<V> pool : pools.values()) {
174       pool.clear();
175     }
176     pools.clear();
177   }
178 
179   @Override
180   public Set<K> keySet() {
181     return pools.keySet();
182   }
183 
184   @Override
185   public Set<Map.Entry<K, V>> entrySet() {
186     Set<Map.Entry<K, V>> entries = new HashSet<Entry<K, V>>();
187     for (Map.Entry<K, Pool<V>> poolEntry : pools.entrySet()) {
188       final K poolKey = poolEntry.getKey();
189       final Pool<V> pool = poolEntry.getValue();
190       for (final V poolValue : pool.values()) {
191         if (pool != null) {
192           entries.add(new Map.Entry<K, V>() {
193             @Override
194             public K getKey() {
195               return poolKey;
196             }
197 
198             @Override
199             public V getValue() {
200               return poolValue;
201             }
202 
203             @Override
204             public V setValue(V value) {
205               return pool.put(value);
206             }
207           });
208         }
209       }
210     }
211     return null;
212   }
213 
214   protected interface Pool<R> {
215     public R get();
216 
217     public R put(R resource);
218 
219     public boolean remove(R resource);
220 
221     public void clear();
222 
223     public Collection<R> values();
224 
225     public int size();
226   }
227 
228   public enum PoolType {
229     Reusable, ThreadLocal, RoundRobin;
230 
231     public static PoolType valueOf(String poolTypeName,
232         PoolType defaultPoolType, PoolType... allowedPoolTypes) {
233       PoolType poolType = PoolType.fuzzyMatch(poolTypeName);
234       if (poolType != null) {
235         boolean allowedType = false;
236         if (poolType.equals(defaultPoolType)) {
237           allowedType = true;
238         } else {
239           if (allowedPoolTypes != null) {
240             for (PoolType allowedPoolType : allowedPoolTypes) {
241               if (poolType.equals(allowedPoolType)) {
242                 allowedType = true;
243                 break;
244               }
245             }
246           }
247         }
248         if (!allowedType) {
249           poolType = null;
250         }
251       }
252       return (poolType != null) ? poolType : defaultPoolType;
253     }
254 
255     public static String fuzzyNormalize(String name) {
256       return name != null ? name.replaceAll("-", "").trim().toLowerCase() : "";
257     }
258 
259     public static PoolType fuzzyMatch(String name) {
260       for (PoolType poolType : values()) {
261         if (fuzzyNormalize(name).equals(fuzzyNormalize(poolType.name()))) {
262           return poolType;
263         }
264       }
265       return null;
266     }
267   }
268 
269   protected Pool<V> createPool() {
270     switch (poolType) {
271     case Reusable:
272       return new ReusablePool<V>(poolMaxSize);
273     case RoundRobin:
274       return new RoundRobinPool<V>(poolMaxSize);
275     case ThreadLocal:
276       return new ThreadLocalPool<V>();
277     }
278     return null;
279   }
280 
281   /**
282    * The <code>ReusablePool</code> represents a {@link PoolMap.Pool} that builds
283    * on the {@link LinkedList} class. It essentially allows resources to be
284    * checked out, at which point it is removed from this pool. When the resource
285    * is no longer required, it should be returned to the pool in order to be
286    * reused.
287    *
288    * <p>
289    * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of
290    * the pool is unbounded. Otherwise, it caps the number of consumers that can
291    * check out a resource from this pool to the (non-zero positive) value
292    * specified in {@link #maxSize}.
293    * </p>
294    *
295    * @param <R>
296    *          the type of the resource
297    */
298   @SuppressWarnings("serial")
299   public class ReusablePool<R> extends ConcurrentLinkedQueue<R> implements Pool<R> {
300     private int maxSize;
301 
302     public ReusablePool(int maxSize) {
303       this.maxSize = maxSize;
304 
305     }
306 
307     @Override
308     public R get() {
309       return poll();
310     }
311 
312     @Override
313     public R put(R resource) {
314       if (size() < maxSize) {
315         add(resource);
316       }
317       return null;
318     }
319 
320     @Override
321     public Collection<R> values() {
322       return this;
323     }
324   }
325 
326   /**
327    * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which
328    * stores its resources in an {@link ArrayList}. It load-balances access to
329    * its resources by returning a different resource every time a given key is
330    * looked up.
331    *
332    * <p>
333    * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of
334    * the pool is unbounded. Otherwise, it caps the number of resources in this
335    * pool to the (non-zero positive) value specified in {@link #maxSize}.
336    * </p>
337    *
338    * @param <R>
339    *          the type of the resource
340    *
341    */
342   @SuppressWarnings("serial")
343   class RoundRobinPool<R> extends CopyOnWriteArrayList<R> implements Pool<R> {
344     private int maxSize;
345     private int nextResource = 0;
346 
347     public RoundRobinPool(int maxSize) {
348       this.maxSize = maxSize;
349     }
350 
351     @Override
352     public R put(R resource) {
353       if (size() < maxSize) {
354         add(resource);
355       }
356       return null;
357     }
358 
359     @Override
360     public R get() {
361       if (size() < maxSize) {
362         return null;
363       }
364       nextResource %= size();
365       R resource = get(nextResource++);
366       return resource;
367     }
368 
369     @Override
370     public Collection<R> values() {
371       return this;
372     }
373 
374   }
375 
376   /**
377    * The <code>ThreadLocalPool</code> represents a {@link PoolMap.Pool} that
378    * builds on the {@link ThreadLocal} class. It essentially binds the resource
379    * to the thread from which it is accessed.
380    *
381    * <p>
382    * Note that the size of the pool is essentially bounded by the number of threads
383    * that add resources to this pool.
384    * </p>
385    *
386    * @param <R>
387    *          the type of the resource
388    */
389   static class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> {
390     private static final Map<ThreadLocalPool<?>, AtomicInteger> poolSizes = new HashMap<ThreadLocalPool<?>, AtomicInteger>();
391 
392     public ThreadLocalPool() {
393     }
394 
395     @Override
396     public R put(R resource) {
397       R previousResource = get();
398       if (previousResource == null) {
399         AtomicInteger poolSize = poolSizes.get(this);
400         if (poolSize == null) {
401           poolSizes.put(this, poolSize = new AtomicInteger(0));
402         }
403         poolSize.incrementAndGet();
404       }
405       this.set(resource);
406       return previousResource;
407     }
408 
409     @Override
410     public void remove() {
411       super.remove();
412       AtomicInteger poolSize = poolSizes.get(this);
413       if (poolSize != null) {
414         poolSize.decrementAndGet();
415       }
416     }
417 
418     @Override
419     public int size() {
420       AtomicInteger poolSize = poolSizes.get(this);
421       return poolSize != null ? poolSize.get() : 0;
422     }
423 
424     @Override
425     public boolean remove(R resource) {
426       R previousResource = super.get();
427       if (resource != null && resource.equals(previousResource)) {
428         remove();
429         return true;
430       } else {
431         return false;
432       }
433     }
434 
435     @Override
436     public void clear() {
437       super.remove();
438     }
439 
440     @Override
441     public Collection<R> values() {
442       List<R> values = new ArrayList<R>();
443       values.add(get());
444       return values;
445     }
446   }
447 }