View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.util.concurrent.BlockingQueue;
22  import java.util.concurrent.LinkedBlockingQueue;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  import java.util.concurrent.atomic.AtomicInteger;
25  import java.util.concurrent.atomic.AtomicReference;
26  
27  import org.apache.hadoop.classification.InterfaceAudience;
28  import org.apache.hadoop.conf.Configuration;
29  
30  import com.google.common.base.Preconditions;
31  
32  /**
33   * A memstore-local allocation buffer.
34   * <p>
35   * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
36   * big (2MB) byte[] chunks from and then doles it out to threads that request
37   * slices into the array.
38   * <p>
39   * The purpose of this class is to combat heap fragmentation in the
40   * regionserver. By ensuring that all KeyValues in a given memstore refer
41   * only to large chunks of contiguous memory, we ensure that large blocks
42   * get freed up when the memstore is flushed.
43   * <p>
44   * Without the MSLAB, the byte array allocated during insertion end up
45   * interleaved throughout the heap, and the old generation gets progressively
46   * more fragmented until a stop-the-world compacting collection occurs.
47   * <p>
48   * TODO: we should probably benchmark whether word-aligning the allocations
49   * would provide a performance improvement - probably would speed up the
50   * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
51   * anyway
52   */
53  @InterfaceAudience.Private
54  public class MemStoreLAB {
55    private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
56    // A queue of chunks contained by this memstore
57    private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
58  
59    final static String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
60    final static int CHUNK_SIZE_DEFAULT = 2048 * 1024;
61    final int chunkSize;
62  
63    final static String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
64    final static int MAX_ALLOC_DEFAULT = 256  * 1024; // allocs bigger than this don't go through allocator
65    final int maxAlloc;
66  
67    private final MemStoreChunkPool chunkPool;
68  
69    // This flag is for closing this instance, its set when clearing snapshot of
70    // memstore
71    private volatile boolean closed = false;
72    // This flag is for reclaiming chunks. Its set when putting chunks back to
73    // pool
74    private AtomicBoolean reclaimed = new AtomicBoolean(false);
75    // Current count of open scanners which reading data from this MemStoreLAB
76    private final AtomicInteger openScannerCount = new AtomicInteger();
77  
78    // Used in testing
79    public MemStoreLAB() {
80      this(new Configuration());
81    }
82  
83    private MemStoreLAB(Configuration conf) {
84      this(conf, MemStoreChunkPool.getPool(conf));
85    }
86  
87    public MemStoreLAB(Configuration conf, MemStoreChunkPool pool) {
88      chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
89      maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
90      this.chunkPool = pool;
91  
92      // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
93      Preconditions.checkArgument(
94        maxAlloc <= chunkSize,
95        MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
96    }
97  
98    /**
99     * Allocate a slice of the given length.
100    *
101    * If the size is larger than the maximum size specified for this
102    * allocator, returns null.
103    */
104   public Allocation allocateBytes(int size) {
105     Preconditions.checkArgument(size >= 0, "negative size");
106 
107     // Callers should satisfy large allocations directly from JVM since they
108     // don't cause fragmentation as badly.
109     if (size > maxAlloc) {
110       return null;
111     }
112 
113     while (true) {
114       Chunk c = getOrMakeChunk();
115 
116       // Try to allocate from this chunk
117       int allocOffset = c.alloc(size);
118       if (allocOffset != -1) {
119         // We succeeded - this is the common case - small alloc
120         // from a big buffer
121         return new Allocation(c.data, allocOffset);
122       }
123 
124       // not enough space!
125       // try to retire this chunk
126       tryRetireChunk(c);
127     }
128   }
129 
130   /**
131    * Close this instance since it won't be used any more, try to put the chunks
132    * back to pool
133    */
134   void close() {
135     this.closed = true;
136     // We could put back the chunks to pool for reusing only when there is no
137     // opening scanner which will read their data
138     if (chunkPool != null && openScannerCount.get() == 0
139         && reclaimed.compareAndSet(false, true)) {
140       chunkPool.putbackChunks(this.chunkQueue);
141     }
142   }
143 
144   /**
145    * Called when opening a scanner on the data of this MemStoreLAB
146    */
147   void incScannerCount() {
148     this.openScannerCount.incrementAndGet();
149   }
150 
151   /**
152    * Called when closing a scanner on the data of this MemStoreLAB
153    */
154   void decScannerCount() {
155     int count = this.openScannerCount.decrementAndGet();
156     if (chunkPool != null && count == 0 && this.closed
157         && reclaimed.compareAndSet(false, true)) {
158       chunkPool.putbackChunks(this.chunkQueue);
159     }
160   }
161 
162   /**
163    * Try to retire the current chunk if it is still
164    * <code>c</code>. Postcondition is that curChunk.get()
165    * != c
166    */
167   private void tryRetireChunk(Chunk c) {
168     curChunk.compareAndSet(c, null);
169     // If the CAS succeeds, that means that we won the race
170     // to retire the chunk. We could use this opportunity to
171     // update metrics on external fragmentation.
172     //
173     // If the CAS fails, that means that someone else already
174     // retired the chunk for us.
175   }
176 
177   /**
178    * Get the current chunk, or, if there is no current chunk,
179    * allocate a new one from the JVM.
180    */
181   private Chunk getOrMakeChunk() {
182     while (true) {
183       // Try to get the chunk
184       Chunk c = curChunk.get();
185       if (c != null) {
186         return c;
187       }
188 
189       // No current chunk, so we want to allocate one. We race
190       // against other allocators to CAS in an uninitialized chunk
191       // (which is cheap to allocate)
192       c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
193       if (curChunk.compareAndSet(null, c)) {
194         // we won race - now we need to actually do the expensive
195         // allocation step
196         c.init();
197         this.chunkQueue.add(c);
198         return c;
199       } else if (chunkPool != null) {
200         chunkPool.putbackChunk(c);
201       }
202       // someone else won race - that's fine, we'll try to grab theirs
203       // in the next iteration of the loop.
204     }
205   }
206 
207   /**
208    * A chunk of memory out of which allocations are sliced.
209    */
210   static class Chunk {
211     /** Actual underlying data */
212     private byte[] data;
213 
214     private static final int UNINITIALIZED = -1;
215     private static final int OOM = -2;
216     /**
217      * Offset for the next allocation, or the sentinel value -1
218      * which implies that the chunk is still uninitialized.
219      * */
220     private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
221 
222     /** Total number of allocations satisfied from this buffer */
223     private AtomicInteger allocCount = new AtomicInteger();
224 
225     /** Size of chunk in bytes */
226     private final int size;
227 
228     /**
229      * Create an uninitialized chunk. Note that memory is not allocated yet, so
230      * this is cheap.
231      * @param size in bytes
232      */
233     Chunk(int size) {
234       this.size = size;
235     }
236 
237     /**
238      * Actually claim the memory for this chunk. This should only be called from
239      * the thread that constructed the chunk. It is thread-safe against other
240      * threads calling alloc(), who will block until the allocation is complete.
241      */
242     public void init() {
243       assert nextFreeOffset.get() == UNINITIALIZED;
244       try {
245         if (data == null) {
246           data = new byte[size];
247         }
248       } catch (OutOfMemoryError e) {
249         boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
250         assert failInit; // should be true.
251         throw e;
252       }
253       // Mark that it's ready for use
254       boolean initted = nextFreeOffset.compareAndSet(
255           UNINITIALIZED, 0);
256       // We should always succeed the above CAS since only one thread
257       // calls init()!
258       Preconditions.checkState(initted,
259           "Multiple threads tried to init same chunk");
260     }
261 
262     /**
263      * Reset the offset to UNINITIALIZED before before reusing an old chunk
264      */
265     void reset() {
266       if (nextFreeOffset.get() != UNINITIALIZED) {
267         nextFreeOffset.set(UNINITIALIZED);
268         allocCount.set(0);
269       }
270     }
271 
272     /**
273      * Try to allocate <code>size</code> bytes from the chunk.
274      * @return the offset of the successful allocation, or -1 to indicate not-enough-space
275      */
276     public int alloc(int size) {
277       while (true) {
278         int oldOffset = nextFreeOffset.get();
279         if (oldOffset == UNINITIALIZED) {
280           // The chunk doesn't have its data allocated yet.
281           // Since we found this in curChunk, we know that whoever
282           // CAS-ed it there is allocating it right now. So spin-loop
283           // shouldn't spin long!
284           Thread.yield();
285           continue;
286         }
287         if (oldOffset == OOM) {
288           // doh we ran out of ram. return -1 to chuck this away.
289           return -1;
290         }
291 
292         if (oldOffset + size > data.length) {
293           return -1; // alloc doesn't fit
294         }
295 
296         // Try to atomically claim this chunk
297         if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
298           // we got the alloc
299           allocCount.incrementAndGet();
300           return oldOffset;
301         }
302         // we raced and lost alloc, try again
303       }
304     }
305 
306     @Override
307     public String toString() {
308       return "Chunk@" + System.identityHashCode(this) +
309         " allocs=" + allocCount.get() + "waste=" +
310         (data.length - nextFreeOffset.get());
311     }
312   }
313 
314   /**
315    * The result of a single allocation. Contains the chunk that the
316    * allocation points into, and the offset in this array where the
317    * slice begins.
318    */
319   public static class Allocation {
320     private final byte[] data;
321     private final int offset;
322 
323     private Allocation(byte[] data, int off) {
324       this.data = data;
325       this.offset = off;
326     }
327 
328     @Override
329     public String toString() {
330       return "Allocation(" + "capacity=" + data.length + ", off=" + offset
331           + ")";
332     }
333 
334     byte[] getData() {
335       return data;
336     }
337 
338     int getOffset() {
339       return offset;
340     }
341   }
342 }