1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.io;
19
20 import java.nio.ByteBuffer;
21 import java.util.Queue;
22 import java.util.concurrent.atomic.AtomicLong;
23 import java.util.concurrent.locks.ReentrantLock;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.hbase.util.BoundedArrayQueue;
29
30 import com.google.common.annotations.VisibleForTesting;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 @InterfaceAudience.Private
49 public class BoundedByteBufferPool {
50 private static final Log LOG = LogFactory.getLog(BoundedByteBufferPool.class);
51
52 @VisibleForTesting
53 final Queue<ByteBuffer> buffers;
54
55
56 private final int maxByteBufferSizeToCache;
57
58
59 @VisibleForTesting
60 volatile int runningAverage;
61
62
63 private volatile int totalReservoirCapacity;
64
65
66 private AtomicLong allocations = new AtomicLong(0);
67
68 private ReentrantLock lock = new ReentrantLock();
69
70
71
72
73
74
75 public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
76 final int maxToCache) {
77 this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
78 this.runningAverage = initialByteBufferSize;
79 this.buffers = new BoundedArrayQueue<ByteBuffer>(maxToCache);
80 }
81
82 public ByteBuffer getBuffer() {
83 ByteBuffer bb = null;
84 lock.lock();
85 try {
86 bb = this.buffers.poll();
87 if (bb != null) {
88 this.totalReservoirCapacity -= bb.capacity();
89 }
90 } finally {
91 lock.unlock();
92 }
93 if (bb != null) {
94
95 bb.clear();
96 } else {
97 bb = ByteBuffer.allocate(this.runningAverage);
98 this.allocations.incrementAndGet();
99 }
100 if (LOG.isTraceEnabled()) {
101 LOG.trace("runningAverage=" + this.runningAverage +
102 ", totalCapacity=" + this.totalReservoirCapacity + ", count=" + this.buffers.size() +
103 ", alloctions=" + this.allocations.get());
104 }
105 return bb;
106 }
107
108 public void putBuffer(ByteBuffer bb) {
109
110 if (bb.capacity() > this.maxByteBufferSizeToCache) return;
111 boolean success = false;
112 int average = 0;
113 lock.lock();
114 try {
115 success = this.buffers.offer(bb);
116 if (success) {
117 this.totalReservoirCapacity += bb.capacity();
118 average = this.totalReservoirCapacity / this.buffers.size();
119 }
120 } finally {
121 lock.unlock();
122 }
123 if (!success) {
124 if (LOG.isDebugEnabled()) {
125 LOG.debug("At capacity: " + this.buffers.size());
126 }
127 } else {
128 if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) {
129 this.runningAverage = average;
130 }
131 }
132 }
133 }