001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
033
034import org.apache.yetus.audience.InterfaceAudience;
035
036/**
037 *
038 * The <code>PoolMap</code> maps a key to a collection of values, the elements
039 * of which are managed by a pool. In effect, that collection acts as a shared
040 * pool of resources, access to which is closely controlled as per the semantics
041 * of the pool.
042 *
043 * <p>
044 * In case the size of the pool is set to a non-zero positive number, that is
045 * used to cap the number of resources that a pool may contain for any given
046 * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool.
047 * </p>
048 *
049 * @param <K>
050 *          the type of the key to the resource
051 * @param <V>
052 *          the type of the resource being pooled
053 */
054@InterfaceAudience.Private
public class PoolMap<K, V> implements Map<K, V> {
  /** Selects which {@link Pool} implementation {@link #createPool()} instantiates. */
  private final PoolType poolType;

  /**
   * Capacity handed to every {@link ReusablePool} and {@link RoundRobinPool} created by this
   * map. {@link ThreadLocalPool} does not take a capacity.
   */
  private final int poolMaxSize;

  /** The pool registered for each key; a pool is created by the first {@link #put} on a key. */
  private final ConcurrentMap<K, Pool<V>> pools = new ConcurrentHashMap<>();

  /**
   * Creates a map whose pools are of the given type, leaving the maximum pool size at 0.
   *
   * <p>
   * With a maximum size of 0 a {@link ReusablePool} or {@link RoundRobinPool} never stores a
   * resource, so this constructor is only useful for {@link PoolType#ThreadLocal}. Use
   * {@link #PoolMap(PoolType, int)} for the other pool types.
   * </p>
   *
   * @param poolType the kind of pool to create for each key
   */
  public PoolMap(PoolType poolType) {
    this(poolType, 0);
  }

  /**
   * Creates a map whose pools are of the given type and capacity.
   *
   * @param poolType the kind of pool to create for each key
   * @param poolMaxSize the capacity passed to each size-bounded pool
   */
  public PoolMap(PoolType poolType, int poolMaxSize) {
    this.poolType = poolType;
    this.poolMaxSize = poolMaxSize;
  }

  /**
   * Asks the pool registered for {@code key} for a resource.
   *
   * @param key the key whose pool is consulted
   * @return whatever the pool's {@code get()} yields, or {@code null} if the key has no pool
   */
  @Override
  public V get(Object key) {
    Pool<V> pool = pools.get(key);
    if (pool == null) {
      return null;
    }
    return pool.get();
  }

  /**
   * Hands {@code value} to the pool registered for {@code key}, creating the pool first if the
   * key has none.
   *
   * @param key the key whose pool receives the resource
   * @param value the resource to pool
   * @return the result of the pool's {@code put}: always {@code null} for {@link ReusablePool}
   *         and {@link RoundRobinPool}, the calling thread's previous resource for
   *         {@link ThreadLocalPool}
   */
  @Override
  public V put(K key, V value) {
    Pool<V> pool = pools.get(key);
    if (pool == null) {
      Pool<V> created = createPool();
      // putIfAbsent keeps exactly one pool per key even when several threads race to create it;
      // the loser of the race adopts the winner's pool instead of overwriting it.
      Pool<V> existing = pools.putIfAbsent(key, created);
      pool = existing != null ? existing : created;
    }
    return pool.put(value);
  }

  /**
   * Detaches the pool registered for {@code key} from this map and clears it.
   *
   * @param key the key whose pool is dropped
   * @return always {@code null}; the pooled resources are not handed back to the caller
   */
  @Override
  public V remove(Object key) {
    Pool<V> pool = pools.remove(key);
    if (pool != null) {
      pool.clear();
    }
    return null;
  }

  /**
   * Removes a single resource from the pool registered for {@code key}. When that leaves the
   * pool with a size of 0, the pool itself is dropped from this map.
   *
   * @param key the key whose pool is searched
   * @param value the resource to remove
   * @return {@code true} if the pool existed and reported the resource as removed
   */
  public boolean removeValue(K key, V value) {
    Pool<V> pool = pools.get(key);
    if (pool == null) {
      return false;
    }
    boolean removed = pool.remove(value);
    if (removed && pool.size() == 0) {
      pools.remove(key);
    }
    return removed;
  }

  /**
   * Collects the resources of every pool into a new list.
   *
   * @return a fresh collection; changes to it do not affect the pools
   */
  @Override
  public Collection<V> values() {
    Collection<V> collected = new ArrayList<>();
    for (Pool<V> pool : pools.values()) {
      addPoolValues(pool, collected);
    }
    return collected;
  }

  /**
   * Collects the resources of the pool registered for {@code key} into a new list.
   *
   * @param key the key whose pool is read
   * @return a fresh collection, empty if the key has no pool
   */
  public Collection<V> values(K key) {
    Collection<V> collected = new ArrayList<>();
    Pool<V> pool = pools.get(key);
    if (pool != null) {
      addPoolValues(pool, collected);
    }
    return collected;
  }

  /** Appends the pool's values to {@code target}, tolerating a pool that reports none. */
  private void addPoolValues(Pool<V> pool, Collection<V> target) {
    Collection<V> poolValues = pool.values();
    if (poolValues != null) {
      target.addAll(poolValues);
    }
  }

  /**
   * @return {@code true} if no key has a pool
   */
  @Override
  public boolean isEmpty() {
    return pools.isEmpty();
  }

  /**
   * @return the number of keys that have a pool (not the number of pooled resources)
   */
  @Override
  public int size() {
    return pools.size();
  }

  /**
   * @param key the key whose pool is measured
   * @return the size reported by the key's pool, or 0 if the key has no pool
   */
  public int size(K key) {
    Pool<V> pool = pools.get(key);
    return pool == null ? 0 : pool.size();
  }

  @Override
  public boolean containsKey(Object key) {
    return pools.containsKey(key);
  }

  /**
   * Reports whether any pool currently exposes {@code value} through its {@code values()}.
   *
   * <p>
   * The lookup is read-only: it inspects {@code values()} rather than calling {@code get()},
   * because {@code get()} checks a resource out of a {@link ReusablePool} and advances the
   * cursor of a {@link RoundRobinPool}.
   * </p>
   *
   * @param value the resource to look for; {@code null} is never contained
   * @return {@code true} if some pool holds an element equal to {@code value}
   */
  @Override
  public boolean containsValue(Object value) {
    if (value == null) {
      return false;
    }
    for (Pool<V> pool : pools.values()) {
      Collection<V> poolValues = pool.values();
      if (poolValues != null && poolValues.contains(value)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Calls {@link #put(Object, Object)} for every entry of {@code map}.
   *
   * @param map the mappings to pool
   */
  @Override
  public void putAll(Map<? extends K, ? extends V> map) {
    for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
      put(entry.getKey(), entry.getValue());
    }
  }

  /** Clears every pool and then forgets all keys. */
  @Override
  public void clear() {
    for (Pool<V> pool : pools.values()) {
      pool.clear();
    }
    pools.clear();
  }

  /**
   * @return the key set of the underlying pool map
   */
  @Override
  public Set<K> keySet() {
    return pools.keySet();
  }

  /**
   * Builds one entry per (key, pooled resource) pair.
   *
   * @return a new set; calling {@code setValue} on an entry puts the new value into the pool the
   *         entry came from and leaves the entry's own value unchanged
   */
  @Override
  public Set<Map.Entry<K, V>> entrySet() {
    Set<Map.Entry<K, V>> entries = new HashSet<>();
    for (Map.Entry<K, Pool<V>> poolEntry : pools.entrySet()) {
      K poolKey = poolEntry.getKey();
      Pool<V> pool = poolEntry.getValue();
      Collection<V> poolValues = pool.values();
      if (poolValues == null) {
        continue;
      }
      for (V poolValue : poolValues) {
        entries.add(new PooledEntry<K, V>(poolKey, poolValue, pool));
      }
    }
    return entries;
  }

  /**
   * Entry handed out by {@link PoolMap#entrySet()}. It does not override {@code equals} or
   * {@code hashCode}, so two entries for equal (key, value) pairs remain distinct set members.
   */
  private static final class PooledEntry<EK, EV> implements Map.Entry<EK, EV> {
    private final EK key;
    private final EV value;
    private final Pool<EV> pool;

    PooledEntry(EK key, EV value, Pool<EV> pool) {
      this.key = key;
      this.value = value;
      this.pool = pool;
    }

    @Override
    public EK getKey() {
      return key;
    }

    @Override
    public EV getValue() {
      return value;
    }

    @Override
    public EV setValue(EV newValue) {
      return pool.put(newValue);
    }
  }

  /**
   * Contract every per-key pool fulfils.
   *
   * @param <R> the type of the pooled resource
   */
  protected interface Pool<R> {
    R get();

    R put(R resource);

    boolean remove(R resource);

    void clear();

    Collection<R> values();

    int size();
  }

  /** The pool implementations {@link PoolMap#createPool()} can choose from. */
  public enum PoolType {
    Reusable, ThreadLocal, RoundRobin;

    /**
     * Resolves a pool type from a loosely formatted name, restricted to a set of permitted types.
     *
     * @param poolTypeName the name to resolve, matched via {@link #fuzzyMatch(String)}
     * @param defaultPoolType returned when the name matches nothing or matches a type that is
     *          not permitted; it is itself always permitted
     * @param allowedPoolTypes further permitted types; may be {@code null} or empty
     * @return the matched type if permitted, otherwise {@code defaultPoolType}
     */
    public static PoolType valueOf(String poolTypeName,
        PoolType defaultPoolType, PoolType... allowedPoolTypes) {
      PoolType matched = fuzzyMatch(poolTypeName);
      if (matched == null || matched == defaultPoolType) {
        return defaultPoolType;
      }
      if (allowedPoolTypes != null) {
        for (PoolType allowed : allowedPoolTypes) {
          if (matched == allowed) {
            return matched;
          }
        }
      }
      return defaultPoolType;
    }

    /**
     * Strips hyphens, trims surrounding whitespace and lower-cases the name.
     *
     * @param name the name to normalize; may be {@code null}
     * @return the normalized name, or the empty string for {@code null}
     */
    public static String fuzzyNormalize(String name) {
      if (name == null) {
        return "";
      }
      return name.replace("-", "").trim().toLowerCase(Locale.ROOT);
    }

    /**
     * Finds the pool type whose normalized name equals the normalized {@code name}.
     *
     * @param name the name to look up; may be {@code null}
     * @return the matching type, or {@code null} if none matches
     */
    public static PoolType fuzzyMatch(String name) {
      String wanted = fuzzyNormalize(name);
      for (PoolType candidate : values()) {
        if (wanted.equals(fuzzyNormalize(candidate.name()))) {
          return candidate;
        }
      }
      return null;
    }
  }

  /**
   * Instantiates the pool implementation that corresponds to this map's pool type.
   *
   * @return a new, empty pool, or {@code null} if the pool type matches no known case
   */
  protected Pool<V> createPool() {
    switch (poolType) {
    case Reusable:
      return new ReusablePool<>(poolMaxSize);
    case RoundRobin:
      return new RoundRobinPool<>(poolMaxSize);
    case ThreadLocal:
      return new ThreadLocalPool<>();
    }
    return null;
  }

  /**
   * The <code>ReusablePool</code> represents a {@link PoolMap.Pool} that builds
   * on the {@link ConcurrentLinkedQueue} class. A resource is checked out with
   * {@link #get()}, at which point it is removed from this pool. When the
   * resource is no longer required, it should be returned with
   * {@link #put(Object)} in order to be reused.
   *
   * <p>
   * {@link #put(Object)} only stores a resource while the pool holds fewer than
   * {@link #maxSize} resources, so a {@link #maxSize} of
   * {@link Integer#MAX_VALUE} is effectively unbounded. The size check and the
   * insertion are separate steps, so concurrent callers may briefly exceed the
   * cap.
   * </p>
   *
   * @param <R>
   *          the type of the resource
   */
  @SuppressWarnings("serial")
  public static class ReusablePool<R> extends ConcurrentLinkedQueue<R> implements Pool<R> {
    private final int maxSize;

    public ReusablePool(int maxSize) {
      this.maxSize = maxSize;
    }

    /**
     * @return the head of the queue, removed from the pool, or {@code null} if the pool is empty
     */
    @Override
    public R get() {
      return poll();
    }

    /**
     * Stores the resource unless the pool already holds {@link #maxSize} or more resources, in
     * which case the resource is silently dropped.
     *
     * @return always {@code null}
     */
    @Override
    public R put(R resource) {
      if (super.size() < maxSize) {
        add(resource);
      }
      return null;
    }

    /**
     * @return this pool itself, i.e. a live view rather than a copy
     */
    @Override
    public Collection<R> values() {
      return this;
    }
  }

  /**
   * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which
   * stores its resources in a {@link CopyOnWriteArrayList}. It load-balances
   * access to its resources by returning a different resource every time a
   * given key is looked up.
   *
   * <p>
   * {@link #put(Object)} only stores a resource while the pool holds fewer than
   * {@link #maxSize} resources, so a {@link #maxSize} of
   * {@link Integer#MAX_VALUE} is effectively unbounded. {@link #get()} returns
   * {@code null} until the pool has been filled up to {@link #maxSize}.
   * </p>
   *
   * @param <R>
   *          the type of the resource
   *
   */
  @SuppressWarnings("serial")
  static class RoundRobinPool<R> extends CopyOnWriteArrayList<R> implements Pool<R> {
    private final int maxSize;

    /** Monotonic ticket counter; each {@link #get()} takes one ticket and maps it to an index. */
    private final AtomicInteger nextResource = new AtomicInteger(0);

    public RoundRobinPool(int maxSize) {
      this.maxSize = maxSize;
    }

    /**
     * Stores the resource unless the pool already holds {@link #maxSize} or more resources, in
     * which case the resource is silently dropped.
     *
     * @return always {@code null}
     */
    @Override
    public R put(R resource) {
      if (super.size() < maxSize) {
        add(resource);
      }
      return null;
    }

    /**
     * Returns the next resource in rotation once the pool is full.
     *
     * <p>
     * The size is read once, so an empty pool (which is all a non-positive {@link #maxSize}
     * ever allows) yields {@code null} instead of a division by zero. A resource removed by
     * another thread between the size read and the lookup can still make the lookup fail with
     * an {@link IndexOutOfBoundsException}.
     * </p>
     *
     * @return the next resource, or {@code null} while the pool is empty or holds fewer than
     *         {@link #maxSize} resources
     */
    @Override
    public R get() {
      int currentSize = super.size();
      if (currentSize == 0 || currentSize < maxSize) {
        return null;
      }
      // Masking the sign bit keeps the index non-negative after the counter overflows.
      int index = (nextResource.getAndIncrement() & Integer.MAX_VALUE) % currentSize;
      return get(index);
    }

    /**
     * @return this pool itself, i.e. a live view rather than a copy
     */
    @Override
    public Collection<R> values() {
      return this;
    }

  }

  /**
   * The <code>ThreadLocalPool</code> represents a {@link PoolMap.Pool} that
   * builds on the {@link ThreadLocal} class. It essentially binds the resource
   * to the thread from which it is accessed.
   *
   * <p>
   * Note that the size of the pool is essentially bounded by the number of threads
   * that add resources to this pool. All operations other than {@link #size()}
   * only see and affect the calling thread's resource.
   * </p>
   *
   * @param <R>
   *          the type of the resource
   */
  static class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> {
    /**
     * Number of threads currently holding a resource in this pool. Kept per instance so that
     * pools neither share mutable state nor stay reachable after they are discarded.
     */
    private final AtomicInteger poolSize = new AtomicInteger(0);

    public ThreadLocalPool() {
    }

    /**
     * Binds the resource to the calling thread, replacing any resource it held before.
     *
     * @return the resource previously bound to the calling thread, or {@code null}
     */
    @Override
    public R put(R resource) {
      R previousResource = get();
      if (previousResource == null) {
        // First resource for this thread: the pool grows by one.
        poolSize.incrementAndGet();
      }
      this.set(resource);
      return previousResource;
    }

    /**
     * Unbinds the calling thread's resource. The size only shrinks if the thread actually held
     * a resource, so repeated calls cannot drive the size below the real count.
     */
    @Override
    public void remove() {
      boolean hadResource = super.get() != null;
      super.remove();
      if (hadResource) {
        poolSize.decrementAndGet();
      }
    }

    /**
     * @return the number of threads that currently hold a resource in this pool
     */
    @Override
    public int size() {
      return poolSize.get();
    }

    /**
     * Unbinds the calling thread's resource if it equals {@code resource}.
     *
     * @return {@code true} if the resource was bound to the calling thread and has been removed
     */
    @Override
    public boolean remove(R resource) {
      R previousResource = super.get();
      if (resource == null || !resource.equals(previousResource)) {
        return false;
      }
      remove();
      return true;
    }

    /** Unbinds the calling thread's resource; resources of other threads are not touched. */
    @Override
    public void clear() {
      remove();
    }

    /**
     * @return a new list holding the calling thread's resource, or an empty list if the calling
     *         thread holds none
     */
    @Override
    public Collection<R> values() {
      List<R> values = new ArrayList<>();
      R current = get();
      if (current != null) {
        values.add(current);
      }
      return values;
    }
  }
}