001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Locale;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.CopyOnWriteArrayList;
031import java.util.concurrent.atomic.AtomicInteger;
032
033import org.apache.yetus.audience.InterfaceAudience;
034
035/**
036 *
037 * The <code>PoolMap</code> maps a key to a collection of values, the elements
038 * of which are managed by a pool. In effect, that collection acts as a shared
039 * pool of resources, access to which is closely controlled as per the semantics
040 * of the pool.
041 *
042 * <p>
043 * In case the size of the pool is set to a non-zero positive number, that is
044 * used to cap the number of resources that a pool may contain for any given
045 * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool.
046 * </p>
047 *
048 * @param <K>
049 *          the type of the key to the resource
050 * @param <V>
051 *          the type of the resource being pooled
052 */
053@InterfaceAudience.Private
public class PoolMap<K, V> implements Map<K, V> {
  /** Strategy used by {@link #createPool()} to build the pool backing each key. */
  private final PoolType poolType;

  /** Capacity handed to every {@link RoundRobinPool} created by this map. */
  private final int poolMaxSize;

  /** Per-key pools; a key is present as soon as a pool has been created for it. */
  private final Map<K, Pool<V>> pools = new ConcurrentHashMap<>();

  /**
   * Creates a map whose maximum pool size is zero. With {@link PoolType#RoundRobin} a
   * zero-sized pool never retains a resource, so callers that want pooling should use
   * {@link #PoolMap(PoolType, int)} instead.
   *
   * @param poolType the kind of pool to create for each key
   */
  public PoolMap(PoolType poolType) {
    this(poolType, 0);
  }

  /**
   * @param poolType the kind of pool to create for each key
   * @param poolMaxSize the maximum number of resources a round-robin pool may hold per key
   */
  public PoolMap(PoolType poolType, int poolMaxSize) {
    this.poolType = poolType;
    this.poolMaxSize = poolMaxSize;
  }

  /**
   * Returns whatever the pool registered for {@code key} chooses to hand out.
   *
   * @return a pooled resource, or {@code null} if there is no pool for the key or the pool
   *         declines to return one
   */
  @Override
  public V get(Object key) {
    Pool<V> pool = pools.get(key);
    return pool != null ? pool.get() : null;
  }

  /**
   * Adds {@code value} to the pool for {@code key}, creating the pool on first use.
   *
   * @return the value returned by the pool's own {@code put}, or {@code null} if
   *         {@link #createPool()} produced no pool
   */
  @Override
  public V put(K key, V value) {
    Pool<V> pool = pools.get(key);
    if (pool == null) {
      Pool<V> created = createPool();
      if (created == null) {
        return null;
      }
      // putIfAbsent keeps exactly one pool per key even when several threads race on the
      // first put; a plain get-then-put would let one thread's pool (and value) be lost.
      Pool<V> existing = pools.putIfAbsent(key, created);
      pool = existing != null ? existing : created;
    }
    return pool.put(value);
  }

  /**
   * Removes the whole pool registered for {@code key} and clears it.
   *
   * @return the resource {@link #get(Object)} would have returned for the key just before
   *         the removal, or {@code null} if there was none
   */
  @Override
  public V remove(Object key) {
    Pool<V> pool = pools.remove(key);
    if (pool == null) {
      return null;
    }
    V previous = pool.get();
    pool.clear();
    return previous;
  }

  /**
   * Removes a single resource from the pool of {@code key}; the pool itself is dropped from
   * the map once it reports a size of zero.
   *
   * @return {@code true} if the pool existed and reported the resource as removed
   */
  public boolean removeValue(K key, V value) {
    Pool<V> pool = pools.get(key);
    boolean res = false;
    if (pool != null) {
      res = pool.remove(value);
      if (res && pool.size() == 0) {
        pools.remove(key);
      }
    }
    return res;
  }

  /**
   * @return a new collection holding the values reported by every pool in this map
   */
  @Override
  public Collection<V> values() {
    Collection<V> values = new ArrayList<>();
    for (Pool<V> pool : pools.values()) {
      Collection<V> poolValues = pool.values();
      if (poolValues != null) {
        values.addAll(poolValues);
      }
    }
    return values;
  }

  /**
   * @return a new collection holding the values reported by the pool of {@code key}; empty
   *         if the key has no pool
   */
  public Collection<V> values(K key) {
    Collection<V> values = new ArrayList<>();
    Pool<V> pool = pools.get(key);
    if (pool != null) {
      Collection<V> poolValues = pool.values();
      if (poolValues != null) {
        values.addAll(poolValues);
      }
    }
    return values;
  }

  /** @return {@code true} if no key currently has a pool */
  @Override
  public boolean isEmpty() {
    return pools.isEmpty();
  }

  /** @return the number of keys that currently have a pool (not the number of resources) */
  @Override
  public int size() {
    return pools.size();
  }

  /** @return the size reported by the pool of {@code key}, or 0 if the key has no pool */
  public int size(K key) {
    Pool<V> pool = pools.get(key);
    return pool != null ? pool.size() : 0;
  }

  @Override
  public boolean containsKey(Object key) {
    return pools.containsKey(key);
  }

  /**
   * Checks whether any pool currently reports {@code value} among its values.
   *
   * @return {@code false} for a {@code null} argument or when no pool holds the value
   */
  @Override
  public boolean containsValue(Object value) {
    if (value == null) {
      return false;
    }
    for (Pool<V> pool : pools.values()) {
      // Look at the pool's contents rather than calling get(): get() advances the
      // round-robin cursor and only ever exposes a single resource.
      Collection<V> poolValues = pool.values();
      if (poolValues != null && poolValues.contains(value)) {
        return true;
      }
    }
    return false;
  }

  @Override
  public void putAll(Map<? extends K, ? extends V> map) {
    for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
      put(entry.getKey(), entry.getValue());
    }
  }

  /** Clears every pool and then forgets all keys. */
  @Override
  public void clear() {
    for (Pool<V> pool : pools.values()) {
      pool.clear();
    }
    pools.clear();
  }

  /** @return the key set of the underlying pool map */
  @Override
  public Set<K> keySet() {
    return pools.keySet();
  }

  /**
   * Builds a new set with one entry per (key, pooled value) pair. Calling
   * {@code setValue} on an entry adds the given value to that key's pool.
   */
  @Override
  public Set<Map.Entry<K, V>> entrySet() {
    Set<Map.Entry<K, V>> entries = new HashSet<>();
    for (Map.Entry<K, Pool<V>> poolEntry : pools.entrySet()) {
      final K poolKey = poolEntry.getKey();
      final Pool<V> pool = poolEntry.getValue();
      if (pool == null) {
        continue;
      }
      Collection<V> poolValues = pool.values();
      if (poolValues == null) {
        continue;
      }
      for (final V poolValue : poolValues) {
        entries.add(new Map.Entry<K, V>() {
          @Override
          public K getKey() {
            return poolKey;
          }

          @Override
          public V getValue() {
            return poolValue;
          }

          @Override
          public V setValue(V value) {
            return pool.put(value);
          }
        });
      }
    }
    return entries;
  }

  /**
   * The contract every per-key pool fulfils.
   *
   * @param <R> the type of the pooled resource
   */
  protected interface Pool<R> {
    /** @return a resource chosen by the pool, or {@code null} if it has none to offer */
    R get();

    /** Offers a resource to the pool; the return value is implementation specific. */
    R put(R resource);

    /** @return {@code true} if the resource was removed from the pool */
    boolean remove(R resource);

    void clear();

    Collection<R> values();

    int size();
  }

  /** The pool implementations {@link #createPool()} can build. */
  public enum PoolType {
    ThreadLocal, RoundRobin;

    /**
     * @return the type fuzzily matching {@code poolTypeName}, or {@code defaultPoolType} if
     *         nothing matches
     */
    public static PoolType valueOf(String poolTypeName, PoolType defaultPoolType) {
      PoolType poolType = PoolType.fuzzyMatch(poolTypeName);
      return (poolType != null) ? poolType : defaultPoolType;
    }

    /**
     * @return {@code name} with hyphens removed, trimmed and lower-cased; the empty string
     *         for {@code null}
     */
    public static String fuzzyNormalize(String name) {
      return name != null ? name.replace("-", "").trim().toLowerCase(Locale.ROOT) : "";
    }

    /**
     * @return the constant whose normalized name equals the normalized {@code name}, or
     *         {@code null} if there is none
     */
    public static PoolType fuzzyMatch(String name) {
      String wanted = fuzzyNormalize(name);
      for (PoolType poolType : values()) {
        if (wanted.equals(fuzzyNormalize(poolType.name()))) {
          return poolType;
        }
      }
      return null;
    }
  }

  /**
   * @return a new, empty pool of the configured {@link PoolType}
   */
  protected Pool<V> createPool() {
    switch (poolType) {
    case RoundRobin:
      return new RoundRobinPool<>(poolMaxSize);
    case ThreadLocal:
      return new ThreadLocalPool<>();
    }
    return null;
  }

  /**
   * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which
   * stores its resources in a {@link CopyOnWriteArrayList}. It load-balances access to
   * its resources by returning a different resource every time a given key is
   * looked up.
   *
   * <p>
   * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of
   * the pool is unbounded. Otherwise, it caps the number of resources in this
   * pool to the (non-zero positive) value specified in {@link #maxSize}.
   * </p>
   *
   * @param <R>
   *          the type of the resource
   *
   */
  @SuppressWarnings("serial")
  static class RoundRobinPool<R> extends CopyOnWriteArrayList<R> implements Pool<R> {
    private final int maxSize;

    /** Monotonic cursor; reduced modulo the current size on every lookup. */
    private final AtomicInteger nextResource = new AtomicInteger();

    public RoundRobinPool(int maxSize) {
      this.maxSize = maxSize;
    }

    /**
     * Appends the resource while the pool holds fewer than {@code maxSize} elements and
     * silently ignores it otherwise.
     *
     * @return always {@code null}
     */
    @Override
    public R put(R resource) {
      if (super.size() < maxSize) {
        add(resource);
      }
      return null;
    }

    /**
     * @return the next resource in rotation once the pool has reached {@code maxSize};
     *         {@code null} while it is still below that size or holds nothing at all
     */
    @Override
    public R get() {
      int size = super.size();
      // The size == 0 guard covers maxSize <= 0, where the capacity check alone would let
      // an empty pool through to a modulo-by-zero.
      if (size == 0 || size < maxSize) {
        return null;
      }
      // Mask the sign bit so the index stays non-negative after the counter wraps around.
      int index = (nextResource.getAndIncrement() & Integer.MAX_VALUE) % size;
      return get(index);
    }

    /** @return this pool itself, i.e. a live view of the pooled resources */
    @Override
    public Collection<R> values() {
      return this;
    }

  }

  /**
   * The <code>ThreadLocalPool</code> represents a {@link PoolMap.Pool} that
   * builds on the {@link ThreadLocal} class. It essentially binds the resource
   * to the thread from which it is accessed.
   *
   * <p>
   * Note that the size of the pool is essentially bounded by the number of threads
   * that add resources to this pool.
   * </p>
   *
   * @param <R>
   *          the type of the resource
   */
  static class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> {
    /** Number of threads currently holding a non-null resource in this pool. */
    private final AtomicInteger poolSize = new AtomicInteger();

    public ThreadLocalPool() {
    }

    /**
     * Binds {@code resource} to the calling thread.
     *
     * @return the resource previously bound to the calling thread, or {@code null}
     */
    @Override
    public R put(R resource) {
      R previousResource = get();
      // Only a null <-> non-null transition changes how many threads hold a resource.
      if (previousResource == null && resource != null) {
        poolSize.incrementAndGet();
      } else if (previousResource != null && resource == null) {
        poolSize.decrementAndGet();
      }
      this.set(resource);
      return previousResource;
    }

    /** Unbinds the calling thread's resource, adjusting the size only if one was bound. */
    @Override
    public void remove() {
      boolean hadResource = super.get() != null;
      super.remove();
      if (hadResource) {
        poolSize.decrementAndGet();
      }
    }

    @Override
    public int size() {
      return poolSize.get();
    }

    /**
     * @return {@code true} if {@code resource} is non-null, equals the calling thread's
     *         resource, and was therefore unbound
     */
    @Override
    public boolean remove(R resource) {
      R previousResource = super.get();
      if (resource != null && resource.equals(previousResource)) {
        remove();
        return true;
      } else {
        return false;
      }
    }

    /** Unbinds the calling thread's resource; resources bound by other threads are untouched. */
    @Override
    public void clear() {
      remove();
    }

    /**
     * @return a new list holding the calling thread's resource, or an empty list if the
     *         thread has none
     */
    @Override
    public Collection<R> values() {
      List<R> values = new ArrayList<>(1);
      R current = get();
      if (current != null) {
        values.add(current);
      }
      return values;
    }
  }
}