View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.HashMap;
24  import java.util.HashSet;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.CopyOnWriteArrayList;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  
36  /**
37   *
38   * The <code>PoolMap</code> maps a key to a collection of values, the elements
39   * of which are managed by a pool. In effect, that collection acts as a shared
40   * pool of resources, access to which is closely controlled as per the semantics
41   * of the pool.
42   *
43   * <p>
44   * In case the size of the pool is set to a non-zero positive number, that is
45   * used to cap the number of resources that a pool may contain for any given
46   * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool.
47   * </p>
48   *
49   * @param <K>
50   *          the type of the key to the resource
51   * @param <V>
52   *          the type of the resource being pooled
53   */
54  @InterfaceAudience.Private
55  public class PoolMap<K, V> implements Map<K, V> {
56    private PoolType poolType;
57  
58    private int poolMaxSize;
59  
60    private Map<K, Pool<V>> pools = new ConcurrentHashMap<K, Pool<V>>();
61  
/**
 * Creates a pool map whose per-key pools are of the given type. The maximum
 * pool size is left at zero, the default value of an <code>int</code> field.
 *
 * @param poolType
 *          the kind of pool to create for each key
 */
public PoolMap(PoolType poolType) {
  this(poolType, 0);
}
65  
/**
 * Creates a pool map whose per-key pools are of the given type and size.
 *
 * @param poolType
 *          the kind of pool to create for each key
 * @param poolMaxSize
 *          the size handed to newly created <code>Reusable</code> and
 *          <code>RoundRobin</code> pools; <code>ThreadLocal</code> pools do
 *          not take a size
 */
public PoolMap(PoolType poolType, int poolMaxSize) {
  this.poolMaxSize = poolMaxSize;
  this.poolType = poolType;
}
70  
71    @Override
72    public V get(Object key) {
73      Pool<V> pool = pools.get(key);
74      return pool != null ? pool.get() : null;
75    }
76  
77    @Override
78    public V put(K key, V value) {
79      Pool<V> pool = pools.get(key);
80      if (pool == null) {
81        pools.put(key, pool = createPool());
82      }
83      return pool != null ? pool.put(value) : null;
84    }
85  
86    @SuppressWarnings("unchecked")
87    @Override
88    public V remove(Object key) {
89      Pool<V> pool = pools.remove(key);
90      if (pool != null) {
91        removeValue((K) key, pool.get());
92      }
93      return null;
94    }
95  
96    public boolean removeValue(K key, V value) {
97      Pool<V> pool = pools.get(key);
98      boolean res = false;
99      if (pool != null) {
100       res = pool.remove(value);
101       if (res && pool.size() == 0) {
102         pools.remove(key);
103       }
104     }
105     return res;
106   }
107 
108   @Override
109   public Collection<V> values() {
110     Collection<V> values = new ArrayList<V>();
111     for (Pool<V> pool : pools.values()) {
112       Collection<V> poolValues = pool.values();
113       if (poolValues != null) {
114         values.addAll(poolValues);
115       }
116     }
117     return values;
118   }
119 
120   public Collection<V> values(K key) {
121     Collection<V> values = new ArrayList<V>();
122     Pool<V> pool = pools.get(key);
123     if (pool != null) {
124       Collection<V> poolValues = pool.values();
125       if (poolValues != null) {
126         values.addAll(poolValues);
127       }
128     }
129     return values;
130   }
131 
132 
133   @Override
134   public boolean isEmpty() {
135     return pools.isEmpty();
136   }
137 
138   @Override
139   public int size() {
140     return pools.size();
141   }
142 
143   public int size(K key) {
144     Pool<V> pool = pools.get(key);
145     return pool != null ? pool.size() : 0;
146   }
147 
148   @Override
149   public boolean containsKey(Object key) {
150     return pools.containsKey(key);
151   }
152 
153   @Override
154   public boolean containsValue(Object value) {
155     if (value == null) {
156       return false;
157     }
158     for (Pool<V> pool : pools.values()) {
159       if (value.equals(pool.get())) {
160         return true;
161       }
162     }
163     return false;
164   }
165 
166   @Override
167   public void putAll(Map<? extends K, ? extends V> map) {
168     for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
169       put(entry.getKey(), entry.getValue());
170     }
171   }
172 
173   @Override
174   public void clear() {
175     for (Pool<V> pool : pools.values()) {
176       pool.clear();
177     }
178     pools.clear();
179   }
180 
181   @Override
182   public Set<K> keySet() {
183     return pools.keySet();
184   }
185 
186   @Override
187   public Set<Map.Entry<K, V>> entrySet() {
188     Set<Map.Entry<K, V>> entries = new HashSet<Entry<K, V>>();
189     for (Map.Entry<K, Pool<V>> poolEntry : pools.entrySet()) {
190       final K poolKey = poolEntry.getKey();
191       final Pool<V> pool = poolEntry.getValue();
192       if (pool != null) {
193         for (final V poolValue : pool.values()) {
194           entries.add(new Map.Entry<K, V>() {
195             @Override
196             public K getKey() {
197               return poolKey;
198             }
199 
200             @Override
201             public V getValue() {
202               return poolValue;
203             }
204 
205             @Override
206             public V setValue(V value) {
207               return pool.put(value);
208             }
209           });
210         }
211       }
212     }
213     return null;
214   }
215 
216   protected interface Pool<R> {
217     R get();
218 
219     R put(R resource);
220 
221     boolean remove(R resource);
222 
223     void clear();
224 
225     Collection<R> values();
226 
227     int size();
228   }
229 
/**
 * The kinds of pool that can be created for a key.
 */
public enum PoolType {
  Reusable, ThreadLocal, RoundRobin;

  /**
   * Resolves a pool type from a loosely formatted name, restricted to a set
   * of permitted types.
   *
   * @param poolTypeName
   *          the name to resolve; matched via {@link #fuzzyMatch(String)}
   * @param defaultPoolType
   *          returned when the name matches nothing or matches a type that is
   *          not permitted; always counts as permitted itself
   * @param allowedPoolTypes
   *          additional permitted types; may be <code>null</code> or empty
   * @return the matched type if permitted, otherwise
   *         <code>defaultPoolType</code>
   */
  public static PoolType valueOf(String poolTypeName,
      PoolType defaultPoolType, PoolType... allowedPoolTypes) {
    PoolType poolType = PoolType.fuzzyMatch(poolTypeName);
    if (poolType == null || poolType == defaultPoolType) {
      return defaultPoolType;
    }
    if (allowedPoolTypes != null) {
      for (PoolType allowedPoolType : allowedPoolTypes) {
        if (poolType == allowedPoolType) {
          return poolType;
        }
      }
    }
    return defaultPoolType;
  }

  /**
   * Normalizes a name for fuzzy comparison: removes every hyphen, trims
   * surrounding whitespace and lower-cases the result.
   *
   * @param name
   *          the name to normalize; may be <code>null</code>
   * @return the normalized name, or the empty string for <code>null</code>
   */
  public static String fuzzyNormalize(String name) {
    if (name == null) {
      return "";
    }
    // Locale.ROOT keeps the mapping locale-independent; with the default
    // locale, e.g. Turkish, 'I' lower-cases to a dotless 'i' and
    // "ROUNDROBIN" would no longer match.
    return name.replace("-", "").trim().toLowerCase(java.util.Locale.ROOT);
  }

  /**
   * Finds the pool type whose normalized constant name equals the normalized
   * form of <code>name</code>.
   *
   * @param name
   *          the name to match; may be <code>null</code>
   * @return the matching type, or <code>null</code> if none matches
   */
  public static PoolType fuzzyMatch(String name) {
    // Normalize the input once rather than on every iteration.
    final String wanted = fuzzyNormalize(name);
    for (PoolType poolType : values()) {
      if (wanted.equals(fuzzyNormalize(poolType.name()))) {
        return poolType;
      }
    }
    return null;
  }
}
270 
271   protected Pool<V> createPool() {
272     switch (poolType) {
273     case Reusable:
274       return new ReusablePool<V>(poolMaxSize);
275     case RoundRobin:
276       return new RoundRobinPool<V>(poolMaxSize);
277     case ThreadLocal:
278       return new ThreadLocalPool<V>();
279     }
280     return null;
281   }
282 
283   /**
284    * The <code>ReusablePool</code> represents a {@link PoolMap.Pool} that builds
285    * on the {@link LinkedList} class. It essentially allows resources to be
286    * checked out, at which point it is removed from this pool. When the resource
287    * is no longer required, it should be returned to the pool in order to be
288    * reused.
289    *
290    * <p>
291    * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of
292    * the pool is unbounded. Otherwise, it caps the number of consumers that can
293    * check out a resource from this pool to the (non-zero positive) value
294    * specified in {@link #maxSize}.
295    * </p>
296    *
297    * @param <R>
298    *          the type of the resource
299    */
300   @SuppressWarnings("serial")
301   public class ReusablePool<R> extends ConcurrentLinkedQueue<R> implements Pool<R> {
302     private int maxSize;
303 
304     public ReusablePool(int maxSize) {
305       this.maxSize = maxSize;
306 
307     }
308 
309     @Override
310     public R get() {
311       return poll();
312     }
313 
314     @Override
315     public R put(R resource) {
316       if (super.size() < maxSize) {
317         add(resource);
318       }
319       return null;
320     }
321 
322     @Override
323     public Collection<R> values() {
324       return this;
325     }
326   }
327 
328   /**
329    * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which
330    * stores its resources in an {@link ArrayList}. It load-balances access to
331    * its resources by returning a different resource every time a given key is
332    * looked up.
333    *
334    * <p>
335    * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of
336    * the pool is unbounded. Otherwise, it caps the number of resources in this
337    * pool to the (non-zero positive) value specified in {@link #maxSize}.
338    * </p>
339    *
340    * @param <R>
341    *          the type of the resource
342    *
343    */
344   @SuppressWarnings("serial")
345   class RoundRobinPool<R> extends CopyOnWriteArrayList<R> implements Pool<R> {
346     private int maxSize;
347     private int nextResource = 0;
348 
349     public RoundRobinPool(int maxSize) {
350       this.maxSize = maxSize;
351     }
352 
353     @Override
354     public R put(R resource) {
355       if (super.size() < maxSize) {
356         add(resource);
357       }
358       return null;
359     }
360 
361     @Override
362     public R get() {
363       if (super.size() < maxSize) {
364         return null;
365       }
366       nextResource %= super.size();
367       R resource = get(nextResource++);
368       return resource;
369     }
370 
371     @Override
372     public Collection<R> values() {
373       return this;
374     }
375 
376   }
377 
378   /**
379    * The <code>ThreadLocalPool</code> represents a {@link PoolMap.Pool} that
380    * builds on the {@link ThreadLocal} class. It essentially binds the resource
381    * to the thread from which it is accessed.
382    *
383    * <p>
384    * Note that the size of the pool is essentially bounded by the number of threads
385    * that add resources to this pool.
386    * </p>
387    *
388    * @param <R>
389    *          the type of the resource
390    */
391   static class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> {
392     private static final Map<ThreadLocalPool<?>, AtomicInteger> poolSizes = new HashMap<ThreadLocalPool<?>, AtomicInteger>();
393 
394     public ThreadLocalPool() {
395     }
396 
397     @Override
398     public R put(R resource) {
399       R previousResource = get();
400       if (previousResource == null) {
401         AtomicInteger poolSize = poolSizes.get(this);
402         if (poolSize == null) {
403           poolSizes.put(this, poolSize = new AtomicInteger(0));
404         }
405         poolSize.incrementAndGet();
406       }
407       this.set(resource);
408       return previousResource;
409     }
410 
411     @Override
412     public void remove() {
413       super.remove();
414       AtomicInteger poolSize = poolSizes.get(this);
415       if (poolSize != null) {
416         poolSize.decrementAndGet();
417       }
418     }
419 
420     @Override
421     public int size() {
422       AtomicInteger poolSize = poolSizes.get(this);
423       return poolSize != null ? poolSize.get() : 0;
424     }
425 
426     @Override
427     public boolean remove(R resource) {
428       R previousResource = super.get();
429       if (resource != null && resource.equals(previousResource)) {
430         remove();
431         return true;
432       } else {
433         return false;
434       }
435     }
436 
437     @Override
438     public void clear() {
439       super.remove();
440     }
441 
442     @Override
443     public Collection<R> values() {
444       List<R> values = new ArrayList<R>();
445       values.add(get());
446       return values;
447     }
448   }
449 }