View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.HashMap;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Set;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import java.util.concurrent.CopyOnWriteArrayList;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  
35  /**
36   *
37   * The <code>PoolMap</code> maps a key to a collection of values, the elements
38   * of which are managed by a pool. In effect, that collection acts as a shared
39   * pool of resources, access to which is closely controlled as per the semantics
40   * of the pool.
41   *
42   * <p>
43   * In case the size of the pool is set to a non-zero positive number, that is
44   * used to cap the number of resources that a pool may contain for any given
45   * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool.
46   * </p>
47   *
48   * @param <K>
49   *          the type of the key to the resource
50   * @param <V>
51   *          the type of the resource being pooled
52   */
53  @InterfaceAudience.Private
54  public class PoolMap<K, V> implements Map<K, V> {
55    private PoolType poolType;
56  
57    private int poolMaxSize;
58  
59    private Map<K, Pool<V>> pools = new ConcurrentHashMap<K, Pool<V>>();
60  
61    public PoolMap(PoolType poolType) {
62      this.poolType = poolType;
63    }
64  
65    public PoolMap(PoolType poolType, int poolMaxSize) {
66      this.poolType = poolType;
67      this.poolMaxSize = poolMaxSize;
68    }
69  
70    @Override
71    public V get(Object key) {
72      Pool<V> pool = pools.get(key);
73      return pool != null ? pool.get() : null;
74    }
75  
76    @Override
77    public V put(K key, V value) {
78      Pool<V> pool = pools.get(key);
79      if (pool == null) {
80        pools.put(key, pool = createPool());
81      }
82      return pool != null ? pool.put(value) : null;
83    }
84  
85    @SuppressWarnings("unchecked")
86    @Override
87    public V remove(Object key) {
88      Pool<V> pool = pools.remove(key);
89      if (pool != null) {
90        removeValue((K) key, pool.get());
91      }
92      return null;
93    }
94  
95    public boolean removeValue(K key, V value) {
96      Pool<V> pool = pools.get(key);
97      boolean res = false;
98      if (pool != null) {
99        res = pool.remove(value);
100       if (res && pool.size() == 0) {
101         pools.remove(key);
102       }
103     }
104     return res;
105   }
106 
107   @Override
108   public Collection<V> values() {
109     Collection<V> values = new ArrayList<V>();
110     for (Pool<V> pool : pools.values()) {
111       Collection<V> poolValues = pool.values();
112       if (poolValues != null) {
113         values.addAll(poolValues);
114       }
115     }
116     return values;
117   }
118 
119   public Collection<V> values(K key) {
120     Collection<V> values = new ArrayList<V>();
121     Pool<V> pool = pools.get(key);
122     if (pool != null) {
123       Collection<V> poolValues = pool.values();
124       if (poolValues != null) {
125         values.addAll(poolValues);
126       }
127     }
128     return values;
129   }
130 
131 
132   @Override
133   public boolean isEmpty() {
134     return pools.isEmpty();
135   }
136 
137   @Override
138   public int size() {
139     return pools.size();
140   }
141 
142   public int size(K key) {
143     Pool<V> pool = pools.get(key);
144     return pool != null ? pool.size() : 0;
145   }
146 
147   @Override
148   public boolean containsKey(Object key) {
149     return pools.containsKey(key);
150   }
151 
152   @Override
153   public boolean containsValue(Object value) {
154     if (value == null) {
155       return false;
156     }
157     for (Pool<V> pool : pools.values()) {
158       if (value.equals(pool.get())) {
159         return true;
160       }
161     }
162     return false;
163   }
164 
165   @Override
166   public void putAll(Map<? extends K, ? extends V> map) {
167     for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
168       put(entry.getKey(), entry.getValue());
169     }
170   }
171 
172   @Override
173   public void clear() {
174     for (Pool<V> pool : pools.values()) {
175       pool.clear();
176     }
177     pools.clear();
178   }
179 
180   @Override
181   public Set<K> keySet() {
182     return pools.keySet();
183   }
184 
185   @Override
186   public Set<Map.Entry<K, V>> entrySet() {
187     Set<Map.Entry<K, V>> entries = new HashSet<Entry<K, V>>();
188     for (Map.Entry<K, Pool<V>> poolEntry : pools.entrySet()) {
189       final K poolKey = poolEntry.getKey();
190       final Pool<V> pool = poolEntry.getValue();
191       if (pool != null) {
192         for (final V poolValue : pool.values()) {
193           entries.add(new Map.Entry<K, V>() {
194             @Override
195             public K getKey() {
196               return poolKey;
197             }
198 
199             @Override
200             public V getValue() {
201               return poolValue;
202             }
203 
204             @Override
205             public V setValue(V value) {
206               return pool.put(value);
207             }
208           });
209         }
210       }
211     }
212     return entries;
213   }
214 
215   protected interface Pool<R> {
216     R get();
217 
218     R put(R resource);
219 
220     boolean remove(R resource);
221 
222     void clear();
223 
224     Collection<R> values();
225 
226     int size();
227   }
228 
229   public enum PoolType {
230     Reusable, ThreadLocal, RoundRobin;
231 
232     public static PoolType valueOf(String poolTypeName,
233         PoolType defaultPoolType, PoolType... allowedPoolTypes) {
234       PoolType poolType = PoolType.fuzzyMatch(poolTypeName);
235       if (poolType != null) {
236         boolean allowedType = false;
237         if (poolType.equals(defaultPoolType)) {
238           allowedType = true;
239         } else {
240           if (allowedPoolTypes != null) {
241             for (PoolType allowedPoolType : allowedPoolTypes) {
242               if (poolType.equals(allowedPoolType)) {
243                 allowedType = true;
244                 break;
245               }
246             }
247           }
248         }
249         if (!allowedType) {
250           poolType = null;
251         }
252       }
253       return (poolType != null) ? poolType : defaultPoolType;
254     }
255 
256     public static String fuzzyNormalize(String name) {
257       return name != null ? name.replaceAll("-", "").trim().toLowerCase() : "";
258     }
259 
260     public static PoolType fuzzyMatch(String name) {
261       for (PoolType poolType : values()) {
262         if (fuzzyNormalize(name).equals(fuzzyNormalize(poolType.name()))) {
263           return poolType;
264         }
265       }
266       return null;
267     }
268   }
269 
270   protected Pool<V> createPool() {
271     switch (poolType) {
272     case Reusable:
273       return new ReusablePool<V>(poolMaxSize);
274     case RoundRobin:
275       return new RoundRobinPool<V>(poolMaxSize);
276     case ThreadLocal:
277       return new ThreadLocalPool<V>();
278     }
279     return null;
280   }
281 
282   /**
283    * The <code>ReusablePool</code> represents a {@link PoolMap.Pool} that builds
284    * on the {@link java.util.LinkedList} class. It essentially allows resources to be
285    * checked out, at which point it is removed from this pool. When the resource
286    * is no longer required, it should be returned to the pool in order to be
287    * reused.
288    *
289    * <p>
290    * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of
291    * the pool is unbounded. Otherwise, it caps the number of consumers that can
292    * check out a resource from this pool to the (non-zero positive) value
293    * specified in {@link #maxSize}.
294    * </p>
295    *
296    * @param <R>
297    *          the type of the resource
298    */
299   @SuppressWarnings("serial")
300   public class ReusablePool<R> extends ConcurrentLinkedQueue<R> implements Pool<R> {
301     private int maxSize;
302 
303     public ReusablePool(int maxSize) {
304       this.maxSize = maxSize;
305 
306     }
307 
308     @Override
309     public R get() {
310       return poll();
311     }
312 
313     @Override
314     public R put(R resource) {
315       if (super.size() < maxSize) {
316         add(resource);
317       }
318       return null;
319     }
320 
321     @Override
322     public Collection<R> values() {
323       return this;
324     }
325   }
326 
327   /**
328    * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which
329    * stores its resources in an {@link ArrayList}. It load-balances access to
330    * its resources by returning a different resource every time a given key is
331    * looked up.
332    *
333    * <p>
334    * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of
335    * the pool is unbounded. Otherwise, it caps the number of resources in this
336    * pool to the (non-zero positive) value specified in {@link #maxSize}.
337    * </p>
338    *
339    * @param <R>
340    *          the type of the resource
341    *
342    */
343   @SuppressWarnings("serial")
344   class RoundRobinPool<R> extends CopyOnWriteArrayList<R> implements Pool<R> {
345     private int maxSize;
346     private int nextResource = 0;
347 
348     public RoundRobinPool(int maxSize) {
349       this.maxSize = maxSize;
350     }
351 
352     @Override
353     public R put(R resource) {
354       if (super.size() < maxSize) {
355         add(resource);
356       }
357       return null;
358     }
359 
360     @Override
361     public R get() {
362       if (super.size() < maxSize) {
363         return null;
364       }
365       nextResource %= super.size();
366       R resource = get(nextResource++);
367       return resource;
368     }
369 
370     @Override
371     public Collection<R> values() {
372       return this;
373     }
374 
375   }
376 
377   /**
378    * The <code>ThreadLocalPool</code> represents a {@link PoolMap.Pool} that
379    * builds on the {@link ThreadLocal} class. It essentially binds the resource
380    * to the thread from which it is accessed.
381    *
382    * <p>
383    * Note that the size of the pool is essentially bounded by the number of threads
384    * that add resources to this pool.
385    * </p>
386    *
387    * @param <R>
388    *          the type of the resource
389    */
390   static class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> {
391     private static final Map<ThreadLocalPool<?>, AtomicInteger> poolSizes = new HashMap<ThreadLocalPool<?>, AtomicInteger>();
392 
393     public ThreadLocalPool() {
394     }
395 
396     @Override
397     public R put(R resource) {
398       R previousResource = get();
399       if (previousResource == null) {
400         AtomicInteger poolSize = poolSizes.get(this);
401         if (poolSize == null) {
402           poolSizes.put(this, poolSize = new AtomicInteger(0));
403         }
404         poolSize.incrementAndGet();
405       }
406       this.set(resource);
407       return previousResource;
408     }
409 
410     @Override
411     public void remove() {
412       super.remove();
413       AtomicInteger poolSize = poolSizes.get(this);
414       if (poolSize != null) {
415         poolSize.decrementAndGet();
416       }
417     }
418 
419     @Override
420     public int size() {
421       AtomicInteger poolSize = poolSizes.get(this);
422       return poolSize != null ? poolSize.get() : 0;
423     }
424 
425     @Override
426     public boolean remove(R resource) {
427       R previousResource = super.get();
428       if (resource != null && resource.equals(previousResource)) {
429         remove();
430         return true;
431       } else {
432         return false;
433       }
434     }
435 
436     @Override
437     public void clear() {
438       super.remove();
439     }
440 
441     @Override
442     public Collection<R> values() {
443       List<R> values = new ArrayList<R>();
444       values.add(get());
445       return values;
446     }
447   }
448 }