1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 @InterfaceAudience.Private
// PoolMap maps each key to a pool of values instead of a single value. The
// pooling strategy (reusable queue, round-robin list, or one value per thread)
// is chosen once, at construction time, through a PoolType. The Map-style
// accessors (get/put/remove) delegate to the pool registered under the key.
public class PoolMap<K, V> implements Map<K, V> {
  /** Strategy used by {@link #createPool()} for every key of this map. */
  private final PoolType poolType;

  /** Capacity handed to the size-bounded pool implementations. */
  private final int poolMaxSize;

  /** One pool per key; pools are created lazily by {@link #put(Object, Object)}. */
  private final ConcurrentMap<K, Pool<V>> pools = new ConcurrentHashMap<K, Pool<V>>();

  /**
   * Creates a map whose pools have a maximum size of 0. The size-bounded pool
   * types (Reusable, RoundRobin) therefore never retain a value.
   *
   * @param poolType the pooling strategy
   */
  public PoolMap(PoolType poolType) {
    this(poolType, 0);
  }

  /**
   * @param poolType the pooling strategy
   * @param poolMaxSize capacity passed to Reusable and RoundRobin pools
   */
  public PoolMap(PoolType poolType, int poolMaxSize) {
    this.poolType = poolType;
    this.poolMaxSize = poolMaxSize;
  }

  /**
   * Asks the pool registered under {@code key} for a value.
   *
   * @return the value handed out by the pool, or null when there is no pool
   *         for the key or the pool has nothing to hand out
   */
  @Override
  public V get(Object key) {
    Pool<V> pool = pools.get(key);
    return pool != null ? pool.get() : null;
  }

  /**
   * Offers {@code value} to the pool of {@code key}, creating the pool first
   * if needed.
   *
   * @return whatever the pool's own put returns
   */
  @Override
  public V put(K key, V value) {
    Pool<V> pool = pools.get(key);
    if (pool == null) {
      // putIfAbsent keeps a single pool per key even when two threads race
      // here; a plain put could replace a pool another thread just filled.
      Pool<V> created = createPool();
      Pool<V> raced = pools.putIfAbsent(key, created);
      pool = (raced != null) ? raced : created;
    }
    return pool.put(value);
  }

  /**
   * Detaches the whole pool registered under {@code key}.
   *
   * @return a value obtained from the detached pool through its get(), or
   *         null when there was no pool or the pool handed out nothing
   */
  @Override
  public V remove(Object key) {
    Pool<V> pool = pools.remove(key);
    // The pool is no longer reachable through this map, so it is queried
    // directly instead of being looked up by key again.
    return pool != null ? pool.get() : null;
  }

  /**
   * Removes one value from the pool of {@code key}; the pool itself is
   * dropped from the map once its size reaches 0.
   *
   * @return true if the pool existed and reported the value as removed
   */
  public boolean removeValue(K key, V value) {
    Pool<V> pool = pools.get(key);
    if (pool == null) {
      return false;
    }
    boolean removed = pool.remove(value);
    if (removed && pool.size() == 0) {
      // Only unregister this exact pool, never a replacement that another
      // thread may have installed under the same key in the meantime.
      pools.remove(key, pool);
    }
    return removed;
  }

  /** @return a new list holding the values reported by every pool */
  @Override
  public Collection<V> values() {
    Collection<V> all = new ArrayList<V>();
    for (Pool<V> pool : pools.values()) {
      Collection<V> poolValues = pool.values();
      if (poolValues != null) {
        all.addAll(poolValues);
      }
    }
    return all;
  }

  /** @return a new list holding the values reported by the pool of {@code key} */
  public Collection<V> values(K key) {
    Collection<V> result = new ArrayList<V>();
    Pool<V> pool = pools.get(key);
    if (pool != null) {
      Collection<V> poolValues = pool.values();
      if (poolValues != null) {
        result.addAll(poolValues);
      }
    }
    return result;
  }

  /** @return true if no key has a pool */
  @Override
  public boolean isEmpty() {
    return pools.isEmpty();
  }

  /** @return the number of keys that have a pool (not the number of values) */
  @Override
  public int size() {
    return pools.size();
  }

  /** @return the size reported by the pool of {@code key}, or 0 without a pool */
  public int size(K key) {
    Pool<V> pool = pools.get(key);
    return pool != null ? pool.size() : 0;
  }

  @Override
  public boolean containsKey(Object key) {
    return pools.containsKey(key);
  }

  /**
   * Checks whether any pool currently reports {@code value} among its values.
   * The check is read-only: it inspects {@code Pool.values()} rather than
   * calling {@code Pool.get()}, which would take a value out of a reusable
   * pool or advance a round-robin pool.
   */
  @Override
  public boolean containsValue(Object value) {
    if (value == null) {
      return false;
    }
    for (Pool<V> pool : pools.values()) {
      Collection<V> poolValues = pool.values();
      if (poolValues != null && poolValues.contains(value)) {
        return true;
      }
    }
    return false;
  }

  @Override
  public void putAll(Map<? extends K, ? extends V> map) {
    for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
      put(entry.getKey(), entry.getValue());
    }
  }

  /** Clears every pool and then forgets all keys. */
  @Override
  public void clear() {
    for (Pool<V> pool : pools.values()) {
      pool.clear();
    }
    pools.clear();
  }

  @Override
  public Set<K> keySet() {
    return pools.keySet();
  }

  /**
   * Builds a new set with one entry per (key, pooled value) pair. Calling
   * {@code setValue} on an entry offers the value to the entry's pool.
   */
  @Override
  public Set<Map.Entry<K, V>> entrySet() {
    Set<Map.Entry<K, V>> entries = new HashSet<Map.Entry<K, V>>();
    for (Map.Entry<K, Pool<V>> poolEntry : pools.entrySet()) {
      final K poolKey = poolEntry.getKey();
      final Pool<V> pool = poolEntry.getValue();
      if (pool == null) {
        continue;
      }
      for (final V poolValue : pool.values()) {
        entries.add(new Map.Entry<K, V>() {
          @Override
          public K getKey() {
            return poolKey;
          }

          @Override
          public V getValue() {
            return poolValue;
          }

          @Override
          public V setValue(V value) {
            return pool.put(value);
          }
        });
      }
    }
    return entries;
  }

  /** Contract shared by the pool implementations held per key. */
  protected interface Pool<R> {
    /** @return a resource from the pool, or null if none is handed out */
    R get();

    /** Offers a resource to the pool. */
    R put(R resource);

    /** @return true if the resource was removed from the pool */
    boolean remove(R resource);

    void clear();

    Collection<R> values();

    int size();
  }

  /** The available pooling strategies. */
  public enum PoolType {
    Reusable, ThreadLocal, RoundRobin;

    /**
     * Resolves a pool type from a loosely formatted name.
     *
     * @param poolTypeName name to resolve (see {@link #fuzzyMatch(String)})
     * @param defaultPoolType returned when the name is unknown or not allowed
     * @param allowedPoolTypes types accepted in addition to the default
     * @return the matching allowed type, otherwise {@code defaultPoolType}
     */
    public static PoolType valueOf(String poolTypeName,
        PoolType defaultPoolType, PoolType... allowedPoolTypes) {
      PoolType matched = PoolType.fuzzyMatch(poolTypeName);
      if (matched == null || matched.equals(defaultPoolType)) {
        return defaultPoolType;
      }
      if (allowedPoolTypes != null) {
        for (PoolType allowed : allowedPoolTypes) {
          if (matched.equals(allowed)) {
            return matched;
          }
        }
      }
      return defaultPoolType;
    }

    /**
     * Normalizes a name for comparison: dashes removed, trimmed, lower-cased.
     * Lower-casing uses {@link Locale#ROOT} so the result does not depend on
     * the JVM default locale (e.g. Turkish dotless i).
     *
     * @return the normalized name, or the empty string for null
     */
    public static String fuzzyNormalize(String name) {
      return name != null ? name.replace("-", "").trim().toLowerCase(Locale.ROOT) : "";
    }

    /** @return the pool type whose normalized name equals that of {@code name}, or null */
    public static PoolType fuzzyMatch(String name) {
      String wanted = fuzzyNormalize(name);
      for (PoolType poolType : values()) {
        if (wanted.equals(fuzzyNormalize(poolType.name()))) {
          return poolType;
        }
      }
      return null;
    }
  }

  /**
   * Creates a pool matching this map's pool type.
   *
   * @return the new pool, or null if the type matches no known case
   */
  protected Pool<V> createPool() {
    switch (poolType) {
    case Reusable:
      return new ReusablePool<V>(poolMaxSize);
    case RoundRobin:
      return new RoundRobinPool<V>(poolMaxSize);
    case ThreadLocal:
      return new ThreadLocalPool<V>();
    }
    return null;
  }

  /**
   * Queue-backed pool. {@link #get()} polls the head of the queue, so a
   * resource handed out leaves the pool until it is put back; {@link #put}
   * only accepts a resource while fewer than {@code maxSize} are queued.
   */
  @SuppressWarnings("serial")
  public class ReusablePool<R> extends ConcurrentLinkedQueue<R> implements Pool<R> {
    private final int maxSize;

    public ReusablePool(int maxSize) {
      this.maxSize = maxSize;
    }

    @Override
    public R get() {
      return poll();
    }

    /** @return always null; the resource is dropped when the pool is full */
    @Override
    public R put(R resource) {
      if (super.size() < maxSize) {
        add(resource);
      }
      return null;
    }

    @Override
    public Collection<R> values() {
      return this;
    }
  }

  /**
   * List-backed pool that hands out its resources in rotation. While the
   * list holds fewer than {@code maxSize} resources {@link #get()} returns
   * null; once it is full, resources are returned in turn without being
   * removed.
   */
  @SuppressWarnings("serial")
  class RoundRobinPool<R> extends CopyOnWriteArrayList<R> implements Pool<R> {
    private final int maxSize;

    /** Monotonic counter; the slot to hand out is derived from it modulo size. */
    private final AtomicInteger nextResource = new AtomicInteger(0);

    public RoundRobinPool(int maxSize) {
      this.maxSize = maxSize;
    }

    /** @return always null; the resource is dropped when the pool is full */
    @Override
    public R put(R resource) {
      if (super.size() < maxSize) {
        add(resource);
      }
      return null;
    }

    @Override
    public R get() {
      int size = super.size();
      // size == 0 must be checked explicitly: with maxSize <= 0 the pool
      // counts as "full" while empty and the modulo below would divide by zero.
      if (size == 0 || size < maxSize) {
        return null;
      }
      // Masking the sign bit keeps the index non-negative after the counter
      // wraps around.
      int index = (nextResource.getAndIncrement() & Integer.MAX_VALUE) % size;
      try {
        return get(index);
      } catch (IndexOutOfBoundsException ignored) {
        // The list shrank after size was read, so the pool is no longer
        // full; answer the same way as the not-full case above.
        return null;
      }
    }

    @Override
    public Collection<R> values() {
      return this;
    }
  }

  /**
   * Pool holding at most one resource per thread. {@link #size()} is the
   * number of threads that currently have a resource stored.
   */
  static class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> {
    /** Number of threads that currently hold a non-null resource in this pool. */
    private final AtomicInteger poolSize = new AtomicInteger(0);

    public ThreadLocalPool() {
    }

    /**
     * Stores {@code resource} for the calling thread.
     *
     * @return the resource previously stored for the calling thread, or null
     */
    @Override
    public R put(R resource) {
      R previousResource = get();
      // Only transitions between "no resource" and "has resource" change the size.
      if (previousResource == null && resource != null) {
        poolSize.incrementAndGet();
      } else if (previousResource != null && resource == null) {
        poolSize.decrementAndGet();
      }
      this.set(resource);
      return previousResource;
    }

    /** Drops the calling thread's resource, adjusting the size if one was stored. */
    @Override
    public void remove() {
      boolean hadResource = super.get() != null;
      super.remove();
      if (hadResource) {
        poolSize.decrementAndGet();
      }
    }

    @Override
    public int size() {
      return poolSize.get();
    }

    /** @return true if {@code resource} equals the calling thread's resource and was removed */
    @Override
    public boolean remove(R resource) {
      R current = super.get();
      if (resource != null && resource.equals(current)) {
        remove();
        return true;
      }
      return false;
    }

    /** Drops the calling thread's resource only; other threads are not affected. */
    @Override
    public void clear() {
      remove();
    }

    /** @return a new list with the calling thread's resource, empty if it has none */
    @Override
    public Collection<R> values() {
      List<R> values = new ArrayList<R>();
      R current = get();
      if (current != null) {
        values.add(current);
      }
      return values;
    }
  }
}