/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.SimpleMutableByteRange;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

/**
 * A memstore-local allocation buffer.
 * <p>
 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
 * big (2MB) byte[] chunks from the JVM heap and then doles out slices of
 * those arrays to threads that request them.
 * <p>
 * The purpose of this class is to combat heap fragmentation in the
 * regionserver. By ensuring that all KeyValues in a given memstore refer
 * only to large chunks of contiguous memory, we ensure that large blocks
 * get freed up when the memstore is flushed.
 * <p>
 * Without the MSLAB, the byte arrays allocated during insertion end up
 * interleaved throughout the heap, and the old generation gets progressively
 * more fragmented until a stop-the-world compacting collection occurs.
 * <p>
 * TODO: we should probably benchmark whether word-aligning the allocations
 * would provide a performance improvement - probably would speed up the
 * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
 * anyway.
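 * <p>
 * A minimal usage sketch (illustrative only; in practice the memstore drives
 * this class internally, and {@code src} and {@code len} below are
 * hypothetical):
 * <pre>
 *   Configuration conf = new Configuration();
 *   MemStoreLAB mslab = new HeapMemStoreLAB(conf);
 *   ByteRange slice = mslab.allocateBytes(len);
 *   if (slice == null) {
 *     // len exceeded hbase.hregion.memstore.mslab.max.allocation;
 *     // the caller allocates directly from the JVM heap instead
 *   } else {
 *     System.arraycopy(src, 0, slice.getBytes(), slice.getOffset(), len);
 *   }
 *   mslab.close(); // once the memstore snapshot is cleared
 * </pre>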
 */
@InterfaceAudience.Private
public class HeapMemStoreLAB implements MemStoreLAB {

  static final String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
  static final int CHUNK_SIZE_DEFAULT = 2048 * 1024;
  static final String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
  // allocs bigger than this don't go through allocator
  static final int MAX_ALLOC_DEFAULT = 256 * 1024;

  static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class);

  private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
  // A queue of chunks contained by this memstore, used with the chunk pool
  private BlockingQueue<Chunk> chunkQueue = null;
  final int chunkSize;
  final int maxAlloc;
  private final MemStoreChunkPool chunkPool;
  // This flag is for closing this instance; it's set when clearing the
  // snapshot of the memstore
  private volatile boolean closed = false;
  // This flag is for reclaiming chunks; it's set when putting chunks back
  // to the pool
  private AtomicBoolean reclaimed = new AtomicBoolean(false);
  // Current count of open scanners which are reading data from this MemStoreLAB
  private final AtomicInteger openScannerCount = new AtomicInteger();

  // Used in testing
  public HeapMemStoreLAB() {
    this(new Configuration());
  }

  public HeapMemStoreLAB(Configuration conf) {
    chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
    this.chunkPool = MemStoreChunkPool.getPool(conf);
    // currently chunkQueue is only used with the chunk pool
    if (this.chunkPool != null) {
      // set the queue length to the chunk pool's max count to avoid keeping
      // references to too many non-reclaimable chunks
      chunkQueue = new LinkedBlockingQueue<Chunk>(chunkPool.getMaxCount());
    }

    // if we don't exclude allocations > chunkSize, we'd infinite-loop on one!
    Preconditions.checkArgument(
      maxAlloc <= chunkSize,
      MAX_ALLOC_KEY + " must be less than or equal to " + CHUNK_SIZE_KEY);
  }

  /**
   * Allocate a slice of the given length.
   *
   * If the size is larger than the maximum size specified for this
   * allocator, returns null.
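   * <p>
   * Caller-side sketch of this contract ({@code len} here is hypothetical):
   * <pre>
   *   ByteRange r = mslab.allocateBytes(len);
   *   if (r == null) {
   *     // too large for the MSLAB: allocate a plain byte[len] instead
   *   }
   * </pre>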
   */
  @Override
  public ByteRange allocateBytes(int size) {
    Preconditions.checkArgument(size >= 0, "negative size");

    // Callers should satisfy large allocations directly from the JVM since
    // they don't cause fragmentation as badly.
    if (size > maxAlloc) {
      return null;
    }

    while (true) {
      Chunk c = getOrMakeChunk();

      // Try to allocate from this chunk
      int allocOffset = c.alloc(size);
      if (allocOffset != -1) {
        // We succeeded - this is the common case - small alloc
        // from a big buffer
        return new SimpleMutableByteRange(c.data, allocOffset, size);
      }

      // not enough space! try to retire this chunk
      tryRetireChunk(c);
    }
  }

  /**
   * Close this instance since it won't be used any more; try to put the
   * chunks back to the pool.
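   * <p>
   * Reclaim is deferred while readers are active; an illustrative reader-side
   * sketch:
   * <pre>
   *   mslab.incScannerCount();   // before reading data backed by this LAB
   *   try {
   *     // ... scan cells ...
   *   } finally {
   *     mslab.decScannerCount(); // the last one after close() reclaims chunks
   *   }
   * </pre>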
   */
  @Override
  public void close() {
    this.closed = true;
    // We can put the chunks back into the pool for reuse only when there is
    // no open scanner that will read their data
    if (chunkPool != null && openScannerCount.get() == 0
        && reclaimed.compareAndSet(false, true)) {
      chunkPool.putbackChunks(this.chunkQueue);
    }
  }

  /**
   * Called when opening a scanner on the data of this MemStoreLAB
   */
  @Override
  public void incScannerCount() {
    this.openScannerCount.incrementAndGet();
  }

  /**
   * Called when closing a scanner on the data of this MemStoreLAB
   */
  @Override
  public void decScannerCount() {
    int count = this.openScannerCount.decrementAndGet();
    if (chunkPool != null && count == 0 && this.closed
        && reclaimed.compareAndSet(false, true)) {
      chunkPool.putbackChunks(this.chunkQueue);
    }
  }

  /**
   * Try to retire the current chunk if it is still <code>c</code>.
   * Postcondition is that curChunk.get() != c.
   * @param c the chunk to retire
   */
  private void tryRetireChunk(Chunk c) {
    curChunk.compareAndSet(c, null);
    // If the CAS succeeds, that means that we won the race
    // to retire the chunk. We could use this opportunity to
    // update metrics on external fragmentation.
    //
    // If the CAS fails, that means that someone else already
    // retired the chunk for us.
  }

  /**
   * Get the current chunk, or, if there is no current chunk,
   * allocate a new one from the JVM.
   */
  private Chunk getOrMakeChunk() {
    while (true) {
      // Try to get the chunk
      Chunk c = curChunk.get();
      if (c != null) {
        return c;
      }

      // No current chunk, so we want to allocate one. We race
      // against other allocators to CAS in an uninitialized chunk
      // (which is cheap to allocate)
      c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
      if (curChunk.compareAndSet(null, c)) {
        // we won the race - now we need to actually do the expensive
        // allocation step
        c.init();
        if (chunkQueue != null && !this.closed && !this.chunkQueue.offer(c)) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
                + chunkQueue.size());
          }
        }
        return c;
      } else if (chunkPool != null) {
        chunkPool.putbackChunk(c);
      }
      // someone else won the race - that's fine, we'll try to grab theirs
      // in the next iteration of the loop.
    }
  }

  @VisibleForTesting
  Chunk getCurrentChunk() {
    return this.curChunk.get();
  }

  @VisibleForTesting
  BlockingQueue<Chunk> getChunkQueue() {
    return this.chunkQueue;
  }

  /**
   * A chunk of memory out of which allocations are sliced.
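   * <p>
   * Lifecycle sketch, based on the methods below (sizes are illustrative):
   * <pre>
   *   Chunk c = new Chunk(2048 * 1024); // cheap; no memory claimed yet
   *   c.init();                         // allocates the backing byte[]
   *   int off = c.alloc(100);           // 0 on a fresh chunk, -1 when full
   * </pre>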
   */
  static class Chunk {
    /** Actual underlying data */
    private byte[] data;

    private static final int UNINITIALIZED = -1;
    private static final int OOM = -2;
    /**
     * Offset for the next allocation, or one of the sentinel values:
     * UNINITIALIZED (-1), meaning the chunk's data is not allocated yet,
     * or OOM (-2), meaning the allocation of the data failed.
     */
    private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);

    /** Total number of allocations satisfied from this buffer */
    private AtomicInteger allocCount = new AtomicInteger();

    /** Size of chunk in bytes */
    private final int size;

    /**
     * Create an uninitialized chunk. Note that memory is not allocated yet, so
     * this is cheap.
     * @param size in bytes
     */
    Chunk(int size) {
      this.size = size;
    }

    /**
     * Actually claim the memory for this chunk. This should only be called from
     * the thread that constructed the chunk. It is thread-safe against other
     * threads calling alloc(), which will spin until the allocation is complete.
     */
    public void init() {
      assert nextFreeOffset.get() == UNINITIALIZED;
      try {
        if (data == null) {
          data = new byte[size];
        }
      } catch (OutOfMemoryError e) {
        boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
        assert failInit; // should be true.
        throw e;
      }
      // Mark that it's ready for use
      boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
      // We should always succeed the above CAS since only one thread
      // calls init()!
      Preconditions.checkState(initted,
          "Multiple threads tried to init same chunk");
    }

    /**
     * Reset the offset to UNINITIALIZED before reusing an old chunk
     */
    void reset() {
      if (nextFreeOffset.get() != UNINITIALIZED) {
        nextFreeOffset.set(UNINITIALIZED);
        allocCount.set(0);
      }
    }

    /**
     * Try to allocate <code>size</code> bytes from the chunk.
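     * <p>
     * Worked example (assuming an initialized, empty 2MB chunk): alloc(100)
     * returns offset 0 and advances nextFreeOffset to 100; a second alloc(100)
     * returns offset 100, and so on until a request no longer fits.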
     * @return the offset of the successful allocation, or -1 to indicate not-enough-space
     */
    public int alloc(int size) {
      while (true) {
        int oldOffset = nextFreeOffset.get();
        if (oldOffset == UNINITIALIZED) {
          // The chunk doesn't have its data allocated yet.
          // Since we found this in curChunk, we know that whoever
          // CAS-ed it there is allocating it right now. So the spin-loop
          // shouldn't spin long!
          Thread.yield();
          continue;
        }
        if (oldOffset == OOM) {
          // doh, we ran out of RAM. Return -1 to chuck this away.
          return -1;
        }

        if (oldOffset + size > data.length) {
          return -1; // alloc doesn't fit
        }

        // Try to atomically claim this slice of the chunk
        if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
          // we got the alloc
          allocCount.incrementAndGet();
          return oldOffset;
        }
        // we raced and lost the alloc, try again
      }
    }

    @Override
    public String toString() {
      return "Chunk@" + System.identityHashCode(this) +
        " allocs=" + allocCount.get() + " waste=" +
        (data.length - nextFreeOffset.get());
    }

    @VisibleForTesting
    int getNextFreeOffset() {
      return this.nextFreeOffset.get();
    }
  }
}