/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.SimpleMutableByteRange;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

/**
 * A memstore-local allocation buffer.
 * <p>
 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
 * big (2MB) byte[] chunks and then doles out slices of those arrays to
 * threads that request them.
 * <p>
 * The purpose of this class is to combat heap fragmentation in the
 * regionserver. By ensuring that all KeyValues in a given memstore refer
 * only to large chunks of contiguous memory, we ensure that large blocks
 * get freed up when the memstore is flushed.
 * <p>
 * Without the MSLAB, the byte arrays allocated during insertion end up
 * interleaved throughout the heap, and the old generation gets progressively
 * more fragmented until a stop-the-world compacting collection occurs.
 * <p>
 * TODO: we should probably benchmark whether word-aligning the allocations
 * would provide a performance improvement - it would probably speed up the
 * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
 * anyway.
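 * <p>
 * A minimal usage sketch, as a hypothetical caller (the memstore normally
 * drives these calls; {@code conf} and {@code cellLength} are assumed):
 * <pre>{@code
 * MemStoreLAB lab = new HeapMemStoreLAB(conf);
 * ByteRange slice = lab.allocateBytes(cellLength);
 * if (slice == null) {
 *   // larger than maxAlloc: allocate directly from the JVM heap instead
 * } else {
 *   // copy the cell into slice.getBytes() starting at slice.getOffset()
 * }
 * lab.close(); // chunks return to the pool once no scanner is open
 * }</pre>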
 */
@InterfaceAudience.Private
public class HeapMemStoreLAB implements MemStoreLAB {

  static final String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
  static final int CHUNK_SIZE_DEFAULT = 2048 * 1024;
  static final String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
  static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through
                                                   // the allocator

  static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class);

  private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
  // A queue of chunks contained by this memstore, used with the chunk pool
  private BlockingQueue<Chunk> chunkQueue = null;
  final int chunkSize;
  final int maxAlloc;
  private final MemStoreChunkPool chunkPool;

  // This flag is for closing this instance; it's set when clearing the
  // snapshot of the memstore
  private volatile boolean closed = false;
  // This flag is for reclaiming chunks; it's set when putting chunks back
  // into the pool
  private AtomicBoolean reclaimed = new AtomicBoolean(false);
  // Current count of open scanners that are reading data from this MemStoreLAB
  private final AtomicInteger openScannerCount = new AtomicInteger();

  // Used in testing
  public HeapMemStoreLAB() {
    this(new Configuration());
  }

  public HeapMemStoreLAB(Configuration conf) {
    chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
    this.chunkPool = MemStoreChunkPool.getPool(conf);
    // currently chunkQueue is only used with the chunk pool
    if (this.chunkPool != null) {
      // set queue length to the chunk pool max count to avoid keeping
      // references to too many non-reclaimable chunks
      chunkQueue = new LinkedBlockingQueue<Chunk>(chunkPool.getMaxCount());
    }

    // if we don't exclude allocations > chunkSize, we'd infinite-loop on one!
    Preconditions.checkArgument(maxAlloc <= chunkSize,
        MAX_ALLOC_KEY + " must be less than or equal to " + CHUNK_SIZE_KEY);
  }

  /**
   * Allocate a slice of the given length.
   *
   * If the size is larger than the maximum size specified for this
   * allocator, returns null.
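   * <p>
   * A caller-side sketch of the expected fallback (hypothetical caller code;
   * {@code lab} and {@code len} are assumed):
   * <pre>{@code
   * ByteRange r = lab.allocateBytes(len);
   * byte[] dst = (r == null) ? new byte[len] // too big for the LAB
   *                          : r.getBytes(); // slice of a shared chunk
   * }</pre>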
   */
  @Override
  public ByteRange allocateBytes(int size) {
    Preconditions.checkArgument(size >= 0, "negative size");

    // Callers should satisfy large allocations directly from the JVM since
    // they don't cause fragmentation as badly.
    if (size > maxAlloc) {
      return null;
    }

    while (true) {
      Chunk c = getOrMakeChunk();

      // Try to allocate from this chunk
      int allocOffset = c.alloc(size);
      if (allocOffset != -1) {
        // We succeeded - this is the common case - small alloc
        // from a big buffer
        return new SimpleMutableByteRange(c.data, allocOffset, size);
      }

      // not enough space!
      // try to retire this chunk
      tryRetireChunk(c);
    }
  }

  /**
   * Close this instance since it won't be used any more; try to put the
   * chunks back into the pool.
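   * <p>
   * Either this call or the final {@link #decScannerCount()}, whichever runs
   * after the other, puts the chunks back; the {@code reclaimed} CAS ensures
   * the putback happens exactly once.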
   */
  @Override
  public void close() {
    this.closed = true;
    // We can only put the chunks back into the pool for reuse when no open
    // scanner will still read their data
    if (chunkPool != null && openScannerCount.get() == 0
        && reclaimed.compareAndSet(false, true)) {
      chunkPool.putbackChunks(this.chunkQueue);
    }
  }

  /**
   * Called when opening a scanner on the data of this MemStoreLAB
   */
  @Override
  public void incScannerCount() {
    this.openScannerCount.incrementAndGet();
  }

  /**
   * Called when closing a scanner on the data of this MemStoreLAB
   */
  @Override
  public void decScannerCount() {
    int count = this.openScannerCount.decrementAndGet();
    if (chunkPool != null && count == 0 && this.closed
        && reclaimed.compareAndSet(false, true)) {
      chunkPool.putbackChunks(this.chunkQueue);
    }
  }

  /**
   * Try to retire the current chunk if it is still <code>c</code>.
   * Postcondition is that curChunk.get() != c.
   * @param c the chunk to retire
   */
  private void tryRetireChunk(Chunk c) {
    curChunk.compareAndSet(c, null);
    // If the CAS succeeds, that means that we won the race
    // to retire the chunk. We could use this opportunity to
    // update metrics on external fragmentation.
    //
    // If the CAS fails, that means that someone else already
    // retired the chunk for us.
  }

  /**
   * Get the current chunk, or, if there is no current chunk,
   * allocate a new one from the JVM.
   */
  private Chunk getOrMakeChunk() {
    while (true) {
      // Try to get the chunk
      Chunk c = curChunk.get();
      if (c != null) {
        return c;
      }

      // No current chunk, so we want to allocate one. We race
      // against other allocators to CAS in an uninitialized chunk
      // (which is cheap to allocate)
      c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
      if (curChunk.compareAndSet(null, c)) {
        // we won the race - now we need to actually do the expensive
        // allocation step
        c.init();
        if (chunkQueue != null && !this.closed && !this.chunkQueue.offer(c)) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
                + chunkQueue.size());
          }
        }
        return c;
      } else if (chunkPool != null) {
        // we lost the race - return our unused, pre-fetched chunk to the pool
        chunkPool.putbackChunk(c);
      }
      // someone else won the race - that's fine, we'll try to grab theirs
      // in the next iteration of the loop.
    }
  }

  @VisibleForTesting
  Chunk getCurrentChunk() {
    return this.curChunk.get();
  }

  @VisibleForTesting
  BlockingQueue<Chunk> getChunkQueue() {
    return this.chunkQueue;
  }

  /**
   * A chunk of memory out of which allocations are sliced.
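   * <p>
   * Allocation is a bump-the-pointer CAS on {@code nextFreeOffset}: for
   * example, in a fresh 2MB chunk, an {@code alloc(24)} moves the offset
   * from 0 to 24 and returns 0, and a following {@code alloc(40)} moves it
   * from 24 to 64 and returns 24.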
   */
  static class Chunk {
    /** Actual underlying data */
    private byte[] data;

    private static final int UNINITIALIZED = -1;
    private static final int OOM = -2;
    /**
     * Offset for the next allocation, or one of the sentinel values:
     * UNINITIALIZED (-1), meaning the chunk is still uninitialized, or
     * OOM (-2), meaning the backing allocation failed.
     */
    private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);

    /** Total number of allocations satisfied from this buffer */
    private AtomicInteger allocCount = new AtomicInteger();

    /** Size of chunk in bytes */
    private final int size;

    /**
     * Create an uninitialized chunk. Note that memory is not allocated yet, so
     * this is cheap.
     * @param size in bytes
     */
    Chunk(int size) {
      this.size = size;
    }

    /**
     * Actually claim the memory for this chunk. This should only be called from
     * the thread that constructed the chunk. It is thread-safe against other
     * threads calling alloc(), which will spin until the allocation is complete.
     */
    public void init() {
      assert nextFreeOffset.get() == UNINITIALIZED;
      try {
        if (data == null) {
          data = new byte[size];
        }
      } catch (OutOfMemoryError e) {
        boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
        assert failInit; // should be true.
        throw e;
      }
      // Mark that it's ready for use
      boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
      // We should always succeed the above CAS since only one thread
      // calls init()!
      Preconditions.checkState(initted,
          "Multiple threads tried to init same chunk");
    }

    /**
     * Reset the offset to UNINITIALIZED before reusing an old chunk
     */
    void reset() {
      if (nextFreeOffset.get() != UNINITIALIZED) {
        nextFreeOffset.set(UNINITIALIZED);
        allocCount.set(0);
      }
    }

    /**
     * Try to allocate <code>size</code> bytes from the chunk.
     * @return the offset of the successful allocation, or -1 to indicate not-enough-space
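     * <p>
     * Two threads racing here simply retry: if thread A CASes the offset
     * from 0 to 24 first, thread B's CAS from 0 fails, and B re-reads the
     * new offset (24) and tries again from there.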
     */
    public int alloc(int size) {
      while (true) {
        int oldOffset = nextFreeOffset.get();
        if (oldOffset == UNINITIALIZED) {
          // The chunk doesn't have its data allocated yet.
          // Since we found this in curChunk, we know that whoever
          // CAS-ed it there is allocating it right now. So spin-loop
          // shouldn't spin long!
          Thread.yield();
          continue;
        }
        if (oldOffset == OOM) {
          // doh, we ran out of RAM. return -1 so the caller chucks this
          // chunk away.
          return -1;
        }

        if (oldOffset + size > data.length) {
          return -1; // alloc doesn't fit
        }

        // Try to atomically claim this slice of the chunk
        if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
          // we got the alloc
          allocCount.incrementAndGet();
          return oldOffset;
        }
        // we raced and lost the alloc, try again
      }
    }

    @Override
    public String toString() {
      return "Chunk@" + System.identityHashCode(this) +
          " allocs=" + allocCount.get() + " waste=" +
          (data.length - nextFreeOffset.get());
    }

    @VisibleForTesting
    int getNextFreeOffset() {
      return this.nextFreeOffset.get();
    }
  }
}