001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Locale;
027import java.util.Map;
028import java.util.Objects;
029
030import org.apache.yetus.audience.InterfaceAudience;
031
032/**
033 *
034 * The <code>PoolMap</code> maps a key to a collection of values, the elements
035 * of which are managed by a pool. In effect, that collection acts as a shared
036 * pool of resources, access to which is closely controlled as per the semantics
037 * of the pool.
038 *
039 * <p>
040 * In case the size of the pool is set to a non-zero positive number, that is
041 * used to cap the number of resources that a pool may contain for any given
042 * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool.
043 * </p>
044 *
045 * <p>
046 * PoolMap is thread-safe. It does not remove elements automatically. Unused resources
047 * must be closed and removed explicitly.
048 * </p>
049 *
050 * @param <K>
051 *          the type of the key to the resource
052 * @param <V>
053 *          the type of the resource being pooled
054 */
055@InterfaceAudience.Private
056public class PoolMap<K, V> {
057  private final Map<K, Pool<V>> pools;
058  private final PoolType poolType;
059  private final int poolMaxSize;
060
061   public PoolMap(PoolType poolType, int poolMaxSize) {
062     pools = new HashMap<>();
063     this.poolType = poolType;
064     this.poolMaxSize = poolMaxSize;
065  }
066
067  public V getOrCreate(K key, PoolResourceSupplier<V> supplier) throws IOException {
068     synchronized (pools) {
069       Pool<V> pool = pools.get(key);
070
071       if (pool == null) {
072         pool = createPool();
073         pools.put(key, pool);
074       }
075
076       try {
077         return pool.getOrCreate(supplier);
078       } catch (IOException | RuntimeException | Error e) {
079         if (pool.size() == 0) {
080           pools.remove(key);
081         }
082
083         throw e;
084       }
085     }
086  }
087  public boolean remove(K key, V value) {
088    synchronized (pools) {
089      Pool<V> pool = pools.get(key);
090
091      if (pool == null) {
092        return false;
093      }
094
095      boolean removed = pool.remove(value);
096
097      if (removed && pool.size() == 0) {
098        pools.remove(key);
099      }
100
101      return removed;
102    }
103  }
104
105  public List<V> values() {
106    List<V> values = new ArrayList<>();
107
108    synchronized (pools) {
109      for (Pool<V> pool : pools.values()) {
110        Collection<V> poolValues = pool.values();
111        if (poolValues != null) {
112          values.addAll(poolValues);
113        }
114      }
115    }
116
117    return values;
118  }
119
120  public void clear() {
121    synchronized (pools) {
122      for (Pool<V> pool : pools.values()) {
123        pool.clear();
124      }
125
126      pools.clear();
127    }
128  }
129
130  public interface PoolResourceSupplier<R> {
131     R get() throws IOException;
132  }
133
134  protected static <V> V createResource(PoolResourceSupplier<V> supplier) throws IOException {
135    V resource = supplier.get();
136    return Objects.requireNonNull(resource, "resource cannot be null.");
137  }
138
139  protected interface Pool<R> {
140    R getOrCreate(PoolResourceSupplier<R> supplier) throws IOException;
141
142    boolean remove(R resource);
143
144    void clear();
145
146    Collection<R> values();
147
148    int size();
149  }
150
151  public enum PoolType {
152    ThreadLocal, RoundRobin;
153
154    public static PoolType valueOf(String poolTypeName, PoolType defaultPoolType) {
155      PoolType poolType = PoolType.fuzzyMatch(poolTypeName);
156      return (poolType != null) ? poolType : defaultPoolType;
157    }
158
159    public static String fuzzyNormalize(String name) {
160      return name != null ? name.replaceAll("-", "").trim().toLowerCase(Locale.ROOT) : "";
161    }
162
163    public static PoolType fuzzyMatch(String name) {
164      for (PoolType poolType : values()) {
165        if (fuzzyNormalize(name).equals(fuzzyNormalize(poolType.name()))) {
166          return poolType;
167        }
168      }
169      return null;
170    }
171  }
172
173  protected Pool<V> createPool() {
174    switch (poolType) {
175    case RoundRobin:
176      return new RoundRobinPool<>(poolMaxSize);
177    case ThreadLocal:
178      return new ThreadLocalPool<>();
179    default:
180      return new RoundRobinPool<>(poolMaxSize);
181    }
182  }
183
184  /**
185   * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which
186   * stores its resources in an {@link ArrayList}. It load-balances access to
187   * its resources by returning a different resource every time a given key is
188   * looked up.
189   *
190   * <p>
191   * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of
192   * the pool is unbounded. Otherwise, it caps the number of resources in this
193   * pool to the (non-zero positive) value specified in {@link #maxSize}.
194   * </p>
195   *
196   * @param <R>
197   *          the type of the resource
198   *
199   */
200  @SuppressWarnings("serial")
201  static class RoundRobinPool<R> implements Pool<R> {
202    private final List<R> resources;
203    private final int maxSize;
204
205    private int nextIndex;
206
207    public RoundRobinPool(int maxSize) {
208      if (maxSize <= 0) {
209        throw new IllegalArgumentException("maxSize must be positive");
210      }
211
212      resources = new ArrayList<>(maxSize);
213      this.maxSize = maxSize;
214    }
215
216    @Override
217    public R getOrCreate(PoolResourceSupplier<R> supplier) throws IOException {
218      int size = resources.size();
219      R resource;
220
221      /* letting pool to grow */
222      if (size < maxSize) {
223        resource = createResource(supplier);
224        resources.add(resource);
225      } else {
226        resource = resources.get(nextIndex);
227
228        /* at this point size cannot be 0 */
229        nextIndex = (nextIndex + 1) % size;
230      }
231
232      return resource;
233    }
234
235    @Override
236    public boolean remove(R resource) {
237      return resources.remove(resource);
238    }
239
240    @Override
241    public void clear() {
242      resources.clear();
243    }
244
245    @Override
246    public Collection<R> values() {
247      return resources;
248    }
249
250    @Override
251    public int size() {
252      return resources.size();
253    }
254  }
255
256  /**
257   * The <code>ThreadLocalPool</code> represents a {@link PoolMap.Pool} that
258   * works similarly to {@link ThreadLocal} class. It essentially binds the resource
259   * to the thread from which it is accessed. It doesn't remove resources when a thread exits,
260   * those resources must be closed manually.
261   *
262   * <p>
263   * Note that the size of the pool is essentially bounded by the number of threads
264   * that add resources to this pool.
265   * </p>
266   *
267   * @param <R>
268   *          the type of the resource
269   */
270  static class ThreadLocalPool<R> implements Pool<R> {
271    private final Map<Thread, R> resources;
272
273    public ThreadLocalPool() {
274      resources = new HashMap<>();
275    }
276
277    @Override
278    public R getOrCreate(PoolResourceSupplier<R> supplier) throws IOException {
279      Thread myself = Thread.currentThread();
280      R resource = resources.get(myself);
281
282      if (resource == null) {
283        resource = createResource(supplier);
284        resources.put(myself, resource);
285      }
286
287      return resource;
288    }
289
290    @Override
291    public boolean remove(R resource) {
292      /* remove can be called from any thread */
293      return resources.values().remove(resource);
294    }
295
296    @Override
297    public int size() {
298      return resources.size();
299    }
300
301    @Override
302    public void clear() {
303      resources.clear();
304    }
305
306    @Override
307    public Collection<R> values() {
308      return resources.values();
309    }
310  }
311}