/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
019package org.apache.hadoop.hbase.util;
020
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Locale;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentLinkedQueue;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.atomic.AtomicInteger;
033
034import org.apache.yetus.audience.InterfaceAudience;
035
/**
 *
 * The <code>PoolMap</code> maps a key to a collection of values, the elements
 * of which are managed by a pool. In effect, that collection acts as a shared
 * pool of resources, access to which is closely controlled as per the semantics
 * of the pool.
 *
 * <p>
 * In case the size of the pool is set to a non-zero positive number, that is
 * used to cap the number of resources that a pool may contain for any given
 * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool.
 * </p>
 *
 * @param <K>
 *          the type of the key to the resource
 * @param <V>
 *          the type of the resource being pooled
 */
054@InterfaceAudience.Private
public class PoolMap<K, V> implements Map<K, V> {
  /** Kind of pool created for each key; consumed by {@link #createPool()}. */
  private final PoolType poolType;

  /** Capacity passed to size-bounded pools (only {@link RoundRobinPool} uses it). */
  private final int poolMaxSize;

  /** One pool of resources per key. */
  private final ConcurrentHashMap<K, Pool<V>> pools = new ConcurrentHashMap<>();

  /**
   * Creates a map whose pools are of the given type and whose maximum pool
   * size is <code>0</code>.
   *
   * @param poolType the kind of pool to create for each key
   */
  public PoolMap(PoolType poolType) {
    this(poolType, 0);
  }

  /**
   * Creates a map whose pools are of the given type and capacity.
   *
   * @param poolType the kind of pool to create for each key
   * @param poolMaxSize the maximum number of resources a size-bounded pool may hold per key
   */
  public PoolMap(PoolType poolType, int poolMaxSize) {
    this.poolType = poolType;
    this.poolMaxSize = poolMaxSize;
  }

  /**
   * Asks the pool registered for <code>key</code> for a resource.
   *
   * @param key the key whose pool is consulted
   * @return whatever the pool's {@link Pool#get()} returns, or <code>null</code> if there is
   *         no pool for the key
   */
  @Override
  public V get(Object key) {
    Pool<V> pool = pools.get(key);
    return pool != null ? pool.get() : null;
  }

  /**
   * Adds <code>value</code> to the pool of <code>key</code>, creating the pool on first use.
   *
   * @param key the key whose pool receives the value
   * @param value the resource to add
   * @return whatever the pool's {@link Pool#put(Object)} returns
   */
  @Override
  public V put(K key, V value) {
    Pool<V> pool = pools.get(key);
    if (pool == null) {
      Pool<V> created = createPool();
      if (created == null) {
        return null;
      }
      // putIfAbsent makes the creation atomic: when two threads race, both end up
      // using the single pool that actually got registered.
      Pool<V> existing = pools.putIfAbsent(key, created);
      pool = existing != null ? existing : created;
    }
    return pool.put(value);
  }

  /**
   * Drops the whole pool registered for <code>key</code>. The pooled resources themselves
   * are not touched.
   *
   * @param key the key whose pool is dropped
   * @return always <code>null</code>
   */
  @Override
  public V remove(Object key) {
    pools.remove(key);
    return null;
  }

  /**
   * Removes a single resource from the pool of <code>key</code>; the pool itself is dropped
   * once it reports a size of zero.
   *
   * @param key the key whose pool is modified
   * @param value the resource to remove
   * @return <code>true</code> if the pool reported that the resource was removed
   */
  public boolean removeValue(K key, V value) {
    Pool<V> pool = pools.get(key);
    if (pool == null) {
      return false;
    }
    boolean removed = pool.remove(value);
    if (removed && pool.size() == 0) {
      // Conditional removal: do not discard a different pool that may have been
      // registered for the key in the meantime.
      pools.remove(key, pool);
    }
    return removed;
  }

  /**
   * @return a new collection holding the values reported by every pool
   */
  @Override
  public Collection<V> values() {
    Collection<V> values = new ArrayList<>();
    for (Pool<V> pool : pools.values()) {
      Collection<V> poolValues = pool.values();
      if (poolValues != null) {
        values.addAll(poolValues);
      }
    }
    return values;
  }

  /**
   * @param key the key whose pool is inspected
   * @return a new collection holding the values reported by the pool of <code>key</code>;
   *         empty if there is no such pool
   */
  public Collection<V> values(K key) {
    Collection<V> values = new ArrayList<>();
    Pool<V> pool = pools.get(key);
    if (pool != null) {
      Collection<V> poolValues = pool.values();
      if (poolValues != null) {
        values.addAll(poolValues);
      }
    }
    return values;
  }

  /**
   * @return <code>true</code> if no key has a pool
   */
  @Override
  public boolean isEmpty() {
    return pools.isEmpty();
  }

  /**
   * @return the number of keys that have a pool (not the number of pooled resources)
   */
  @Override
  public int size() {
    return pools.size();
  }

  /**
   * @param key the key whose pool is inspected
   * @return the size reported by the pool of <code>key</code>, or <code>0</code> if there is
   *         no such pool
   */
  public int size(K key) {
    Pool<V> pool = pools.get(key);
    return pool != null ? pool.size() : 0;
  }

  @Override
  public boolean containsKey(Object key) {
    return pools.containsKey(key);
  }

  /**
   * Checks whether any pool reports <code>value</code> among its {@link Pool#values()}.
   * The lookup does not call {@link Pool#get()}, so it has no effect on which resource a
   * pool hands out next.
   *
   * @param value the resource to look for; <code>null</code> is never contained
   * @return <code>true</code> if some pool lists the value
   */
  @Override
  public boolean containsValue(Object value) {
    if (value == null) {
      return false;
    }
    for (Pool<V> pool : pools.values()) {
      Collection<V> poolValues = pool.values();
      if (poolValues != null && poolValues.contains(value)) {
        return true;
      }
    }
    return false;
  }

  @Override
  public void putAll(Map<? extends K, ? extends V> map) {
    for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
      put(entry.getKey(), entry.getValue());
    }
  }

  /**
   * Clears every pool and then forgets all keys.
   */
  @Override
  public void clear() {
    for (Pool<V> pool : pools.values()) {
      pool.clear();
    }
    pools.clear();
  }

  @Override
  public Set<K> keySet() {
    return pools.keySet();
  }

  /**
   * Builds a new set with one entry per (key, pooled value) pair. Calling
   * <code>setValue</code> on an entry puts the given value into the pool of the entry's key.
   *
   * @return a freshly built set of entries
   */
  @Override
  public Set<Map.Entry<K, V>> entrySet() {
    Set<Map.Entry<K, V>> entries = new HashSet<>();
    for (Map.Entry<K, Pool<V>> poolEntry : pools.entrySet()) {
      final K poolKey = poolEntry.getKey();
      final Pool<V> pool = poolEntry.getValue();
      if (pool == null) {
        continue;
      }
      for (final V poolValue : pool.values()) {
        entries.add(new Map.Entry<K, V>() {
          @Override
          public K getKey() {
            return poolKey;
          }

          @Override
          public V getValue() {
            return poolValue;
          }

          @Override
          public V setValue(V value) {
            return pool.put(value);
          }
        });
      }
    }
    return entries;
  }

  /**
   * Contract of the per-key resource pools managed by a {@link PoolMap}.
   *
   * @param <R> the type of the resource
   */
  protected interface Pool<R> {
    R get();

    R put(R resource);

    boolean remove(R resource);

    void clear();

    Collection<R> values();

    int size();
  }

  /**
   * The kinds of pool a {@link PoolMap} can create.
   */
  public enum PoolType {
    ThreadLocal, RoundRobin;

    /**
     * Resolves a pool type from a loosely formatted name.
     *
     * @param poolTypeName the name to resolve; matched via {@link #fuzzyMatch(String)}
     * @param defaultPoolType returned when the name matches nothing or matches a type that
     *          is not allowed; it is itself always allowed
     * @param allowedPoolTypes further types that may be returned
     * @return the matched type if it is allowed, otherwise <code>defaultPoolType</code>
     */
    public static PoolType valueOf(String poolTypeName,
        PoolType defaultPoolType, PoolType... allowedPoolTypes) {
      PoolType poolType = PoolType.fuzzyMatch(poolTypeName);
      if (poolType == null) {
        return defaultPoolType;
      }
      if (poolType.equals(defaultPoolType)) {
        return poolType;
      }
      if (allowedPoolTypes != null) {
        for (PoolType allowedPoolType : allowedPoolTypes) {
          if (poolType.equals(allowedPoolType)) {
            return poolType;
          }
        }
      }
      return defaultPoolType;
    }

    /**
     * @param name the name to normalize; may be <code>null</code>
     * @return the name without hyphens, trimmed and lower-cased, or the empty string for
     *         <code>null</code>
     */
    public static String fuzzyNormalize(String name) {
      return name != null ? name.replace("-", "").trim().toLowerCase(Locale.ROOT) : "";
    }

    /**
     * @param name the name to look up; compared after {@link #fuzzyNormalize(String)}
     * @return the pool type whose normalized name equals the normalized input, or
     *         <code>null</code> if there is none
     */
    public static PoolType fuzzyMatch(String name) {
      // Normalize the input once instead of once per enum constant.
      String normalized = fuzzyNormalize(name);
      for (PoolType poolType : values()) {
        if (normalized.equals(fuzzyNormalize(poolType.name()))) {
          return poolType;
        }
      }
      return null;
    }
  }

  /**
   * Creates an empty pool of the type this map was configured with.
   *
   * @return the new pool, or <code>null</code> if the pool type is not handled
   */
  protected Pool<V> createPool() {
    switch (poolType) {
    case RoundRobin:
      return new RoundRobinPool<>(poolMaxSize);
    case ThreadLocal:
      return new ThreadLocalPool<>();
    default:
      return null;
    }
  }

  /**
   * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which
   * stores its resources in a {@link CopyOnWriteArrayList}. It load-balances
   * access to its resources by returning a different resource every time a
   * given key is looked up.
   *
   * <p>
   * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of
   * the pool is unbounded. Otherwise, it caps the number of resources in this
   * pool to the (non-zero positive) value specified in {@link #maxSize}.
   * </p>
   *
   * @param <R>
   *          the type of the resource
   *
   */
  @SuppressWarnings("serial")
  static class RoundRobinPool<R> extends CopyOnWriteArrayList<R> implements Pool<R> {
    private final int maxSize;

    /** Monotonic counter; reduced modulo the current size to pick the next resource. */
    private final AtomicInteger nextResource = new AtomicInteger();

    public RoundRobinPool(int maxSize) {
      this.maxSize = maxSize;
    }

    /**
     * Adds the resource unless the pool already holds {@link #maxSize} elements.
     *
     * @param resource the resource to add
     * @return always <code>null</code>
     */
    @Override
    public R put(R resource) {
      if (super.size() < maxSize) {
        add(resource);
      }
      return null;
    }

    /**
     * Returns the next resource in rotation once the pool has been filled.
     *
     * @return <code>null</code> while the pool is empty or holds fewer than {@link #maxSize}
     *         elements, otherwise the next resource
     */
    @Override
    public R get() {
      int size = super.size();
      // The emptiness check also covers maxSize <= 0, where "size < maxSize" is never
      // true and the modulo below would otherwise divide by zero.
      if (size == 0 || size < maxSize) {
        return null;
      }
      // Mask the sign bit so the index stays non-negative after the counter overflows.
      int index = (nextResource.getAndIncrement() & Integer.MAX_VALUE) % size;
      // NOTE(review): if another thread removes elements between size() and get(index),
      // this can still throw IndexOutOfBoundsException.
      return get(index);
    }

    @Override
    public Collection<R> values() {
      return this;
    }

  }

  /**
   * The <code>ThreadLocalPool</code> represents a {@link PoolMap.Pool} that
   * builds on the {@link ThreadLocal} class. It essentially binds the resource
   * to the thread from which it is accessed.
   *
   * <p>
   * Note that the size of the pool is essentially bounded by the number of threads
   * that add resources to this pool.
   * </p>
   *
   * @param <R>
   *          the type of the resource
   */
  static class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> {
    /** Number of threads that currently have a non-null resource bound in this pool. */
    private final AtomicInteger poolSize = new AtomicInteger();

    public ThreadLocalPool() {
    }

    /**
     * Binds the resource to the calling thread.
     *
     * @param resource the resource to bind
     * @return the resource previously bound to the calling thread, or <code>null</code>
     */
    @Override
    public R put(R resource) {
      R previousResource = get();
      // Only transitions between "nothing bound" and "something bound" change the count.
      if (previousResource == null && resource != null) {
        poolSize.incrementAndGet();
      } else if (previousResource != null && resource == null) {
        poolSize.decrementAndGet();
      }
      this.set(resource);
      return previousResource;
    }

    /**
     * Unbinds the calling thread's resource, if any, and adjusts the size accordingly.
     */
    @Override
    public void remove() {
      boolean hadResource = super.get() != null;
      super.remove();
      if (hadResource) {
        poolSize.decrementAndGet();
      }
    }

    /**
     * @return the number of threads that currently have a resource bound in this pool
     */
    @Override
    public int size() {
      return poolSize.get();
    }

    /**
     * Unbinds the calling thread's resource if it equals the given one.
     *
     * @param resource the resource expected to be bound to the calling thread
     * @return <code>true</code> if the resource was bound and has been removed
     */
    @Override
    public boolean remove(R resource) {
      R previousResource = super.get();
      if (resource != null && resource.equals(previousResource)) {
        remove();
        return true;
      }
      return false;
    }

    /**
     * Unbinds the calling thread's resource; resources bound by other threads are not
     * affected.
     */
    @Override
    public void clear() {
      remove();
    }

    /**
     * @return a new list holding the calling thread's resource, or an empty list if the
     *         calling thread has none
     */
    @Override
    public Collection<R> values() {
      List<R> values = new ArrayList<>();
      R resource = get();
      if (resource != null) {
        values.add(resource);
      }
      return values;
    }
  }
}