1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.LinkedBlockingQueue;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.util.ByteRange;
32 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
33
34 import com.google.common.annotations.VisibleForTesting;
35 import com.google.common.base.Preconditions;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 @InterfaceAudience.Private
59 public class HeapMemStoreLAB implements MemStoreLAB {
60
61 static final String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
62 static final int CHUNK_SIZE_DEFAULT = 2048 * 1024;
63 static final String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
64 static final int MAX_ALLOC_DEFAULT = 256 * 1024;
65
66
67 static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class);
68
69 private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
70
71 private BlockingQueue<Chunk> chunkQueue = null;
72 final int chunkSize;
73 final int maxAlloc;
74 private final MemStoreChunkPool chunkPool;
75
76
77
78 private volatile boolean closed = false;
79
80
81 private AtomicBoolean reclaimed = new AtomicBoolean(false);
82
83 private final AtomicInteger openScannerCount = new AtomicInteger();
84
85
86 public HeapMemStoreLAB() {
87 this(new Configuration());
88 }
89
90 public HeapMemStoreLAB(Configuration conf) {
91 chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
92 maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
93 this.chunkPool = MemStoreChunkPool.getPool(conf);
94
95 if (this.chunkPool != null) {
96
97
98 chunkQueue = new LinkedBlockingQueue<Chunk>(chunkPool.getMaxCount());
99 }
100
101
102 Preconditions.checkArgument(
103 maxAlloc <= chunkSize,
104 MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
105 }
106
107
108
109
110
111
112
113 @Override
114 public ByteRange allocateBytes(int size) {
115 Preconditions.checkArgument(size >= 0, "negative size");
116
117
118
119 if (size > maxAlloc) {
120 return null;
121 }
122
123 while (true) {
124 Chunk c = getOrMakeChunk();
125
126
127 int allocOffset = c.alloc(size);
128 if (allocOffset != -1) {
129
130
131 return new SimpleMutableByteRange(c.data, allocOffset, size);
132 }
133
134
135
136 tryRetireChunk(c);
137 }
138 }
139
140
141
142
143
144 @Override
145 public void close() {
146 this.closed = true;
147
148
149 if (chunkPool != null && openScannerCount.get() == 0
150 && reclaimed.compareAndSet(false, true)) {
151 chunkPool.putbackChunks(this.chunkQueue);
152 }
153 }
154
155
156
157
158 @Override
159 public void incScannerCount() {
160 this.openScannerCount.incrementAndGet();
161 }
162
163
164
165
166 @Override
167 public void decScannerCount() {
168 int count = this.openScannerCount.decrementAndGet();
169 if (chunkPool != null && count == 0 && this.closed
170 && reclaimed.compareAndSet(false, true)) {
171 chunkPool.putbackChunks(this.chunkQueue);
172 }
173 }
174
175
176
177
178
179
180
181
182 private void tryRetireChunk(Chunk c) {
183 curChunk.compareAndSet(c, null);
184
185
186
187
188
189
190 }
191
192
193
194
195
196 private Chunk getOrMakeChunk() {
197 while (true) {
198
199 Chunk c = curChunk.get();
200 if (c != null) {
201 return c;
202 }
203
204
205
206
207 c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
208 if (curChunk.compareAndSet(null, c)) {
209
210
211 c.init();
212 if (chunkQueue != null && !this.closed && !this.chunkQueue.offer(c)) {
213 if (LOG.isTraceEnabled()) {
214 LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
215 + chunkQueue.size());
216 }
217 }
218 return c;
219 } else if (chunkPool != null) {
220 chunkPool.putbackChunk(c);
221 }
222
223
224 }
225 }
226
227 @VisibleForTesting
228 Chunk getCurrentChunk() {
229 return this.curChunk.get();
230 }
231
232 @VisibleForTesting
233 BlockingQueue<Chunk> getChunkQueue() {
234 return this.chunkQueue;
235 }
236
237
238
239
240 static class Chunk {
241
242 private byte[] data;
243
244 private static final int UNINITIALIZED = -1;
245 private static final int OOM = -2;
246
247
248
249
250 private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
251
252
253 private AtomicInteger allocCount = new AtomicInteger();
254
255
256 private final int size;
257
258
259
260
261
262
263 Chunk(int size) {
264 this.size = size;
265 }
266
267
268
269
270
271
272 public void init() {
273 assert nextFreeOffset.get() == UNINITIALIZED;
274 try {
275 if (data == null) {
276 data = new byte[size];
277 }
278 } catch (OutOfMemoryError e) {
279 boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
280 assert failInit;
281 throw e;
282 }
283
284 boolean initted = nextFreeOffset.compareAndSet(
285 UNINITIALIZED, 0);
286
287
288 Preconditions.checkState(initted,
289 "Multiple threads tried to init same chunk");
290 }
291
292
293
294
295 void reset() {
296 if (nextFreeOffset.get() != UNINITIALIZED) {
297 nextFreeOffset.set(UNINITIALIZED);
298 allocCount.set(0);
299 }
300 }
301
302
303
304
305
306 public int alloc(int size) {
307 while (true) {
308 int oldOffset = nextFreeOffset.get();
309 if (oldOffset == UNINITIALIZED) {
310
311
312
313
314 Thread.yield();
315 continue;
316 }
317 if (oldOffset == OOM) {
318
319 return -1;
320 }
321
322 if (oldOffset + size > data.length) {
323 return -1;
324 }
325
326
327 if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
328
329 allocCount.incrementAndGet();
330 return oldOffset;
331 }
332
333 }
334 }
335
336 @Override
337 public String toString() {
338 return "Chunk@" + System.identityHashCode(this) +
339 " allocs=" + allocCount.get() + "waste=" +
340 (data.length - nextFreeOffset.get());
341 }
342
343 @VisibleForTesting
344 int getNextFreeOffset() {
345 return this.nextFreeOffset.get();
346 }
347 }
348 }