001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Locale;
026import java.util.Map;
027import java.util.Objects;
028import org.apache.yetus.audience.InterfaceAudience;
029
030/**
031 * The <code>PoolMap</code> maps a key to a collection of values, the elements of which are managed
032 * by a pool. In effect, that collection acts as a shared pool of resources, access to which is
033 * closely controlled as per the semantics of the pool.
034 * <p>
035 * In case the size of the pool is set to a non-zero positive number, that is used to cap the number
036 * of resources that a pool may contain for any given key. A size of {@link Integer#MAX_VALUE} is
037 * interpreted as an unbounded pool.
038 * </p>
039 * <p>
040 * PoolMap is thread-safe. It does not remove elements automatically. Unused resources must be
041 * closed and removed explicitly.
042 * </p>
043 * @param <K> the type of the key to the resource
044 * @param <V> the type of the resource being pooled
045 */
046@InterfaceAudience.Private
public class PoolMap<K, V> {
  /** Per-key pools. Every read and write of this map happens while synchronized on it. */
  private final Map<K, Pool<V>> pools;

  /** Kind of pool that {@link #createPool()} instantiates for each new key. */
  private final PoolType poolType;

  /** Upper bound handed to every {@link RoundRobinPool} created by this map. */
  private final int poolMaxSize;

  /**
   * Creates an empty pool map. The arguments are only stored here; they are first used when a
   * pool is created for a key.
   * @param poolType    the kind of pool to create per key
   * @param poolMaxSize the maximum number of resources per key for round-robin pools
   */
  public PoolMap(PoolType poolType, int poolMaxSize) {
    pools = new HashMap<>();
    this.poolType = poolType;
    this.poolMaxSize = poolMaxSize;
  }

  /**
   * Returns a resource from the pool registered under {@code key}, creating the pool and/or the
   * resource when needed. The supplier is invoked while the lock on the internal map is held.
   * @param key      the key identifying the pool
   * @param supplier used by the pool when it decides a new resource has to be created
   * @return a pooled or freshly created resource
   * @throws IOException if the supplier fails to create a resource
   */
  public V getOrCreate(K key, PoolResourceSupplier<V> supplier) throws IOException {
    synchronized (pools) {
      Pool<V> pool = pools.get(key);

      if (pool == null) {
        pool = createPool();
        pools.put(key, pool);
      }

      try {
        return pool.getOrCreate(supplier);
      } catch (IOException | RuntimeException | Error e) {
        // Do not keep an empty pool around when resource creation failed.
        if (pool.size() == 0) {
          pools.remove(key);
        }

        throw e;
      }
    }
  }

  /**
   * Removes {@code value} from the pool registered under {@code key}. A pool that becomes empty
   * through the removal is dropped from the map.
   * @param key   the key identifying the pool
   * @param value the resource to remove
   * @return {@code true} if the resource was found and removed, {@code false} otherwise
   */
  public boolean remove(K key, V value) {
    synchronized (pools) {
      Pool<V> pool = pools.get(key);

      if (pool == null) {
        return false;
      }

      boolean removed = pool.remove(value);

      if (removed && pool.size() == 0) {
        pools.remove(key);
      }

      return removed;
    }
  }

  /**
   * Collects the resources of all pools into a new list.
   * @return a new list that is independent of the internal pool collections
   */
  public List<V> values() {
    List<V> values = new ArrayList<>();

    synchronized (pools) {
      for (Pool<V> pool : pools.values()) {
        Collection<V> poolValues = pool.values();
        // Tolerate Pool implementations that report "nothing" as null.
        if (poolValues != null) {
          values.addAll(poolValues);
        }
      }
    }

    return values;
  }

  /**
   * Empties every pool and then forgets all keys. The resources themselves are only dropped from
   * the pools; nothing else is invoked on them here.
   */
  public void clear() {
    synchronized (pools) {
      for (Pool<V> pool : pools.values()) {
        pool.clear();
      }

      pools.clear();
    }
  }

  /**
   * Factory for pooled resources. Unlike {@link java.util.function.Supplier}, creation may fail
   * with an {@link IOException}.
   * @param <R> the type of the resource
   */
  public interface PoolResourceSupplier<R> {
    /**
     * Creates a resource.
     * @return the new resource
     * @throws IOException if the resource cannot be created
     */
    R get() throws IOException;
  }

  /**
   * Invokes the supplier and rejects a {@code null} result.
   * @param supplier the factory to invoke
   * @param <V>      the type of the resource
   * @return the non-null resource produced by the supplier
   * @throws IOException          if the supplier throws it
   * @throws NullPointerException if the supplier returns {@code null}
   */
  protected static <V> V createResource(PoolResourceSupplier<V> supplier) throws IOException {
    V resource = supplier.get();
    return Objects.requireNonNull(resource, "resource cannot be null.");
  }

  /**
   * Contract of the per-key pools held by a {@link PoolMap}. The implementations in this file do
   * no locking of their own; {@link PoolMap} calls them while synchronized on its internal map.
   * @param <R> the type of the resource
   */
  protected interface Pool<R> {
    /**
     * Returns a pooled resource, or creates one through {@code supplier}.
     * @param supplier the factory for new resources
     * @return a resource
     * @throws IOException if resource creation fails
     */
    R getOrCreate(PoolResourceSupplier<R> supplier) throws IOException;

    /**
     * Removes the given resource from the pool.
     * @param resource the resource to remove
     * @return {@code true} if the resource was part of the pool
     */
    boolean remove(R resource);

    /** Removes all resources from the pool. */
    void clear();

    /**
     * Returns the resources currently held by the pool.
     * @return the pooled resources
     */
    Collection<R> values();

    /**
     * Returns the number of pooled resources.
     * @return the pool size
     */
    int size();
  }

  /** The pool implementations a {@link PoolMap} can be configured with. */
  public enum PoolType {
    ThreadLocal,
    RoundRobin;

    /**
     * Leniently resolves a pool type by name.
     * @param poolTypeName    the name to look up; may be {@code null}
     * @param defaultPoolType the value returned when the name matches no pool type
     * @return the matching pool type, or {@code defaultPoolType}
     */
    public static PoolType valueOf(String poolTypeName, PoolType defaultPoolType) {
      PoolType poolType = PoolType.fuzzyMatch(poolTypeName);
      return (poolType != null) ? poolType : defaultPoolType;
    }

    /**
     * Normalizes a name for lenient comparison: hyphens are removed, surrounding whitespace is
     * trimmed and the result is lower-cased using {@link Locale#ROOT}.
     * @param name the name to normalize; may be {@code null}
     * @return the normalized name, or the empty string for {@code null}
     */
    public static String fuzzyNormalize(String name) {
      if (name == null) {
        return "";
      }
      // Literal replacement; no regular expression is needed to drop a fixed character.
      return name.replace("-", "").trim().toLowerCase(Locale.ROOT);
    }

    /**
     * Finds the pool type whose normalized name equals the normalized {@code name}.
     * @param name the name to look up; may be {@code null}
     * @return the matching pool type, or {@code null} if there is none
     */
    public static PoolType fuzzyMatch(String name) {
      // The candidate name does not change while iterating, so normalize it only once.
      String wanted = fuzzyNormalize(name);
      for (PoolType poolType : values()) {
        if (wanted.equals(fuzzyNormalize(poolType.name()))) {
          return poolType;
        }
      }
      return null;
    }
  }

  /**
   * Creates a new, empty pool according to the configured pool type.
   * @return a {@link ThreadLocalPool} for {@link PoolType#ThreadLocal}, otherwise a
   *         {@link RoundRobinPool} limited to the configured maximum size
   */
  protected Pool<V> createPool() {
    switch (poolType) {
      case ThreadLocal:
        return new ThreadLocalPool<>();
      case RoundRobin:
      default:
        return new RoundRobinPool<>(poolMaxSize);
    }
  }

  /**
   * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which stores its resources
   * in an {@link ArrayList}. It creates resources until the pool holds {@code maxSize} of them and
   * from then on hands out the existing resources in rotation.
   * <p>
   * If {@code maxSize} is set to {@link Integer#MAX_VALUE}, then the size of the pool is
   * unbounded. Otherwise, it caps the number of resources in this pool to the (non-zero positive)
   * value specified in {@code maxSize}.
   * </p>
   * @param <R> the type of the resource
   */
  static class RoundRobinPool<R> implements Pool<R> {
    /**
     * Largest backing-array size requested up front. The list still grows on demand up to
     * {@code maxSize}; this only bounds the eager allocation.
     */
    private static final int MAX_INITIAL_CAPACITY = 16;

    private final List<R> resources;
    private final int maxSize;

    /** Index of the resource handed out next once the pool is full. */
    private int nextIndex;

    /**
     * Creates an empty pool.
     * @param maxSize the maximum number of resources; must be positive
     * @throws IllegalArgumentException if {@code maxSize} is zero or negative
     */
    public RoundRobinPool(int maxSize) {
      if (maxSize <= 0) {
        throw new IllegalArgumentException("maxSize must be positive");
      }

      // ArrayList allocates its initial capacity eagerly. Passing maxSize straight through makes
      // the documented "unbounded" value Integer.MAX_VALUE fail with an OutOfMemoryError, and
      // wastes memory for any large limit, so only a small capacity is reserved up front.
      resources = new ArrayList<>(Math.min(maxSize, MAX_INITIAL_CAPACITY));
      this.maxSize = maxSize;
    }

    @Override
    public R getOrCreate(PoolResourceSupplier<R> supplier) throws IOException {
      int size = resources.size();
      R resource;

      /* letting pool to grow */
      if (size < maxSize) {
        resource = createResource(supplier);
        resources.add(resource);
      } else {
        // The list never grows beyond maxSize, so here size == maxSize. nextIndex is only ever
        // assigned a value modulo that size, hence it is a valid index.
        resource = resources.get(nextIndex);

        /* at this point size cannot be 0 */
        nextIndex = (nextIndex + 1) % size;
      }

      return resource;
    }

    @Override
    public boolean remove(R resource) {
      return resources.remove(resource);
    }

    @Override
    public void clear() {
      resources.clear();
    }

    /** Returns the live backing list, not a copy. */
    @Override
    public Collection<R> values() {
      return resources;
    }

    @Override
    public int size() {
      return resources.size();
    }
  }

  /**
   * The <code>ThreadLocalPool</code> represents a {@link PoolMap.Pool} that works similarly to
   * {@link ThreadLocal} class. It essentially binds the resource to the thread from which it is
   * accessed. It doesn't remove resources when a thread exits, those resources must be closed
   * manually.
   * <p>
   * Note that the size of the pool is essentially bounded by the number of threads that add
   * resources to this pool.
   * </p>
   * @param <R> the type of the resource
   */
  static class ThreadLocalPool<R> implements Pool<R> {
    /** Resource owned by each thread that has called {@link #getOrCreate}. */
    private final Map<Thread, R> resources;

    /** Creates an empty pool. */
    public ThreadLocalPool() {
      resources = new HashMap<>();
    }

    @Override
    public R getOrCreate(PoolResourceSupplier<R> supplier) throws IOException {
      Thread myself = Thread.currentThread();
      R resource = resources.get(myself);

      if (resource == null) {
        resource = createResource(supplier);
        resources.put(myself, resource);
      }

      return resource;
    }

    @Override
    public boolean remove(R resource) {
      /* remove can be called from any thread */
      return resources.values().remove(resource);
    }

    @Override
    public int size() {
      return resources.size();
    }

    @Override
    public void clear() {
      resources.clear();
    }

    /** Returns a live view of the pooled resources, not a copy. */
    @Override
    public Collection<R> values() {
      return resources.values();
    }
  }
}